
I'm trying to convert my synchronous code to asynchronous code.
This is my first time doing it this way, so I don't know if this is the right approach.

import asyncio
from typing import Callable

from kafka import KafkaConsumer  # kafka-python

ASYNC_CONTAINER = []


def async_kafka_listener(*topics, **configs):
    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor()

    def get_message(fn: Callable):
        consumer = KafkaConsumer(*topics, **configs)

        while True:
            for message in consumer:
                asyncio.run(fn(message))

    def decorator(fn: Callable):
        async def wrapper():
            future = pool.submit(get_message, fn)
            async_future = asyncio.wrap_future(future)

            return async_future

        ASYNC_CONTAINER.append(wrapper())
        return wrapper

    return decorator


@async_kafka_listener(
    "mail",
    bootstrap_servers=["localhost:9092"],
    group_id="send-mail",
)
async def send_mail(message):
    print(f"send_mail : {message}")


@async_kafka_listener(
    "mail",
    bootstrap_servers=["localhost:9092"],
    group_id="mail-logging",
)
async def mail_logging(message):
    print(f"mail_logging : {message}")


async def main():
    await asyncio.gather(*ASYNC_CONTAINER)


asyncio.run(main())

1 Answer

Decorators

When looking at decorators, I ask 3 questions:

  • How long does it take me to make heads or tails of what the decorator is doing?

  • Does the decorated function still make sense as its original purpose?

  • Can the decorated function be rewritten without the decorator? If so, how would it look?
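As an illustration of the third question: decorator syntax is only shorthand. Writing @deco(args) above def f is the same as writing f = deco(args)(f) after it, so @async_kafka_listener("mail", ...) on send_mail amounts to send_mail = async_kafka_listener("mail", ...)(send_mail). The minimal sketch below shows both spellings side by side; register and REGISTRY are hypothetical stand-ins for async_kafka_listener and ASYNC_CONTAINER and do not talk to Kafka.

```python
import asyncio

REGISTRY = []  # hypothetical: plays the role of ASYNC_CONTAINER


def register(topic):
    """Hypothetical decorator factory: records (topic, fn) and returns fn unchanged."""
    def decorator(fn):
        REGISTRY.append((topic, fn))
        return fn
    return decorator


# Decorator syntax...
@register("mail")
async def send_mail(message):
    return f"send_mail : {message}"


# ...is equivalent to calling the factory and rebinding the name by hand.
async def mail_logging(message):
    return f"mail_logging : {message}"


mail_logging = register("mail")(mail_logging)


async def main():
    # Run every registered handler on the same message, in registration order.
    return await asyncio.gather(*(fn("hello") for _, fn in REGISTRY))


if __name__ == "__main__":
    print(asyncio.run(main()))
```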

In the majority of cases, decorators are overly complicated and not even the right tool for the job, as is most likely the case here. Let's see what can be done differently.

Consumer-Producer Concurrency Pattern

This example is a classic case of the consumer-producer concurrency pattern. The core idea is simple:

  • Something needs to produce data for others to use.

  • Something else needs to consume the data that was produced.

  • Both tasks should happen concurrently.

In most cases, a Queue should be used to hand the data from one side to the other. Let's take a look at a simple example.

import asyncio

async def producer(queue: asyncio.Queue) -> None:
    """
    Produces data and puts it in a queue to be retrieved elsewhere.
    Places `None` to signal the final item.
    """
    for i in range(10):
        await queue.put(i)
        print(f"Produced {i}.")
        await asyncio.sleep(1)
    await queue.put(None)  # Signal finished.

async def consumer(queue: asyncio.Queue) -> None:
    """
    Consumes data from a queue produced elsewhere.
    Stop when receiving `None`.
    """
    while True:
        x = await queue.get()
        if x is None:
            break
        print(f"Consumed {x}.")

async def main() -> None:
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue))

if __name__ == "__main__":
    asyncio.run(main())

This example uses asyncio.Queue as a communication channel between a producer and a consumer. The producer places the data it creates onto the queue, while the consumer waits for items to arrive and processes each one as it is received.

Combining asyncio with concurrent.futures

For starters, you generally don't want to create multiple ThreadPoolExecutors; doing so defeats the entire point of pooling (which isn't particularly relevant here, as I'll explain). A pool creates a set of threads/processes and shares them among many tasks, simply handing each new task to whichever thread/process becomes free. This avoids creating one thread/process for every single task, which can be expensive and can slow code down instead of speeding it up. In your code, async_kafka_listener creates a new ThreadPoolExecutor every time it is called, so each decorated function gets its own pool and nothing is shared. Like the queue, a pool should be shared between multiple users. Furthermore, ThreadPoolExecutor (and ProcessPoolExecutor) accepts a max_workers argument specifying how many threads/processes you want to use.
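A minimal sketch of sharing a single pool, assuming a hypothetical blocking_job in place of real blocking work such as polling Kafka: one ThreadPoolExecutor with max_workers=2 serves four jobs, and asyncio.wrap_future turns each concurrent.futures.Future into something the event loop can await.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(n: int) -> tuple[int, str]:
    """Hypothetical blocking work; reports which pool thread ran it."""
    time.sleep(0.1)
    return n, threading.current_thread().name


async def main() -> list[tuple[int, str]]:
    # One executor shared by every task: at most 2 threads serve all 4 jobs.
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="shared") as executor:
        futures = [asyncio.wrap_future(executor.submit(blocking_job, n)) for n in range(4)]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    for n, thread_name in asyncio.run(main()):
        print(n, thread_name)
```

Because the pool only has two threads, the four jobs reuse them instead of each getting a thread of its own.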

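But wait! Since iterating over a KafkaConsumer blocks, the producer has to run in a worker thread, yet queue.put is a coroutine (and asyncio.Queue is not thread-safe), so how can we call it from that thread? This is where asyncio.run_coroutine_threadsafe(coro, loop).result() comes in. run_coroutine_threadsafe schedules the coroutine on the event loop, which lives in another thread, and .result() makes the worker thread wait until that coroutine has finished before continuing.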
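A self-contained sketch of that hand-off, assuming a short hypothetical producer in place of the Kafka loop: the worker thread never touches the queue directly and instead schedules queue.put on the loop. None is used as an end-of-stream marker, as in the earlier example. The main coroutine awaits the producer's future rather than blocking on it; blocking the loop there could deadlock, because the loop still has to finish the last put before the worker's .result() can return.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None:
    """Runs in a worker thread; hands every item to the event loop's thread."""
    for i in range(3):
        # Schedule queue.put(i) on the loop and block this thread until it completes.
        asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # signal finished


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        producer_done = asyncio.wrap_future(executor.submit(blocking_producer, queue, loop))
        received = []
        while (item := await queue.get()) is not None:
            received.append(item)
        # Await (do not block on) the producer so the loop can complete its last put.
        await producer_done
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```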

In the end, your code should ideally look more like this:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, ...) -> None:
    ...
    asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
    ...

async def send_mail(queue: asyncio.Queue) -> None:
    ...
    item = await queue.get()
    ...

async def mail_logging(queue: asyncio.Queue) -> None:
    ...
    item = await queue.get()
    ...

async def main() -> None:
    sent_mail = asyncio.Queue()
    logged_mail = asyncio.Queue()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        await asyncio.gather(
            asyncio.wrap_future(executor.submit(producer, sent_mail, loop, ...)),
            asyncio.wrap_future(executor.submit(producer, logged_mail, loop, ...)),
            send_mail(sent_mail),
            mail_logging(logged_mail),
        )

if __name__ == "__main__":
    asyncio.run(main())
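For experimenting without a broker, here is a runnable sketch of the same structure. The Kafka-specific parts are replaced by hypothetical stand-ins: fake_consumer plays the role of iterating over a KafkaConsumer, and the limit parameter plus the None sentinel exist only so the demo terminates (a real consumer would loop forever). Each producer gets its own copy of the messages, much like the two consumer groups (send-mail and mail-logging) reading the same topic.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator


def fake_consumer(topic: str, limit: int) -> Iterator[str]:
    """Hypothetical stand-in for iterating over KafkaConsumer(topic, ...)."""
    for i in range(limit):
        yield f"{topic}-{i}"


def producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, topic: str, limit: int) -> None:
    # Blocking loop: runs in a worker thread and hands each message to the event loop.
    for message in fake_consumer(topic, limit):
        asyncio.run_coroutine_threadsafe(queue.put(message), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # demo-only end signal


async def send_mail(queue: asyncio.Queue, out: list) -> None:
    while (item := await queue.get()) is not None:
        out.append(f"send_mail : {item}")


async def mail_logging(queue: asyncio.Queue, out: list) -> None:
    while (item := await queue.get()) is not None:
        out.append(f"mail_logging : {item}")


async def main(limit: int = 3) -> list:
    sent_mail: asyncio.Queue = asyncio.Queue()
    logged_mail: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    out: list = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        await asyncio.gather(
            asyncio.wrap_future(executor.submit(producer, sent_mail, loop, "mail", limit)),
            asyncio.wrap_future(executor.submit(producer, logged_mail, loop, "mail", limit)),
            send_mail(sent_mail, out),
            mail_logging(logged_mail, out),
        )
    return out


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```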

Edit

Default ThreadPoolExecutor

Rather than using asyncio.wrap_future(executor.submit(...)), you can get the loop with loop = asyncio.get_running_loop() and use await loop.run_in_executor(executor, func, *args). The advantage of this form is that executor may be None, which lets the loop use its default ThreadPoolExecutor. On Python 3.9 and newer, await asyncio.to_thread(func, *args, **kwargs) does roughly the same thing (it runs func in the default executor and also propagates contextvars).

This removes the need to create your own ThreadPoolExecutor and lets you share the pool with any other code that also uses the default ThreadPoolExecutor.
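A small sketch comparing the two spellings, with a hypothetical blocking_read standing in for any blocking call. Both lines run it on the loop's default ThreadPoolExecutor; asyncio.to_thread needs Python 3.9 or newer.

```python
import asyncio
import threading
import time


def blocking_read(label: str) -> str:
    """Hypothetical blocking call (e.g. waiting on a socket)."""
    time.sleep(0.05)
    return f"{label} on {threading.current_thread().name}"


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # executor=None -> the loop's default ThreadPoolExecutor.
    a = await loop.run_in_executor(None, blocking_read, "run_in_executor")
    # Python 3.9+: roughly the same thing, and it also copies contextvars.
    b = await asyncio.to_thread(blocking_read, "to_thread")
    return a, b


if __name__ == "__main__":
    print(*asyncio.run(main()), sep="\n")
```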

Note: run_in_executor only forwards positional arguments; to pass keyword arguments, wrap the function with functools.partial.
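A minimal sketch of the functools.partial trick, using a hypothetical send function with keyword-only arguments. The keywords are bound up front, and only the remaining positional argument goes through run_in_executor. (asyncio.to_thread, by contrast, forwards **kwargs itself.)

```python
import asyncio
import functools


def send(recipient: str, *, subject: str, urgent: bool = False) -> str:
    """Hypothetical blocking function that takes keyword-only arguments."""
    return f"to={recipient} subject={subject} urgent={urgent}"


async def main() -> str:
    loop = asyncio.get_running_loop()
    # Bind the keyword arguments first; run_in_executor only passes *args.
    call = functools.partial(send, subject="Hello", urgent=True)
    return await loop.run_in_executor(None, call, "alice@example.com")


if __name__ == "__main__":
    print(asyncio.run(main()))  # to=alice@example.com subject=Hello urgent=True
```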

Wrapping Queues

Now that your own executor is no longer needed, you can easily wrap each queue in its own factory function, such as the following.

def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, ...) -> None:
    ...

async def producer(...) -> asyncio.Queue:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    loop.run_in_executor(None, _producer, queue, loop, ...)
    return queue

async def consumer() -> None:
    queue = await producer(...)
    ...
    await queue.get()
    ...
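A runnable sketch of the factory idea, assuming a hypothetical messages iterable in place of the Kafka consumer and a None sentinel so the demo ends. Since producer is a coroutine function, the consumer has to await it to get the queue. The future returned by run_in_executor is ignored here, as in the sketch above, which means an exception inside _producer would go unnoticed; real code would probably want to keep it and eventually await it.

```python
import asyncio
from typing import Iterable


def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, messages: Iterable[str]) -> None:
    # Runs in the default executor; `messages` is a hypothetical stand-in for a KafkaConsumer.
    for message in messages:
        asyncio.run_coroutine_threadsafe(queue.put(message), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # end-of-stream sentinel


async def producer(messages: Iterable[str]) -> asyncio.Queue:
    """Factory: starts the blocking producer in the background and returns its queue."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    loop.run_in_executor(None, _producer, queue, loop, messages)
    return queue


async def consumer() -> list[str]:
    queue = await producer(["a", "b", "c"])  # producer() is a coroutine, so await it
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    return received


if __name__ == "__main__":
    print(asyncio.run(consumer()))
```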

Asynchronous Generators: Further Wrapping

Although asyncio.Queue is the right tool, it is a little clumsy to use throughout the rest of the code. To make it friendlier, you can wrap it further in an asynchronous generator. Asynchronous generators work the same way as normal generators, using yield, but inside an async def function, and they are consumed with the async for ... syntax. Let's see what this looks like.

def _producer(...):
    ...

async def producer(...):
    # Initial setup.
    queue = asyncio.Queue()
    ...
    # The generator.
    while True:
        yield await queue.get()

async def consumer():
    # Consume the generator.
    async for item in producer(...):
        print(item)

This way the usage is straightforward, and the user/consumer never sees a queue at all.
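Here is a runnable sketch of the generator version. It assumes a hypothetical messages iterable in place of Kafka, and adds a None sentinel (not present in the original while True loop) so the generator can finish. After the sentinel, the generator awaits the executor future so that an exception in the worker thread would surface in the consumer.

```python
import asyncio
from typing import AsyncIterator, Iterable


def _producer(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, messages: Iterable[str]) -> None:
    # Blocking side, run in the default executor; `messages` is hypothetical.
    for message in messages:
        asyncio.run_coroutine_threadsafe(queue.put(message), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # demo-only sentinel


async def producer(messages: Iterable[str]) -> AsyncIterator[str]:
    # Initial setup: start the blocking producer in the default executor.
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = loop.run_in_executor(None, _producer, queue, loop, messages)
    # The generator: yield items until the sentinel arrives.
    while (item := await queue.get()) is not None:
        yield item
    await done  # re-raises any exception from the worker thread


async def consumer() -> list[str]:
    received = []
    async for item in producer(["a", "b", "c"]):
        received.append(item)
    return received


if __name__ == "__main__":
    print(asyncio.run(consumer()))
```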

  • First of all, thank you for the review. The reason I used a decorator is that I built it to act as a kind of router or controller. Commented Nov 10, 2022 at 5:22
  • Depending on the topic and group_id, we had the inconvenience of creating a consumer every time, so we created a high-level decorator so that the people processing the data don't have to know how the data is sent and received. Commented Nov 10, 2022 at 5:23
  • Without the decorator, I expect it would look like this: pastebin.com/uU1K5fLc Commented Nov 10, 2022 at 5:25
  • Another question: thank you for pointing out the ThreadPoolExecutor part. What do you think about using threads and async at the same time? Threads already run concurrently when they block on I/O, so async isn't strictly necessary, but I wonder what the pros and cons of using both together are. Commented Nov 10, 2022 at 5:29
  • @gongul I updated the answer to show how to further wrap the queues so they're friendlier for whoever uses them. Other than that, the only thing "there was an inconvenience of having to create a consumer every time" suggests to me is that, with a decorator, you need to write a whole function for each case, whereas with my suggestion you can write async for item in producer(): directly in the middle of other code. Commented Nov 10, 2022 at 21:59
