I am using an API client supplied by a vendor (Okta) whose async examples are poor and outdated. For example, this one uses get_event_loop(), which the Python documentation says not to use:

from okta.client import Client as OktaClient
import asyncio

async def main():
    client = OktaClient()
    users, resp, err = await client.list_users()
    while True:
        for user in users:
            print(user.profile.login) # Add more properties here.
        if resp.has_next():
            users, err = await resp.next()
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This works, but I need to go through the returned results and follow various links to get additional information. I created a queue using asyncio and I have the worker loop until the queue is empty. This also works.

I start running into issues when I try to have more than one worker - if the code throws an exception, the workers never return.

async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        log.info("Queue size: %d", queue.qsize())
        api_req = await queue.get()
        log.info('Worker %s is handling %s', name, api_req)

        api_func = getattr(okta_client, f"list_{api_req['endpoint']}")
        api_proc = getattr(sys.modules[__name__], api_req['processor'])
        log.info('Worker %s is handling %s with api_func %s, api_proc %s', name, api_req, api_func, api_proc)
 
        resp_data, resp, err = await api_func(**api_req['params'])
        log.debug(resp_data)
        while True:
            for i in resp_data:
                await api_proc(i, queue)
            if resp.has_next():
                resp_data, err = await resp.next()
            else:
                break

        queue.task_done()

async def create_workers(queue: asyncio.Queue):
    """Reusable worker creation process"""
    log.info('Creating workers')
    workers = []
    async with OktaClient() as okta_client:
        for i in range(NUM_WORKERS):
            log.info('Creating worker-%d', i)
            worker = asyncio.create_task(handle_queue(f'worker-{i}', queue, okta_client))
            workers.append(worker)

        await queue.join()
        for worker in workers:
            worker.cancel()

        await asyncio.gather(*workers, return_exceptions=True)
 
async def main():
    """Load Access Policies and their mappings and rules"""
    queue = asyncio.Queue()
    queue.put_nowait({'endpoint': 'policies', 'params': {'query_params': {'type': 'ACCESS_POLICY'}}, 'processor': 'process_policy'})

    await create_workers(queue)

    metadata['policy_count'] = len(data)
    print(yaml.dump({'_metadata': metadata, 'data': data}))

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Hide the exception for a Ctrl-C
        log.info('Keyboard Interrupt')

If an exception is thrown in handle_queue (or any of the functions it calls), the program hangs. When I hit Ctrl-C, I get the exception along with the asyncio message "Task exception was never retrieved". I understand that the hang happens because queue.join() waits for queue.task_done() to be called as many times as queue.put() was called, but I don't understand why the exception isn't caught.

I tried wrapping the work in handle_queue in a try:

async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        try:
            # REST OF THE FUNCTION
        except Exception as e:
            queue.task_done()
            raise e
        queue.task_done()

This way, the program execution does finish, but the exception still disappears. How can I capture the exception and still allow the program to finish?

2 Answers

2

In the function create_workers you call await queue.join(). Every time an item is put on the queue, a counter that starts at 0 is incremented. Every time queue.task_done() is called, that same counter is decremented. The call await queue.join() blocks until the counter reaches 0. If, for example, you call await queue.join() before any items have been placed on the queue, it returns immediately because the counter is 0. If N items have been placed, the call blocks until N calls to queue.task_done() have been made.
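To make the counter behavior concrete, here is a minimal sketch using only the standard library. The names demo and results are made up for illustration, and the timeouts exist only so that a blocked join() can be observed without hanging the script:

```python
import asyncio


async def demo():
    results = {}
    queue = asyncio.Queue()

    # Nothing has been put on the queue, so join() returns immediately.
    await asyncio.wait_for(queue.join(), timeout=1)
    results["empty_join_returned"] = True

    # Two puts raise the internal counter to 2.
    queue.put_nowait("a")
    queue.put_nowait("b")

    # Only one task_done() follows, so the counter is still 1 and join() blocks.
    await queue.get()
    queue.task_done()
    try:
        await asyncio.wait_for(queue.join(), timeout=0.1)
        results["join_blocked"] = False
    except asyncio.TimeoutError:
        results["join_blocked"] = True

    # The second task_done() brings the counter to 0, so join() returns.
    await queue.get()
    queue.task_done()
    await asyncio.wait_for(queue.join(), timeout=1)
    results["final_join_returned"] = True
    return results


results = asyncio.run(demo())
print(results)
```

Note that get() alone does not change the counter; only task_done() does.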

If your worker function, handle_queue, gets an item from the queue but then raises an exception before calling queue.task_done(), the call to await queue.join() will block forever. Therein lies your problem: you must call queue.task_done() whether you get an exception or not. You should therefore use a try/finally block where the finally block calls queue.task_done():

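async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    ...
    while True:
        # Get the item before entering the try block so that task_done()
        # only runs for an item that was actually retrieved (for example,
        # not when the worker is cancelled while waiting in get()).
        api_req = await queue.get()
        try:
            ...
        finally:
            queue.task_done()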

I wouldn't even re-raise any caught exception, because it is possible that all of your workers will get an exception and terminate, leaving one or more items on the queue, in which case the call to await queue.join() will block forever.
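Putting this together, here is a minimal, self-contained sketch of the pattern. It is not the Okta client: process is a hypothetical stand-in for the API call, and the errors list is just one assumed way of keeping the exceptions so they can be reported after the queue drains:

```python
import asyncio


async def process(item):
    """Hypothetical stand-in for the API call and processing step."""
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")
    await asyncio.sleep(0)
    return item.upper()


async def worker(queue, done, errors):
    while True:
        # Fetch outside the try block: task_done() must only be called
        # for an item that was actually retrieved.
        item = await queue.get()
        try:
            done.append(await process(item))
        except Exception as exc:
            # Record the failure and keep the worker alive for later items.
            errors.append((item, exc))
        finally:
            queue.task_done()


async def run(items, num_workers=2):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    done, errors = [], []
    workers = [
        asyncio.create_task(worker(queue, done, errors))
        for _ in range(num_workers)
    ]
    await queue.join()  # returns because every item gets marked done
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done, errors


done, errors = asyncio.run(run(["a", "bad", "b"]))
print(sorted(done), [(item, repr(exc)) for item, exc in errors])
```

Because the failing item is still marked done, queue.join() returns, and the caller can decide afterwards whether a non-empty errors list should be logged or turned into a non-zero exit status.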

1

For printing the error, Python's traceback module is especially helpful. Add import traceback to your imports and then use it like so:

async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        try:
            # REST OF THE FUNCTION
        except Exception as e:
            print(repr(e))
            print(traceback.format_exc())
        queue.task_done()

This will go to stdout instead of stderr, but should show like a standard python stack trace. If you really want it to go to stderr (e.g. for logging purposes), you can replace that print with print(traceback.format_exc(), file=sys.stderr) as well as importing sys.
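Since the question already uses a logger, another option is the standard logging module's exception() method, which logs at ERROR level and appends the current traceback. The sketch below is only an illustration: the logger name, risky, and run_and_log are hypothetical, and the handler writes to an in-memory stream so the result can be inspected:

```python
import io
import logging

log = logging.getLogger("worker-demo")  # hypothetical logger name


def risky():
    """Hypothetical function that fails."""
    raise RuntimeError("boom")


def run_and_log(stream):
    handler = logging.StreamHandler(stream)
    log.addHandler(handler)
    try:
        risky()
    except Exception:
        # Logs the message at ERROR level and appends the current traceback.
        log.exception("Worker failed")
    finally:
        log.removeHandler(handler)
    return stream.getvalue()


output = run_and_log(io.StringIO())
print(output)
```

In a real program the handler would normally be configured once at startup; a StreamHandler created without a stream argument writes to stderr.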

It is worth noting that running asyncio.gather with return_exceptions=True will not raise the exception (this may be your intended behavior). If you want the program to keep running after an exception occurs outside of the try block, it should likely stay that way, but note that the exception will then be returned as a result and should be handled there. In general, asyncio handles errors differently, and they may go unnoticed if you do not explicitly handle them.
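As a small illustration of that behavior, the sketch below (ok, fails, and collect are hypothetical names) shows that the exception comes back as an ordinary list entry and has to be checked for explicitly:

```python
import asyncio


async def ok():
    return "fine"


async def fails():
    raise RuntimeError("worker crashed")


async def collect():
    # With return_exceptions=True, exceptions come back as ordinary results
    # in the same order as the awaitables; nothing is raised here.
    results = await asyncio.gather(ok(), fails(), return_exceptions=True)
    failures = [r for r in results if isinstance(r, BaseException)]
    return results, failures


results, failures = asyncio.run(collect())
for exc in failures:
    print(f"task failed: {exc!r}")
```

Checking against BaseException rather than Exception also picks up asyncio.CancelledError, which is what cancelled workers return from gather.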

1
  • 1
    catch instead of except was a transcription error (I typed it instead of copy/paste) - I corrected the question
    – yakatz
    Commented Apr 16 at 4:23
