1

Given an array of async functions, how would you write a class that takes the array and runs the functions as fast as possible, with the constraint that at most 15 functions can run at the same time?

If there were no limit of 15 functions, I believe Promise.all would be the way to go.

Using plain async/await and waiting for each function to resolve before starting the next one is very slow: the functions run one at a time, so a single slow function becomes a bottleneck.

Adding 15 functions to an array, running them with Promise.all, and then starting the next 15 (or however many remain) once that resolves is not very efficient either. Each batch waits for its slowest function, whereas what we want is to start another function as soon as any running function resolves.

Any ideas?

  • The ideal way would be to run the first 15 at once, and then as each of them resolve you start the next function - do you know how many functions there are ahead of time? Commented Jul 17, 2021 at 9:24
  • The input is of different size each time, so it can be any number of functions. Commented Jul 17, 2021 at 9:25
  • yes, but you know how many ahead of time, i.e. you're not adding functions to execute while functions are executing? Commented Jul 17, 2021 at 9:27
  • Yes, input array is of fixed size. So, you are given an array of 100 functions and you know that there are 100 functions in it. Commented Jul 17, 2021 at 9:29
  • I take it you'll need all the results too? Commented Jul 17, 2021 at 9:33

2 Answers

2

Let's create a stack that has an async popAsync method:

const createAsyncStack = () => {
    const stack = [];
    const waitingConsumers = [];
    const push = (v) => {
        if (waitingConsumers.length > 0) {
            const resolver = waitingConsumers.shift();
            resolver && resolver(v);
        }
        else {
            stack.push(v);
        }
    };
    const popAsync = () => {
        if (stack.length > 0) {
            const queueItem = stack.pop();
            return typeof queueItem !== 'undefined' 
                ? Promise.resolve(queueItem) 
                : Promise.reject(Error('unexpected'));
        }
        else {
            return new Promise((resolve) => waitingConsumers.push(resolve));
        }
    };
    return [push, popAsync];
};

This means that any consumer calling popAsync receives a Promise that resolves only once an item is available in the stack. If the stack is empty, the Promise stays pending until someone calls push.
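To make the waiting behaviour concrete, here is a minimal sketch. It assumes a compact copy of createAsyncStack (the `undefined` check is dropped for brevity), and the `events` log and `demo` function are names made up for this illustration.

```javascript
// Compact copy of createAsyncStack so this snippet runs on its own.
const createAsyncStack = () => {
  const stack = [];
  const waitingConsumers = [];
  const push = (v) => {
    if (waitingConsumers.length > 0) {
      waitingConsumers.shift()(v); // hand the item straight to the oldest waiter
    } else {
      stack.push(v);
    }
  };
  const popAsync = () =>
    stack.length > 0
      ? Promise.resolve(stack.pop())
      : new Promise((resolve) => waitingConsumers.push(resolve));
  return [push, popAsync];
};

const events = []; // hypothetical log of what happened, in order

const demo = async () => {
  const [push, popAsync] = createAsyncStack();
  push("a");
  events.push("got " + (await popAsync())); // item available: resolves right away
  const pending = popAsync().then((v) => events.push("got " + v)); // empty: stays pending
  events.push("waiting");
  push("b"); // wakes the waiting consumer
  await pending;
  return events;
};

const demoDone = demo();
demoDone.then((log) => console.log(log));
```

The second popAsync call only completes after the later push, which is exactly the property the gatekeeper below relies on.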

We can now use this stack as a "gatekeeper" for a simple higher-order function (i.e. a function that returns a function).

Say we want to allow at most maxDOP (maximum degree of parallelism) concurrent invocations of an async function. We push maxDOP tokens into the stack (here, I've used empty objects as the tokens) and require every call to acquire a token from the stack before it can proceed. When the call has finished, we return the token to the stack (using push), where it can then be picked up by any waiting consumer.

const withMaxDOP = (f, maxDop) => {
    const [push, popAsync] = createAsyncStack();
    for (let x = 0; x < maxDop; ++x) {
        push({});
    }
    return async (...args) => {
        const token = await popAsync();
        try {
            return await f(...args);
        }
        finally {
            push(token);
        }
    };
};

withMaxDOP returns a new function that can be called in exactly the same way as the function supplied to it (i.e. it has the same signature).
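As a rough check of the try/finally part, the sketch below limits a function to one call at a time and lets some calls reject. If a rejected call did not return its token, the later calls would wait forever. The `flaky` function and the `outcome` variable are made up for this example, and the two helpers are compact copies of the ones above.

```javascript
// Compact copies of the helpers above so this snippet runs on its own.
const createAsyncStack = () => {
  const stack = [];
  const waitingConsumers = [];
  const push = (v) => {
    if (waitingConsumers.length > 0) waitingConsumers.shift()(v);
    else stack.push(v);
  };
  const popAsync = () =>
    stack.length > 0
      ? Promise.resolve(stack.pop())
      : new Promise((resolve) => waitingConsumers.push(resolve));
  return [push, popAsync];
};

const withMaxDOP = (f, maxDop) => {
  const [push, popAsync] = createAsyncStack();
  for (let x = 0; x < maxDop; ++x) push({});
  return async (...args) => {
    const token = await popAsync();
    try {
      return await f(...args);
    } finally {
      push(token); // runs on success and on rejection
    }
  };
};

// Hypothetical task: rejects for odd inputs, resolves for even ones.
const flaky = async (n) => {
  if (n % 2 === 1) throw new Error("odd input " + n);
  return n * 10;
};

const limitedFlaky = withMaxDOP(flaky, 1); // only one call in flight at a time

// allSettled lets us see every outcome instead of stopping at the first rejection.
const outcome = Promise.allSettled([1, 2, 3, 4].map((n) => limitedFlaky(n))).then(
  (settled) => settled.map((s) => (s.status === "fulfilled" ? s.value : s.reason.message))
);

outcome.then((summary) => console.log(summary));
```

All four calls settle even though two of them reject, because the token goes back to the stack in the finally block. The arguments are passed through unchanged, which is what "same signature" means here.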

Now, let's create a function that simply calls a supplied function with the supplied arguments:

const runAsync = (asyncFn, ...args) => asyncFn(...args);

and wrap it using the higher-order withMaxDOP function, which will return a new function with an identical signature to the wrapped function:

const limitedRunAsync = withMaxDOP(runAsync, 15);

Now we can use this function to call the functions in our array:

Promise.all(asyncFns.map(f => limitedRunAsync(f)))
    .then((returnValues) => console.log("all finished", returnValues));

which ensures that at most 15 invocations are ever "in flight" at one time.

See this runnable snippet for a full example:

const createAsyncStack = () => {
  const stack = [];
  const waitingConsumers = [];
  const push = (v) => {
    if (waitingConsumers.length > 0) {
      const resolver = waitingConsumers.shift();
      resolver && resolver(v);
    } else {
      stack.push(v);
    }
  };
  const popAsync = () => {
    if (stack.length > 0) {
      const queueItem = stack.pop();
      return typeof queueItem !== 'undefined' ? Promise.resolve(queueItem) : Promise.reject(Error('unexpected'));
    } else {
      return new Promise((resolve) => waitingConsumers.push(resolve));
    }
  };
  return [push, popAsync];
};

const withMaxDOP = (f, maxDop) => {
  const [push, popAsync] = createAsyncStack();
  for (let x = 0; x < maxDop; ++x) {
    push({});
  }
  return async(...args) => {
    const token = await popAsync();
    try {
      return await f(...args);
    } finally {
      push(token);
    }
  };
};

const runAsync = (asyncFn, ...args) => asyncFn(...args);
const limitedRunAsync = withMaxDOP(runAsync, 15);

// set up an array of async functions
const delay = (durationMS) => new Promise((resolve) => setTimeout(() => resolve(), durationMS));
const asyncFns = [...Array(50)].map((_, i) => () => {
  console.log("starting " + i);
  return delay(Math.random() * 5000).then(v => {
    console.log("finished " + i);
    return i;
  });
});



// ...then wrap and call them all at once
Promise.all(asyncFns.map(f => limitedRunAsync(f))).then((returnValues) => console.log("all finished", returnValues));

...and see this TypeScript Playground Link for a fully type-annotated version of the same code.

1

Here's something I whipped up in the last 20 minutes that should do the job

I'm sure if I thought about it I could probably do it without the Promise constructor, but ... 20 minutes is 20 minutes :p

Please, if someone can rewrite this without the Promise constructor, I'd love to see it - because in the back of my mind, I'm sure there is a way

Note that this keeps going through the whole array even if some of the functions reject

Each entry in the results array will be either

{ result: actualResult }

or

{ error: rejectionReason }

So you can process results/rejections
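For instance, assuming a results array in the shape just described, something like the following could split it into values and failures. `sampleResults` and `partition` are names invented for this sketch, not part of the answer's code.

```javascript
// Hypothetical sample in the shape described above:
// one { result } or { error } object per input function, in input order.
const sampleResults = [{ result: 1 }, { error: new Error("boom") }, { result: 3 }];

const partition = (results) => {
  const values = [];
  const failures = [];
  results.forEach((entry, index) => {
    // "in" is used so that a rejection with an undefined reason still counts as an error
    if ("error" in entry) failures.push({ index, reason: entry.error });
    else values.push(entry.result);
  });
  return { values, failures };
};

const { values, failures } = partition(sampleResults);
console.log(values);
console.log(failures.map((f) => f.index + ": " + f.reason.message));
```

Keeping the index on each failure makes it possible to tell which input function rejected.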

function runPromises(arrayOfFunctions, maxLength) {
    return new Promise(resolve => {
        const queue = arrayOfFunctions.map((fn, index) => ({fn, index}));
        const results = new Array(arrayOfFunctions.length);
        let finished = 0;
        // an empty input has nothing to wait for, so resolve straight away
        // (otherwise the returned promise would never settle)
        if (queue.length === 0) {
            resolve(results);
            return;
        }
        // start one function and record its outcome at its original index
        const start = ({fn, index}) => fn()
            .then(result => results[index] = {result})
            .catch(error => results[index] = {error})
            .finally(doQ);
        // called each time a function settles:
        // start the next queued function, or resolve once everything has finished
        const doQ = () => {
            ++finished;
            if (queue.length) {
                start(queue.shift());
            } else if (finished === arrayOfFunctions.length) {
                resolve(results);
            }
        };
        // kick off the first maxLength functions
        queue.splice(0, maxLength).forEach(start);
    });
}
//
// demo: shows that at most 15 requests are in flight at any time
//
let inFlight = 0;
let maxInFlight = 0;
const fns = Array.from({length:50}, (_, i) => {
    return () => new Promise(resolve => {
      ++inFlight;
      maxInFlight = Math.max(inFlight, maxInFlight);
      setTimeout(() => {
        --inFlight;
        resolve(i);
      }, Math.random() * 200 + 100);
    });
});
runPromises(fns, 15).then(results => console.log(maxInFlight, JSON.stringify(results)));
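As for doing this without the Promise constructor, one possible sketch is a small pool of async "workers" that each pull the next unclaimed index in a loop. This is a different technique from the queue above, not a drop-in rewrite of it. `runWithWorkers` is a made-up name, and it is assumed to return the same {result} / {error} shape as runPromises. The demo still uses the Promise constructor for its timer delay; only the runner itself avoids it.

```javascript
// Hypothetical name; assumed to return the same { result } / { error } shape as runPromises.
async function runWithWorkers(arrayOfFunctions, maxLength) {
  const results = new Array(arrayOfFunctions.length);
  let next = 0; // index of the next function nobody has claimed yet
  const worker = async () => {
    while (next < arrayOfFunctions.length) {
      const index = next++; // claimed synchronously, so no two workers share an index
      try {
        results[index] = { result: await arrayOfFunctions[index]() };
      } catch (error) {
        results[index] = { error };
      }
    }
  };
  // never start more workers than there are functions
  const workerCount = Math.min(maxLength, arrayOfFunctions.length);
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}

// demo: 40 tasks, every tenth one rejects, at most 15 in flight
let inFlight = 0;
let maxInFlight = 0;
const tasks = Array.from({ length: 40 }, (_, i) => async () => {
  ++inFlight;
  maxInFlight = Math.max(inFlight, maxInFlight);
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 20 + 5));
  --inFlight;
  if (i % 10 === 9) throw new Error("task " + i + " failed");
  return i;
});

const poolDone = runWithWorkers(tasks, 15);
poolDone.then((results) => {
  const failed = results.filter((entry) => "error" in entry).length;
  console.log("max in flight:", maxInFlight, "failed:", failed);
});
```

Each worker runs one function at a time, so the number of workers is the concurrency limit. An empty input array starts zero workers and resolves with an empty array.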
