(Check out the community wiki answer I shared for another approach.)
In one of your comments, you mentioned:
I am working on a consensus algorithm where each source must send a response within a specified time frame. If some participants fail to respond (i.e., do not send values), the loop will hang indefinitely!
This situation sounds like it requires a timeout mechanism. Typically, timeouts are implemented using Promise.race
with a promise wrapped around a timer function (setTimeout
or similar). The Promise.race
function settles as soon as any of the promises passed to it settles (whether it fulfills or rejects), regardless of the others' outcomes.
To achieve this, loop manually instead of using for-await-of
, calling the iterator's next() method yourself so that you can use the promise it returns for the result object directly. Suppose you have a utility function like:
/**
 * Timer wrapped in a promise.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - What the promise is fulfilled with once the wait is over.
 * @returns {Promise<*>} Promise fulfilled with `value` after `ms` milliseconds.
 */
const delay = (ms, value) =>
    new Promise((fulfill) => {
        setTimeout(() => fulfill(value), ms);
    });
This function returns a promise that is fulfilled after ms milliseconds with the provided value (or undefined if none is given).
Next:
// Drives the async iterator by hand so that every `next()` call can be raced
// against a timer. A timeout is logged and the loop carries on with a fresh
// `next()` call; the loop ends when the iterator reports `done`.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // No response received within the allotted time.
                // `p` is not consulted again after this point, so whatever it
                // eventually settles to is ignored by this loop.
                console.log("Timeout");
            } else {
                // Response received
                if (result.done) {
                    // Iteration completed
                    console.log("Iteration complete");
                    break;
                }
                // Perform data processing on 'result.value'...
                console.log(`Process ${result.value}`);
            }
        }
    } finally {
        try {
            // Close the iterator if necessary. The call is not awaited, so the
            // cleanup cannot wait on a source that never answers.
            const closing = it.return?.();
            // On an async iterator `return()` hands back a promise; the
            // surrounding try/catch only sees synchronous throws, so give the
            // promise its own handler to keep this best-effort close from
            // surfacing as an unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
})();
Live Example illustrating random durations for the async operations, but intentionally causing a timeout on the third iteration:
/**
 * Promise-based timer used by the demo below.
 *
 * @param {number} ms - Milliseconds to wait before fulfilling.
 * @param {*} [value] - Fulfillment value handed to whoever awaits the promise.
 * @returns {Promise<*>} Promise fulfilled with `value` after `ms` milliseconds.
 */
const delay = (ms, value) =>
    new Promise((fulfill) => {
        setTimeout(() => fulfill(value), ms);
    });
/**
 * Demo source: yields the integers 1 through 6, pausing before each one.
 * The pause before the third value is fixed at 600 ms; every other pause is a
 * random whole number of milliseconds below 100.
 */
async function* example() {
    let step = 0;
    while (step < 6) {
        step += 1;
        let pause;
        if (step === 3) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield step;
    }
}
// Demo driver: pulls values from `example()` by hand, racing each `next()`
// call against a timer and reporting how long each race took. A timeout is
// logged and the loop then issues a fresh `next()` call.
(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Timeout occurred. `p` is not consulted again, so whatever it
                // eventually settles to is ignored by this loop.
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Response received
                if (result.done) {
                    // Iteration completed
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // Perform data processing on 'result.value'...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            // Close the iterator if necessary; the call is not awaited.
            const closing = it.return?.();
            // `return()` on an async iterator hands back a promise, and the
            // surrounding try/catch only sees synchronous throws. Handle the
            // promise too so this best-effort close cannot surface as an
            // unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
})();
/* Let the console wrapper grow to the full height of its container; the
   !important flag makes this win over any competing max-height rule. */
.as-console-wrapper {
max-height: 100% !important;
}
A variation of the above scenario can be seen where a timeout is triggered during the first iteration only, addressing your concern regarding this particular case:
/**
 * Promise-based timer used by the demo below.
 *
 * @param {number} ms - Milliseconds to wait before fulfilling.
 * @param {*} [value] - Fulfillment value handed to whoever awaits the promise.
 * @returns {Promise<*>} Promise fulfilled with `value` after `ms` milliseconds.
 */
const delay = (ms, value) =>
    new Promise((fulfill) => {
        setTimeout(() => fulfill(value), ms);
    });
/**
 * Demo source: yields the integers 1 through 6, pausing before each one.
 * The pause before the first value is fixed at 600 ms; every other pause is a
 * random whole number of milliseconds below 100.
 */
async function* example() {
    let step = 0;
    while (step < 6) {
        step += 1;
        let pause;
        if (step === 1) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield step;
    }
}
// Demo driver: pulls values from `example()` by hand, racing each `next()`
// call against a timer and reporting how long each race took. A timeout is
// logged and the loop then issues a fresh `next()` call.
(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Timeout detected. `p` is not consulted again, so whatever it
                // eventually settles to is ignored by this loop.
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Response obtained
                if (result.done) {
                    // Iteration complete
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // Perform data processing on 'result.value'...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            // Close the iterator if necessary; the call is not awaited.
            const closing = it.return?.();
            // `return()` on an async iterator hands back a promise, and the
            // surrounding try/catch only sees synchronous throws. Handle the
            // promise too so this best-effort close cannot surface as an
            // unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
})();
/* Let the console wrapper grow to the full height of its container; the
   !important flag makes this win over any competing max-height rule. */
.as-console-wrapper {
max-height: 100% !important;
}
If you aim to continue collecting subsequent values without waiting for the processing, avoid using await
during processing tasks (consider queuing up an array of promises representing these tasks and resolving them collectively using Promise.all
at the end).
Alternatively, if you intend to terminate the entire operation prematurely:
// Collects values from the async iterator until it either finishes or fails
// to answer within TIMEOUT, then processes whatever was collected. Each
// `next()` call is raced against a timer; the first timeout ends the loop.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // No timely response received, bail out
                console.log("Timeout");
                break;
            }
            // Response acquired
            if (result.done) {
                // Iteration complete
                console.log("Iteration complete");
                break;
            }
            console.log(`Received ${result.value}`);
            results.push(result.value);
        }
    } finally {
        try {
            // Close the iterator if necessary. The call is not awaited: after a
            // timeout the last `next()` is still outstanding, and waiting here
            // could mean waiting on a source that never answers.
            const closing = it.return?.();
            // `return()` on an async iterator hands back a promise, and the
            // surrounding try/catch only sees synchronous throws. Handle the
            // promise too so this best-effort close cannot surface as an
            // unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
    // ...processing logic for items stored in `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
Live Example:
/**
 * Promise-based timer used by the demo below.
 *
 * @param {number} ms - Milliseconds to wait before fulfilling.
 * @param {*} [value] - Fulfillment value handed to whoever awaits the promise.
 * @returns {Promise<*>} Promise fulfilled with `value` after `ms` milliseconds.
 */
const delay = (ms, value) =>
    new Promise((fulfill) => {
        setTimeout(() => fulfill(value), ms);
    });
/**
 * Demo source: yields the integers 1 through 6, pausing before each one.
 * The pause before the third value is fixed at 600 ms; every other pause is a
 * random whole number of milliseconds below 100.
 */
async function* example() {
    let step = 0;
    while (step < 6) {
        step += 1;
        let pause;
        if (step === 3) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield step;
    }
}
// Demo driver: collects values from `example()` until it finishes or fails to
// answer within TIMEOUT, reporting how long each race took, then processes
// whatever was collected. The first timeout ends the loop.
(async () => {
    const asynDataStreamOrGenerator = example(); // For illustrative purposes
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Timed out, exiting early
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Response obtained
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            // Close the iterator if necessary. The call is not awaited: after a
            // timeout the last `next()` is still outstanding.
            const closing = it.return?.();
            // `return()` on an async iterator hands back a promise, and the
            // surrounding try/catch only sees synchronous throws. Handle the
            // promise too so this best-effort close cannot surface as an
            // unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
    // ...processing logic for items stored in `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
/* Let the console wrapper grow to the full height of its container; the
   !important flag makes this win over any competing max-height rule. */
.as-console-wrapper {
max-height: 100% !important;
}
An alternative scenario could involve triggering a timeout on the initial pass but not thereafter (since bailing out on the first timeout means subsequent ones won't occur):
/**
 * Promise-based timer used by the demo below.
 *
 * @param {number} ms - Milliseconds to wait before fulfilling.
 * @param {*} [value] - Fulfillment value handed to whoever awaits the promise.
 * @returns {Promise<*>} Promise fulfilled with `value` after `ms` milliseconds.
 */
const delay = (ms, value) =>
    new Promise((fulfill) => {
        setTimeout(() => fulfill(value), ms);
    });
/**
 * Demo source: yields the integers 1 through 6, pausing before each one.
 * The pause before the first value is fixed at 600 ms; every other pause is a
 * random whole number of milliseconds below 100.
 */
async function* example() {
    let step = 0;
    while (step < 6) {
        step += 1;
        let pause;
        if (step === 1) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield step;
    }
}
// Demo driver: collects values from `example()` until it finishes or fails to
// answer within TIMEOUT, reporting how long each race took, then processes
// whatever was collected. The first timeout ends the loop.
(async () => {
    const asynDataStreamOrGenerator = example(); // For demonstrative purposes
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel, recognised by identity below
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // No timely response received, aborting
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Response obtained
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            // Close the iterator if necessary. The call is not awaited: after a
            // timeout the last `next()` is still outstanding.
            const closing = it.return?.();
            // `return()` on an async iterator hands back a promise, and the
            // surrounding try/catch only sees synchronous throws. Handle the
            // promise too so this best-effort close cannot surface as an
            // unhandled rejection.
            Promise.resolve(closing).catch(() => { });
        } catch { }
    }
    // ...processing logic for items stored in `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
/* Let the console wrapper grow to the full height of its container; the
   !important flag makes this win over any competing max-height rule. */
.as-console-wrapper {
max-height: 100% !important;
}
You may need to adjust this based on your specific requirements, but it provides a starting point for further development.
Throughout all scenarios described above:
- If optional chaining (`it.return?.();`) isn't supported in your environment, replace it with an explicit check (`if (it.return) { it.return(); }`).
- If optional catch binding (`catch { }`) isn't supported, give the catch clause an unused binding instead (`catch (e) { }`).