I am using an API client supplied by a vendor (Okta) that has very poor/old examples of running with async - for example (the Python documentation says not to use get_event_loop()
):
from okta.client import Client as OktaClient
import asyncio
async def main():
client = OktaClient()
users, resp, err = await client.list_users()
while True:
for user in users:
print(user.profile.login) # Add more properties here.
if resp.has_next():
users, err = await resp.next()
else:
break
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
This works, but I need to go through the returned results and follow various links to get additional information. I created a queue using asyncio
and I have the worker loop until the queue is empty. This also works.
I start running into issues when I try to have more than one worker - if the code throws an exception, the workers never return.
async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
"""Handle queued API requests"""
while True:
log.info("Queue size: %d", queue.qsize())
api_req = await queue.get()
log.info('Worker %s is handling %s', name, api_req)
api_func = getattr(okta_client, f"list_{api_req['endpoint']}")
api_procs = getattr(sys.modules[__name__], api_req['processor'])
log.info('Worker %s is handling %s with api_func %s, api_proc %s', name, api_req, api_func, api_proc)
resp_data, resp, err = await api_func(**api_req['params'])
log.debug(resp_data)
while True:
for i in resp_data:
await api_proc(i, queue)
if resp.has_next():
resp_data, err = await resp.next()
else:
break
queue.task_done()
async def create_workers(queue: asyncio.Queue):
"""Reusable worker creation process"""
log.info('Creating workers')
workers = []
async with OktaClient() as okta_client:
for i in range(NUM_WORKERS):
log.info('Creating worker-%d', i)
worker = asyncio.create_task(handle_queue(f'worker-{i}', queue, okta_client))
workers.append(worker)
await queue.join()
for worker in workers:
worker.cancel()
await asyncio.gather(*workers, return_exceptions=True)
async def main():
"""Load Access Policies and their mappings and rules"""
queue = asyncio.Queue()
queue.put_nowait({'endpoint': 'policies', 'params': {'query_params': {'type': 'ACCESS_POLICY'}}, 'processor': 'process_policy'})
await create_workers(queue)
metadata['policy_count'] = len(data)
print(yaml.dump({'_metadata': metadata, 'data': data}))
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
# Hide the exception for a Ctrl-C
log.info('Keyboard Interrupt')
If an exception is thrown in handle_queue
(or any of the functions it calls), the program hangs. When I hit Ctrl-C, I get the exception along with a message asyncio task exception was never retrieved
. I understand this is because queue.join()
is waiting for queue.task_done()
to be called as many times as queue.put()
was called, but I don't understand why the exception isn't caught.
I tried wrapping the work in handle_queue
in a try
:
async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
"""Handle queued API requests"""
while True:
try:
# REST OF THE FUNCTION
except Exception as e:
queue.task_done()
raise e
queue.task_done()
This way, the program execution does finish, but the exception still disappears. How can I capture the exception and still allow the program to finish?