Decorators
When looking at decorators, I ask 3 questions:
How long does it take me to make heads or tails of what the decorator is doing?
Does the decorated function still make sense as its original purpose?
Can the decorated function be rewritten without the decorator? If so, how would it look?
In the majority of cases, decorators are overly complicated and not even the right tool for the job, as is most likely the case here. Let's see what can be done differently.
Consumer-Producer Concurrency Pattern
This example is a classic case of the consumer-producer concurrency pattern. The core idea is simple:
Something needs to produce data for others to use.
Something else needs to consume the data that was produced.
Both tasks should happen concurrently.
In most cases, a Queue should be used to hand the data from the producer to the consumer. Let's take a look at a simple example.
import asyncio


async def producer(queue: asyncio.Queue) -> None:
    """
    Produces data and puts it in a queue to be retrieved elsewhere.
    Places `None` to signal the final item.
    """
    for i in range(10):
        await queue.put(i)
        print(f"Produced {i}.")
        await asyncio.sleep(1)
    await queue.put(None)  # Signal finished.


async def consumer(queue: asyncio.Queue) -> None:
    """
    Consumes data from a queue produced elsewhere.
    Stop when receiving `None`.
    """
    while True:
        x = await queue.get()
        if x is None:
            break
        print(f"Consumed {x}.")


async def main() -> None:
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue))


if __name__ == "__main__":
    asyncio.run(main())
This example uses asyncio.Queue as a communication channel between a producer and a consumer. The producer places the data it creates onto the queue, while the consumer waits for an item to arrive and processes it as soon as it does.
Combining asyncio with concurrent.futures
For starters, you generally don't want to create multiple ThreadPoolExecutors, because that defeats the entire point of pooling (which isn't particularly relevant here, as I'll explain). A pool creates a limited number of threads/processes and shares them among many tasks by simply handing each worker a new task when it finishes its current one. This lets you avoid creating one thread/process for every single task, which can be expensive and slow the code down instead of speeding it up. In this case, the issue is that you're not using the same executor for each task, so nothing is shared. Like the queue, each pool should be shared between multiple things. Furthermore, the concrete executors in concurrent.futures (ThreadPoolExecutor and ProcessPoolExecutor) accept a max_workers argument specifying how many threads/processes you want to use.
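To make the pooling point concrete, here is a minimal sketch of one shared executor serving many tasks. The `work` and `run_tasks` functions are hypothetical names invented for this illustration; only `ThreadPoolExecutor`, `max_workers`, and `executor.map` come from the standard library.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def work(task_id: int) -> str:
    """Hypothetical task: sleep briefly, then report which thread ran it."""
    time.sleep(0.01)
    return threading.current_thread().name


def run_tasks(n_tasks: int = 10, max_workers: int = 2) -> set:
    """Run n_tasks on ONE shared pool and return the distinct thread names used."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return set(executor.map(work, range(n_tasks)))


if __name__ == "__main__":
    names = run_tasks()
    print(f"10 tasks ran on {len(names)} thread(s).")
```

However many tasks are submitted, the pool never uses more than `max_workers` threads, which is the sharing that gets lost when every task builds its own executor.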
But wait! queue.put is a coroutine, and asyncio.Queue is not thread-safe, so how can we call it from our thread? This is where asyncio.run_coroutine_threadsafe(coro, loop).result() comes in. It schedules the coroutine on the event loop, which lives in another thread, and returns a concurrent.futures.Future. Calling .result() on that future then blocks the calling thread until the coroutine has finished.
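A small, self-contained sketch of that hand-off may help. It assumes a producer that emits three integers followed by a `None` sentinel; those items and the single-worker pool are choices made for this illustration, not part of the code under review.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None:
    """Runs in a worker thread and hands items to the queue owned by the loop."""
    for i in range(3):
        # Schedule queue.put(i) on the event loop, then block this thread
        # until the loop has actually run it.
        asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # Signal finished.


async def main() -> list:
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    received = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        produced = asyncio.wrap_future(executor.submit(producer, queue, loop))
        while (item := await queue.get()) is not None:
            received.append(item)
        # Await inside the with block so that leaving it never blocks the loop
        # while the thread is still waiting on the loop.
        await produced
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that the thread is awaited through a wrapped future rather than joined directly: blocking the event loop while the thread waits on `.result()` would deadlock, since the loop is the one that has to complete that result.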
In the end, your code should ideally look more like this:
import asyncio
from concurrent.futures import ThreadPoolExecutor


def producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, ...) -> None:
    ...
    asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
    ...


async def send_mail(queue: asyncio.Queue) -> None:
    ...
    item = await queue.get()
    ...


async def mail_logging(queue: asyncio.Queue) -> None:
    ...
    item = await queue.get()
    ...


async def main() -> None:
    sent_mail = asyncio.Queue()
    logged_mail = asyncio.Queue()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        await asyncio.gather(
            asyncio.wrap_future(executor.submit(producer, sent_mail, loop, ...)),
            asyncio.wrap_future(executor.submit(producer, logged_mail, loop, ...)),
            send_mail(sent_mail),
            mail_logging(logged_mail),
        )


if __name__ == "__main__":
    asyncio.run(main())
Edit
Default ThreadPoolExecutor
Rather than using asyncio.wrap_future(executor.submit(...)), you can use loop = asyncio.get_running_loop() and await loop.run_in_executor(executor, func, *args). The advantage of this form is that the executor may be None, which lets the loop use its default ThreadPoolExecutor. On Python 3.9 and later, await asyncio.to_thread(func, *args, **kwargs) does essentially the same thing with the default executor.
This removes the need for your own ThreadPoolExecutor and lets you share the pool with any other coroutines that also use the default ThreadPoolExecutor.
Note: run_in_executor only forwards positional arguments. To pass keyword arguments, wrap the call in functools.partial.
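The three call styles can be compared side by side in a short sketch. The `greet` function is a hypothetical stand-in for whatever blocking work you need to run, and `asyncio.to_thread` assumes Python 3.9 or later.

```python
import asyncio
import functools


def greet(name: str, punctuation: str = ".") -> str:
    """Hypothetical blocking function standing in for real work."""
    return f"Hello, {name}{punctuation}"


async def main() -> list:
    loop = asyncio.get_running_loop()
    # executor=None selects the loop's default ThreadPoolExecutor.
    plain = await loop.run_in_executor(None, greet, "Ada")
    # run_in_executor forwards positional arguments only, so bind keywords first.
    bound = functools.partial(greet, "Ada", punctuation="!")
    with_partial = await loop.run_in_executor(None, bound)
    # asyncio.to_thread accepts keyword arguments directly.
    with_to_thread = await asyncio.to_thread(greet, "Ada", punctuation="?")
    return [plain, with_partial, with_to_thread]


if __name__ == "__main__":
    print(asyncio.run(main()))
```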
Wrapping Queues
Now that an explicit executor is no longer needed, you can easily wrap each queue in its own factory function, such as the following.
def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, ...) -> None:
    ...


async def producer(...) -> asyncio.Queue:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    # Not awaited on purpose: the producer keeps running in the background.
    loop.run_in_executor(None, _producer, queue, loop, ...)
    return queue


async def consumer() -> None:
    # producer is a coroutine function, so it must be awaited to get the queue.
    queue = await producer(...)
    ...
    await queue.get()
    ...
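For reference, here is one way the skeleton above could be filled in so that it runs. The item count `n` and the `None` sentinel are assumptions made for this sketch, carried over from the simple queue example earlier. The real producer's arguments are elided in the original and are not reconstructed here.

```python
import asyncio


def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, n: int) -> None:
    """Blocking producer run in a worker thread; n is a hypothetical item count."""
    for i in range(n):
        asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # Signal finished.


async def producer(n: int) -> asyncio.Queue:
    """Create the queue, start the threaded producer, and hand the queue back."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    # Fire and forget: the thread fills the queue in the background.
    loop.run_in_executor(None, _producer, queue, loop, n)
    return queue


async def consumer() -> list:
    queue = await producer(3)
    items = []
    while (item := await queue.get()) is not None:
        items.append(item)
    return items


if __name__ == "__main__":
    print(asyncio.run(consumer()))
```

One trade-off of this form: because the future returned by `run_in_executor` is discarded, an exception raised inside `_producer` would not reach the consumer. Keeping a reference to that future and awaiting it later is one way to surface such errors.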
Asynchronous Generators: Further Wrapping
Although asyncio.Queue is the right tool for the job, exposing it directly makes the rest of the code more awkward to write. To make it friendlier to use, one can further wrap it using asynchronous generators. Asynchronous generators work the same way as normal generators, using yield, but inside an asynchronous function. To consume them, one can then use the async for ... syntax. Let's see what this looks like.
def _producer(...):
    ...


async def producer(...):
    # Initial setup.
    queue = asyncio.Queue()
    ...
    # The generator.
    while True:
        yield await queue.get()


async def consumer():
    # Consume the generator.
    async for item in producer(...):
        print(item)
This way the usage is straightforward and there is never any queue seen by the user/consumer.
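A runnable version of this idea might look like the sketch below. Unlike the skeleton, which loops forever, it assumes the threaded producer sends a `None` sentinel so the generator can finish. The item count `n` is likewise a placeholder invented for this example.

```python
import asyncio
from typing import AsyncIterator


def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, n: int) -> None:
    """Blocking producer run in a worker thread; n is a hypothetical item count."""
    for i in range(n):
        asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # Signal finished.


async def producer(n: int) -> AsyncIterator[int]:
    # Initial setup: the queue never leaves this function.
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    future = loop.run_in_executor(None, _producer, queue, loop, n)
    # The generator: yield items until the sentinel arrives.
    while (item := await queue.get()) is not None:
        yield item
    await future  # Re-raise anything that went wrong in the worker thread.


async def consumer() -> list:
    # Consume the generator; no queue is visible here.
    return [item async for item in producer(3)]


if __name__ == "__main__":
    print(asyncio.run(consumer()))
```

Awaiting the executor future after the loop is optional, but it means a failure in the thread shows up in the consumer instead of disappearing silently.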