Let's create a stack that has an async popAsync method:
/**
 * Creates a LIFO stack whose pop operation is asynchronous.
 *
 * @returns {Array<Function>} A `[push, popAsync]` pair.
 *   `push(v)` hands `v` straight to the longest-waiting consumer when there is
 *   one, otherwise it stores `v` on the stack.
 *   `popAsync()` returns a Promise that resolves with the most recently stored
 *   item or, when nothing is stored, with the next value passed to `push`.
 */
const createAsyncStack = () => {
  const stack = [];
  const waitingConsumers = [];

  const push = (v) => {
    if (waitingConsumers.length > 0) {
      // Waiting consumers are served in the order in which they started waiting.
      const resolver = waitingConsumers.shift();
      resolver(v);
    } else {
      stack.push(v);
    }
  };

  const popAsync = () => {
    if (stack.length > 0) {
      // The stack is non-empty, so pop() yields a stored item. Resolve with it
      // as-is, even when it is `undefined`, so that a stored value is treated
      // exactly like one handed directly to a waiting consumer.
      return Promise.resolve(stack.pop());
    }
    // Nothing stored: park the resolver until the next push() delivers a value.
    return new Promise((resolve) => {
      waitingConsumers.push(resolve);
    });
  };

  return [push, popAsync];
};
This means that any consumer calling popAsync receives a Promise that resolves only once an item is available in the stack.
We can now use this stack as a "gatekeeper" for a simple higher-order function (i.e. a function that returns a function).
Say we only want to allow maxDOP (maximum degree of parallelism) concurrent invocations of an async function. We push maxDOP tokens into the stack (here, I've used empty objects as the tokens), then require that, in order to proceed, a caller must first acquire a token from this stack. When the function call is finished, we return the token to the stack (using push), where it can then be consumed by any waiting consumer.
/**
 * Wraps `f` so that its invocations are gated by a pool of `maxDop` tokens.
 *
 * @param {Function} f - The function to wrap; it is called with the wrapper's arguments.
 * @param {number} maxDop - How many tokens to seed the pool with.
 * @returns {Function} An async function that waits for a token, awaits
 *   `f(...args)`, and settles with the same outcome as `f`.
 */
const withMaxDOP = (f, maxDop) => {
  const [releaseToken, acquireToken] = createAsyncStack();

  // Seed the pool: one token per permitted concurrent invocation.
  let seeded = 0;
  while (seeded < maxDop) {
    releaseToken({});
    ++seeded;
  }

  return async (...args) => {
    // Callers wait here until a token is available.
    const token = await acquireToken();
    try {
      const result = await f(...args);
      return result;
    } finally {
      // Hand the token back whether `f` succeeded or failed.
      releaseToken(token);
    }
  };
};
The function returns a new function that can be called in exactly the same way as the function that is supplied to it (i.e. it has the same signature).
Now, let's create a function that simply calls a supplied function with the supplied arguments:
/**
 * Calls `asyncFn` with the remaining arguments and returns whatever it returns.
 *
 * @param {Function} asyncFn - The function to invoke.
 * @param {...*} args - Arguments forwarded to `asyncFn`.
 * @returns {*} The return value of `asyncFn(...args)`.
 */
const runAsync = (asyncFn, ...args) => {
  return asyncFn(...args);
};
and wrap it using the higher-order withMaxDOP function, which will return a new function with an identical signature to the wrapped function:
// Gate runAsync behind withMaxDOP with a limit of 15; the result is called
// exactly like runAsync: limitedRunAsync(asyncFn, ...args).
const limitedRunAsync = withMaxDOP(runAsync, 15);
Now we can use this function to call the functions in our array:
// Start every function through the limiter and report once all have resolved.
// Promise.all rejects as soon as any one of them rejects, so that case is
// handled explicitly rather than being left as an unhandled rejection.
Promise.all(asyncFns.map((f) => limitedRunAsync(f)))
  .then((returnValues) => console.log("all finished", returnValues))
  .catch((error) => console.error("a task failed", error));
which will ensure that at most 15 invocations are "in flight" at any one time.
See this runnable snippet for a full example:
/**
 * Creates a LIFO stack whose pop operation is asynchronous.
 *
 * @returns {Array<Function>} A `[push, popAsync]` pair.
 *   `push(v)` hands `v` straight to the longest-waiting consumer when there is
 *   one, otherwise it stores `v` on the stack.
 *   `popAsync()` returns a Promise that resolves with the most recently stored
 *   item or, when nothing is stored, with the next value passed to `push`.
 */
const createAsyncStack = () => {
  const stack = [];
  const waitingConsumers = [];

  const push = (v) => {
    if (waitingConsumers.length > 0) {
      // Waiting consumers are served in the order in which they started waiting.
      const resolver = waitingConsumers.shift();
      resolver(v);
    } else {
      stack.push(v);
    }
  };

  const popAsync = () => {
    if (stack.length > 0) {
      // The stack is non-empty, so pop() yields a stored item. Resolve with it
      // as-is, even when it is `undefined`, so that a stored value is treated
      // exactly like one handed directly to a waiting consumer.
      return Promise.resolve(stack.pop());
    }
    // Nothing stored: park the resolver until the next push() delivers a value.
    return new Promise((resolve) => {
      waitingConsumers.push(resolve);
    });
  };

  return [push, popAsync];
};
/**
 * Wraps `f` so that its invocations are gated by a pool of `maxDop` tokens.
 *
 * @param {Function} f - The function to wrap; it is called with the wrapper's arguments.
 * @param {number} maxDop - How many tokens to seed the pool with.
 * @returns {Function} An async function that waits for a token, awaits
 *   `f(...args)`, and settles with the same outcome as `f`.
 */
const withMaxDOP = (f, maxDop) => {
  const [releaseToken, acquireToken] = createAsyncStack();

  // Seed the pool: one token per permitted concurrent invocation.
  let seeded = 0;
  while (seeded < maxDop) {
    releaseToken({});
    ++seeded;
  }

  return async (...args) => {
    // Callers wait here until a token is available.
    const token = await acquireToken();
    try {
      const result = await f(...args);
      return result;
    } finally {
      // Hand the token back whether `f` succeeded or failed.
      releaseToken(token);
    }
  };
};
/**
 * Calls `asyncFn` with the remaining arguments and returns whatever it returns.
 *
 * @param {Function} asyncFn - The function to invoke.
 * @param {...*} args - Arguments forwarded to `asyncFn`.
 * @returns {*} The return value of `asyncFn(...args)`.
 */
const runAsync = (asyncFn, ...args) => {
  return asyncFn(...args);
};
// Gate runAsync behind withMaxDOP with a limit of 15; the result is called
// exactly like runAsync: limitedRunAsync(asyncFn, ...args).
const limitedRunAsync = withMaxDOP(runAsync, 15);
// set up an array of async functions
/**
 * Returns a Promise that resolves (with no value) after a timer of
 * `durationMS` milliseconds fires.
 *
 * @param {number} durationMS - Timer duration passed to setTimeout.
 * @returns {Promise<undefined>}
 */
const delay = (durationMS) =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, durationMS);
  });
// Fifty task functions. When called, task i logs that it is starting, waits
// for a random duration below 5000 ms, logs that it has finished, and
// resolves with i.
const asyncFns = Array.from({ length: 50 }, (_unused, index) => {
  const task = () => {
    console.log("starting " + index);
    const waited = delay(Math.random() * 5000);
    return waited.then(() => {
      console.log("finished " + index);
      return index;
    });
  };
  return task;
});
// ...then wrap and call them all at once
// Start every function through the limiter and report once all have resolved.
// Promise.all rejects as soon as any one of them rejects, so that case is
// handled explicitly rather than being left as an unhandled rejection.
Promise.all(asyncFns.map((f) => limitedRunAsync(f)))
  .then((returnValues) => console.log("all finished", returnValues))
  .catch((error) => console.error("a task failed", error));
...and see this TypeScript Playground Link for a fully type-annotated version of the same code.