
My code makes two API GET requests and compares the responses. I'm iterating over a dictionary in a for loop to determine where to point each GET request. Sending the GET, waiting for the reply, performing the math, then moving to the next item in the dictionary takes a long time because of the waiting for each reply.

There are numerous conversations suggesting asyncio as a solution to this, but the code chunks I see in answers don't seem to help (or I can't figure out how to apply them). See here, here, and here for excellent conversations on asyncio. I'm fairly certain that if I batch the requests it will speed up iteration significantly, i.e. send 20 requests (arbitrary number), get 20 responses, compare the outputs, repeat.

Here is my non-async code, modified a bit for readability.

value_ops = {'name1':['text','ticker1','ticker2','ticker3'],
'name2':['text','ticker1','ticker2','ticker3'],
...
}

# These are the two API GET requests
def pm_check(od_pm,pSide):
    #[code chunk here] output is JSON that I'm grabbing two values from


def ks_check(od_ks,kSide,direction):
    #[code chunk here] output is JSON that I'm grabbing two values from

def opp_check(tradename,ticker1,ticker2,ticker3):
    pm_check(ticker1,'asks')
    ks_check(ticker2,'yes','buy')
    #comparison math code chunk omitted

for value in value_ops.values():
    opp_check(*value)

So the code that's working iterates over the dictionary to grab the values, feeds them into the API call functions, does some calculation, and then moves on to the next value to repeat.

I think the end point would be to send all the API requests at the same time, store the responses in a table, and then do the calculations on that entire table, i.e. in batches of 20 or so.
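
To make that target shape concrete, here is a minimal sketch of what I mean, where fetch_pair is a hypothetical stand-in for the two GET requests per trade:

```python
import asyncio

async def fetch_pair(tradename):
    # Hypothetical stand-in for the two GET requests per trade;
    # the sleep simulates waiting on the network
    await asyncio.sleep(0.01)
    return (tradename, 1.0, 2.0)

async def gather_table(names):
    # Fire all requests concurrently and collect the replies into one "table"
    return await asyncio.gather(*(fetch_pair(n) for n in names))

table = asyncio.run(gather_table(["name1", "name2", "name3"]))
print(table)
```

The comparison math would then run over `table` after all the replies are in.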

What I was trying as a starting point was:

value_ops = {'name1':['tradename','ticker1','ticker2','ticker3'],
'name2':['tradename','ticker1','ticker2','ticker3'],
...
}

# These are the two API GET requests
async def pm_check(od_pm,pSide):
    #[code chunk here] output is JSON that I'm grabbing two values from


async def ks_check(od_ks,kSide,direction):
    #[code chunk here] output is JSON that I'm grabbing two values from

async def opp_check(tradename,ticker1,ticker2,ticker3):
    pm_check(ticker1,'asks')
    ks_check(ticker2,'yes','buy')
    #comparison math code chunk omitted

import asyncio
async def process_all():
    tasks = []
    async for value in value_ops.values():
        task = asyncio.create_task(opp_check(*value))
        tasks.append(task)
    await asyncio.gather(*tasks)
    
asyncio.run(process_all())

I was hoping that this would let me proof-of-concept async iteration on my loop. It throws a RuntimeError: asyncio.run() cannot be called from a running event loop. I suspect that even if I get past this error, the result won't actually be what I'm looking for.

Any feedback on how to speed this up is appreciated. I had also tried multiprocessing, but that did nothing to speed things up (which I think makes sense: this isn't a CPU issue, it's the downtime while waiting for the GET reply).

EDIT: I'm using an Anaconda environment, if relevant.

  • Does this help with your question? You might also need to specify your running environment, e.g. whether it's a standalone script or an IPython/Jupyter environment.
    – RibomBalt
    Commented Nov 30, 2024 at 2:31
  • Good callout - I'm using conda. Commented Dec 1, 2024 at 16:58

2 Answers


You say, "It's throwing a runtimeerror [asyncio.run() cannot be called from a running event loop]" but provide no details as to where it is happening (a stack trace and a complete minimal reproducible example would have been helpful). You can test for an already-running event loop as follows:

async def foo():
    ...

# This could raise an exception if there is already a running event loop:
#asyncio.run(foo())
# So try instead:
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No running loop
    loop = asyncio.new_event_loop()  # Create a new loop
    asyncio.set_event_loop(loop)

loop.run_until_complete(foo())
loop.close()

Now for the code you did post:

First, your process_all can be simplified a bit because asyncio.gather will create the necessary tasks if you pass it coroutines. But the real problem is with function opp_check. Currently you have expressions such as pm_check(ticker1,'asks') and ks_check(ticker2,'yes','buy'), which merely create coroutine objects. To actually run these, you need to await them. Moreover, you would like to run them concurrently, so asyncio.gather is appropriate here, too:

import asyncio
import time

value_ops = {
    'name1': ['tradename','ticker1','ticker2','ticker3'],
    'name2': ['tradename','ticker1','ticker2','ticker3'],
    'name3': ['tradename','ticker1','ticker2','ticker3'],
    'name4': ['tradename','ticker1','ticker2','ticker3'],
    'name5': ['tradename','ticker1','ticker2','ticker3'],
}

# These are the two API GET requests
async def pm_check(od_pm, pSide):
    print('pm_check started at', time.time())
    await asyncio.sleep(1)  # simulate network activity
    print('pm_check ended at time', time.time())
    # Return JSON
    return "abc"

async def ks_check(od_ks, kSide, direction):
    print('ks_check started at', time.time())
    await asyncio.sleep(1)  # simulate network activity
    print('ks_check ended at time', time.time())
    # Return JSON
    return "abc"

async def opp_check(tradename, ticker1, ticker2, ticker3):
    result1, result2 = await asyncio.gather(
        pm_check(ticker1, 'asks'),
        ks_check(ticker2, 'yes', 'buy')
    )

    ... # Compare results

async def process_all():
    await asyncio.gather(
        *(opp_check(*value) for value in value_ops.values())
    )

asyncio.run(process_all())

Prints:

pm_check started at 1732968318.333226
ks_check started at 1732968318.333226
pm_check started at 1732968318.333226
ks_check started at 1732968318.338283
pm_check started at 1732968318.338283
ks_check started at 1732968318.338283
pm_check started at 1732968318.338283
ks_check started at 1732968318.338283
pm_check started at 1732968318.338283
ks_check started at 1732968318.338283
pm_check ended at time 1732968319.3388453
ks_check ended at time 1732968319.3388453
pm_check ended at time 1732968319.3433995
pm_check ended at time 1732968319.3485363
pm_check ended at time 1732968319.3535638
ks_check ended at time 1732968319.3535638
ks_check ended at time 1732968319.3587465
ks_check ended at time 1732968319.3685863
pm_check ended at time 1732968319.3685863
ks_check ended at time 1732968319.3685863

As you can see, in the simulation above all instances of pm_check and ks_check are executing concurrently. But this is just a simulation. What you haven't specified is what API you are using and how it is being invoked. Your real code would be replacing the statement await asyncio.sleep(1) with the actual API request. If you want all the API requests to run concurrently, then there are three possibilities:

  1. The API is invoked using, for example, a GET or POST request to some URL. In this case you do not want to use the requests package, which does not support asyncio and would end up preventing concurrent API calls. Instead use the aiohttp package or httpx package, both installable from PyPI.
  2. The API hides from the user whatever network activity it requires, but provides an asyncio-compatible interface.
  3. The API hides from the user whatever network activity it requires, and does not provide an asyncio-compatible interface.

If case 3 is your situation, you will need to invoke your API in another thread using asyncio.loop.run_in_executor as follows:

async def pm_check(od_pm, pSide):
    loop = asyncio.get_running_loop()
    # We assume there is a non-asyncio function my_api_function
    # that takes as argument od_pm and pSide:
    result = await loop.run_in_executor(None, my_api_function, od_pm, pSide)
    return result

The above code will use a default thread pool whose size will limit the degree of concurrency.
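
For example, to control that limit yourself, you can pass an explicit executor instead of None. A minimal sketch, with blocking_api_call as a hypothetical stand-in for the real non-asyncio API function:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_api_call(name):
    # Hypothetical stand-in for a blocking, non-asyncio API function
    time.sleep(0.1)
    return f"{name}-done"

async def main():
    loop = asyncio.get_running_loop()
    # An explicit pool: max_workers caps how many blocking calls run at once
    with ThreadPoolExecutor(max_workers=4) as pool:
        return await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_api_call, f"req{i}") for i in range(8))
        )

results = asyncio.run(main())
print(results)
```

With 8 calls and 4 workers, the calls run in two waves of 4, so the whole thing takes about 0.2 seconds instead of 0.8.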

You haven't specified how large value_ops is. Generally, there is no problem creating hundreds, if not thousands, of asyncio tasks. But if you wanted or needed to submit API requests in batches, then:

async def process_all():
    batch = []

    for value in value_ops.values():
        batch.append(opp_check(*value))
        if len(batch) == 20:  # a batch size of 20
            await asyncio.gather(*batch)
            batch = []

    if batch:
        await asyncio.gather(*batch)
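
A drawback of fixed batches is that each batch waits for its slowest request before the next batch starts. An alternative, sketched here with a simulated opp_check, is an asyncio.Semaphore that keeps at most N requests in flight at any moment:

```python
import asyncio

async def opp_check_sim(name):
    # Simulated opp_check; the real one would await the API calls
    await asyncio.sleep(0.05)
    return name

async def process_all_limited(names, limit=20):
    sem = asyncio.Semaphore(limit)

    async def limited(name):
        async with sem:  # at most `limit` coroutines pass this point at once
            return await opp_check_sim(name)

    # All coroutines are created up front, but the semaphore throttles them
    return await asyncio.gather(*(limited(n) for n in names))

results = asyncio.run(process_all_limited([f"name{i}" for i in range(50)], limit=10))
print(len(results))  # 50
```

As soon as one request finishes, the next one starts, so the concurrency limit stays fully utilized.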

Update

I am not familiar with anaconda or its peculiarities. You get the error message that there is an already-running event loop, yet when you try await asyncio.create_task(process_all()) (or more simply await process_all()), you get a syntax error stating that await is being used outside an async function. Is there a reason why you cannot execute your script from the shell's command line?

You might try running your async function in a new thread as follows:

import asyncio
import threading

_loop = None

def run_async(coro):
    """await a coroutine from a synchronous function/method."""

    global _loop

    if _loop is None:
        _loop = asyncio.new_event_loop()
        threading.Thread(target=_loop.run_forever, name="Async Runner", daemon=True).start()

    result = asyncio.run_coroutine_threadsafe(coro, _loop).result()
    return result

run_async(process_all())
  • Thank you for the thoughtful reply, I am grateful. Commented Dec 1, 2024 at 17:13
  • When I try to run the code snippet you created for testing the async loop, I'm getting "RuntimeError: asyncio.run() cannot be called from a running event loop" from the asyncio.run(process_all()) line. Any idea why it's working for you but not for me? Commented Dec 1, 2024 at 17:14
  • Are you referring to the first code snippet I posted that is followed by the resulting printout? If so, tell me what environment you are running under and, instead of asyncio.run(process_all()), try asyncio.get_running_loop().run_until_complete(process_all()). If asyncio.get_running_loop() does not raise a RuntimeError, then your environment has an event loop already running.
    – Booboo
    Commented Dec 1, 2024 at 17:27
  • I'm using an anaconda environment. The asyncio.get_running_loop().run_until_complete(process_all()) gives "RuntimeError: This event loop is already running". (EDIT: And yes, I was referring to the first code chunk you posted) Commented Dec 1, 2024 at 17:34
  • Try instead of asyncio.run(process_all()) the following: await asyncio.create_task(process_all()). You could/should also just run the code from a command line with python3 my_script.py
    – Booboo
    Commented Dec 1, 2024 at 17:40

A single asyncio.run call at the bottom of the script is how it should be used, if you are running a standalone script. Jupyter and other runtime environments may already have an asyncio loop running, and if you are not calling asyncio.run from within your own functions, that is what is happening here.

Then you can create your gather call as an asyncio task, so that it can be started from a synchronous function, and then just wait for that to complete (if you are in an interactive environment, you can manually check its .done() method).

Otherwise, yes - you organized your tasks in a way that takes advantage of asyncio, as long as the API calls themselves are not blocking. requests is blocking, but you can switch to aiohttp or httpx: they are almost drop-in replacements for requests, except that the actual .get or .post calls can be awaited, so while the requests are sent and the results come back, the event loop can process other requests concurrently.

%pip install httpx

import asyncio
import httpx

value_ops = {'name1':['tradename','ticker1','ticker2','ticker3'],
'name2':['tradename','ticker1','ticker2','ticker3'],
...
}

# These are the two API GET requests
async def pm_check(od_pm,pSide):
    #[code chunk here] output is json that I'm grabbing two values from
    async with httpx.AsyncClient() as client:
        # if answer 2 doesn't depend on answer1
        # these 2 can actually be made concurrent as well
        answer1 = await client.get(...)
        answer2 = await client.get(...)
        


async def ks_check(od_ks,kSide,direction):
    ...
    #[code chunk here] output is json that I'm grabbing two values from

async def opp_check(tradename,ticker1,ticker2,ticker3):
    # These must be awaited (or gathered) to actually run
    await pm_check(ticker1,'asks')
    await ks_check(ticker2,'yes','buy')
    #comparison math code chunk omitted

import asyncio
async def process_all():
    tasks = []
    for value in value_ops.values():
        task = asyncio.create_task(opp_check(*value))
        tasks.append(task)
    # The gather pattern will ensure concurrency.
    # The only downside is that it will create __all__
    # your requests in one go - if there is
    # some API limit or throttling, you might
    # want to use a more detailed pattern to
    # ensure you emit just a reasonable number
    # of requests at a time.
    await asyncio.gather(*tasks)
    
    
# and here, if you already running in an async loop, this will fail:
# asyncio.run(process_all())

# But you can create the `process_all` call as a task and check for
# its completion:

main_task = asyncio.create_task(process_all())

# and then, in other cell, check the return of `main_task.done()`

