
I am new to Python and coroutines, and I am trying to use Python's asyncio library to process a blocking function in parallel. I am using Python 3.8.6. I have a blocking function that takes a different input from an array `inputs`, and I need the calls for all inputs to run in parallel.

For example, if this is the time taken for each function to complete:

blocking_function(5) - takes 5 seconds
blocking_function(3) - takes 3 seconds
blocking_function(2) - takes 2 seconds
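
For illustration, blocking_function can be assumed to look something like this (time.sleep just stands in for the real blocking work; it is declared async def, which is why asyncio.create_task below accepts it):

import time

async def blocking_function(seconds):
    time.sleep(seconds)  # blocking call - it does not yield to the event loop
    return seconds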

I have tried this but they still seem to run sequentially:

inputs = [5, 3, 2]
async def main():
    tasks = [asyncio.create_task(blocking_function(input)) for input in inputs]
    result = await asyncio.gather(*tasks)
    print(result)

There are no errors running this, but it executes the functions sequentially rather than in parallel, so the run takes about 10 seconds (5 + 3 + 2). How do I get them to execute in parallel, so that everything is done in 5 seconds?

  • Can you please state more clearly what exactly your problem is? Are there any errors, and if there are none, please describe precisely what you expected. Also, the current code provided isn't enough for others to diagnose the problem. Please see minimal reproducible example.
    – Luke L
    Commented Aug 8, 2024 at 18:55
  • "to parallel process a blocking function" Concurrency and parallelism are two different things. Concurrency can be achieved with only one thread. asyncio.create_task does not seem to do things in parallel, based on the documentation. Commented Aug 8, 2024 at 19:24
  • I have updated the question to make it easier to understand. Hope this helps @LukeL Commented Aug 8, 2024 at 21:04
  • Oh I see, would you happen to know how I can achieve parallelism in my case? @JérômeRichard Commented Aug 8, 2024 at 21:05
  • I would use joblib, but the solution of jsbueno seems interesting. Commented Aug 8, 2024 at 21:18

1 Answer


Any blocking function has to be wrapped in something which makes sense as an awaitable in an asynchronous environment.

The straightforward way to do that is the loop.run_in_executor call, which will run your blocking code in a separate thread (or even in a separate process) -

import asyncio

...

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor already returns an awaitable Future - wrapping it in
    # asyncio.create_task would raise "TypeError: a coroutine was expected":
    tasks = [loop.run_in_executor(None, blocking_function, input) for input in inputs]
    result = await asyncio.gather(*tasks)
    print(result)

Check the docs at https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor - instead of passing None as the first argument, which selects the default executor, you can optionally instantiate your own executor from concurrent.futures.
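
For reference, a minimal, self-contained version of this pattern might look like the sketch below - blocking_function is simulated here with time.sleep, since its real code was not posted. With the default thread pool the three calls overlap and the whole run finishes in about 5 seconds; for CPU-bound work, a concurrent.futures.ProcessPoolExecutor can be passed instead of None to get true parallelism across cores:

import asyncio
import time

inputs = [5, 3, 2]

def blocking_function(seconds):
    # plain synchronous function - a stand-in for the real blocking work
    time.sleep(seconds)
    return seconds

async def main():
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor; passing a
    # concurrent.futures.ProcessPoolExecutor() gives true parallelism
    tasks = [loop.run_in_executor(None, blocking_function, n) for n in inputs]
    result = await asyncio.gather(*tasks)
    print(result)  # [5, 3, 2], after roughly 5 seconds

asyncio.run(main())

(On Python 3.9+ the same thing can be spelled asyncio.to_thread(blocking_function, n), as suggested in the comments below, but that helper does not exist on the 3.8.6 used in the question.)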


exchanging data between asyncio and off-thread workers

In the comments below we see that, for the task at hand, the blocking function is not a simple "leaf": it has to exchange data which should be fetched in the asyncio-loop thread.

One way to exchange data is to use queues - reading an asyncio.Queue inside an asyncio task can be non-blocking (the threaded side, on the other hand, has to resort to a blocking get, or to polling its queue periodically).

Since putting into an unbounded queue never blocks, an asyncio.Queue instance can carry requests from the blocking function to an asyncio task - which can then perform an I/O operation and put the response back in a queue.Queue instance, which is read in the worker thread. (Note that asyncio queues are not thread-safe, so in the sketch below the put is scheduled onto the loop with loop.call_soon_threadsafe.)

So, say:

import queue, asyncio
from functools import partial

...
def blocking_task(*args, incoming_queue, outgoing_queue, loop):
    # code performing task setup with args

    for step in multi_steps:
        # step setup code
        ...
        # maybe repeat these 2 lines more than once,
        # so that external data can be pre-fetched:
        fetch_data_from = <external_data_source>
        # asyncio.Queue is not thread-safe, so the put is scheduled
        # onto the event-loop thread rather than called directly:
        loop.call_soon_threadsafe(
            outgoing_queue.put_nowait, (incoming_queue, fetch_data_from)
        )

        # this will block the worker thread until data is available:
        work_data = incoming_queue.get()
        # CPU intensive, blocking code with work_data
        ...

async def data_fetcher(incoming_queue):
    running_tasks = set()
    while True:
        outgoing_queue, fetch_data_from = await incoming_queue.get()
        task = asyncio.create_task(my_data_retriever_async_function(fetch_data_from))
        # a default argument eager-binds the current value of
        # outgoing_queue to the callback for this task:
        task.add_done_callback(
            lambda task, queue=outgoing_queue:
                queue.put_nowait(task.result())
        )
        running_tasks.add(task)
        # drop references to completed data-fetching tasks:
        _done, running_tasks = await asyncio.wait(running_tasks, timeout=0)
  

# for illustrative purposes - this function might not even
# be needed: if no error handling or pre- or post-
# processing is needed at this step, just
# call the external lib directly from `data_fetcher`
async def my_data_retriever_async_function(source):
    return await whatever_io_lib.fetch(source)


async def main():

    request_queue = asyncio.Queue()
    data_fetcher_task = asyncio.create_task(data_fetcher(request_queue))

    loop = asyncio.get_running_loop()

    tasks = set()
    for input in inputs:
        # one thread-queue per worker, so replies can be routed back to it:
        reply_queue = queue.Queue()
        # we need partial to send named arguments:
        worker = partial(
            blocking_task,
            input,
            outgoing_queue=request_queue,
            incoming_queue=reply_queue,
            loop=loop,
        )
        tasks.add(loop.run_in_executor(None, worker))
    result = await asyncio.gather(*tasks)
    data_fetcher_task.cancel()
    print(result)

And this is actually a peculiar arrangement, only needed if the "blocking_function" requires asynchronously obtained I/O data in the middle of its processing. A simpler design would be to move the for step in multi_steps: loop into an async function, and split the pre- and post-data-fetching parts into separate synchronous functions, each called with run_in_executor - see the sketch below.
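
A hypothetical sketch of that simpler layout (prepare_step, crunch_step and whatever_io_lib.fetch are made-up names standing in for your own setup code, CPU-bound code and I/O library):

async def process_one(input, loop):
    state = input
    for step in multi_steps:
        # synchronous per-step setup, run off-thread in case it is heavy:
        source = await loop.run_in_executor(None, prepare_step, state, step)
        # the I/O happens here, in the event-loop thread, the usual async way:
        work_data = await whatever_io_lib.fetch(source)
        # the CPU-intensive part goes back to the executor:
        state = await loop.run_in_executor(None, crunch_step, state, work_data)
    return state

async def main():
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(*(process_one(n, loop) for n in inputs))
    print(results)

This keeps each worker's steps strictly sequential (matching the comment below about the loop having to run in order), while different inputs still overlap with each other.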

I realize this sounds complex - either these ideas have gotten across up to this point, which I hope, or you will probably need some paid consulting work - sorry, these answers can't meaningfully cover all possible alternatives.

  • hmm, I tried that but I am getting this error: TypeError: a coroutine was expected, got <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/futures.py:360]> Commented Aug 8, 2024 at 21:54
  • Ah, sorry - as you didn't post the code for your blocking_function, I could not make the modifications in its declaration (and eventual preparation) so that this pattern can work. The argument to run_in_executor has to be a plain synchronous Python function.
    – jsbueno
    Commented Aug 8, 2024 at 22:24
  • in other words: it should not be declared with async def, nor otherwise wrapped in a Task or Future. That may require some code reorganization, so that all I/O operations take place in async functions called in the usual way, and the data needed by the blocking functions is passed in as arguments. If the blocking code is intertwined with I/O code and is hard to separate, you may need more sophisticated strategies, such as the use of queues and calling back coroutines from the off-thread worker.
    – jsbueno
    Commented Aug 8, 2024 at 22:32
  • Why not use asyncio.to_thread? Commented Aug 9, 2024 at 20:40
  • Hmm, the issue is that inside the blocking function there is a loop that needs to run sequentially, so I can't really modify anything within blocking_function. So you think this would require a more complicated strategy? Would you happen to know where I can read more about these strategies? @jsbueno Commented Aug 9, 2024 at 23:02
