5

I am a new python programmer who is trying to write a 'bot' to trade on betfair for myself. (ambitious!!!!)

My problem that has arisen is this - I have the basics of an asyncio event loop running but I have noticed that if one of the coroutines fails in its process ( for instance an API call fails or a mongodb read) then the asyncio event loop just continues running but ignores the one failed coroutine

my question is how could I either restart that one coroutine automatically or handle an error to stop the complete asyncio loop but at the moment everything runs seemingly oblivious to the fact that something is not right and one portion of it has failed. In my case the loop never returned to the 'rungetcompetitionids' function after a database read was not successful and it never returned to the function again even though it is in a while true loop The usergui is not yet functional but only there to try asyncio

thanks Clive

import sys
import datetime
from login import sessiontoken as gst
from mongoenginesetups.setupmongo import global_init as initdatabase
from asyncgetcompetitionids import competition_id_pass as gci
from asyncgetcompetitionids import create_comp_id_table_list as ccid
import asyncio
import PySimpleGUI as sg

sg.change_look_and_feel('DarkAmber')

layout = [
    [sg.Text('Password'), sg.InputText(password_char='*', key='password')],
    [sg.Text('', key='status')],
    [sg.Button('Submit'), sg.Button('Cancel')]
]
window = sg.Window('Betfair', layout)


def initialisethedatabase():
    initdatabase('xxxx', 'xxxx', xxxx, 'themongo1', True)


async def runsessiontoken():
    nextlogontime = datetime.datetime.now()
    while True:
        returned_login_time = gst(nextlogontime)
        nextlogontime = returned_login_time
        await asyncio.sleep(15)


async def rungetcompetitionids(compid_from_compid_table_list):
    nextcompidtime = datetime.datetime.now()
    while True:
        returned_time , returned_list = gci(nextcompidtime, compid_from_compid_table_list)
        nextcompidtime = returned_time
        compid_from_compid_table_list = returned_list
        await asyncio.sleep(10)


async def userinterface():
    while True:
        event, value = window.read(timeout=1)
        if event in (None, 'Cancel'):
            sys.exit()
        if event != "__TIMEOUT__":
            print(f"{event} {value}")
        await asyncio.sleep(0.0001)


async def wait_list():
    await asyncio.wait([runsessiontoken(), 
                       rungetcompetitionids(compid_from_compid_table_list), 
                       userinterface()
                      ])


initialisethedatabase()
compid_from_compid_table_list = ccid()
print(compid_from_compid_table_list)
nextcompidtime = datetime.datetime.now()
print(nextcompidtime)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_list())
loop.close()
2
  • Instead of asyncio.wait([x, y, z]) use asyncio.gather(x, y, z). If a coroutine raises, gather will also raise, thus stopping the loop. Using asyncio.wait() is a mistake unless you either inspect the return values or have a specialized use case such as return_when=FIRST_COMPLETED. Commented Sep 11, 2020 at 20:35
  • Thanks for this , it has been incorporated into my solution
    – Clive
    Commented Sep 16, 2020 at 20:31

1 Answer 1

2

A simple solution would be to use a wrapper function (or "supervisor") that catches Exception and then just blindly retries the function. More elegant solutions would include printing out the exception and stack trace for diagnostic purposes, and querying the application state to see if it makes sense to try and continue. For instance, if betfair tells you your account is not authorised, then continuing makes no sense. And if it's a general network error then retying immediately might be worthwhile. You might also want to stop retrying if the supervisor notices it has restarted quite a lot in a short space of time.

eg.

import asyncio
import traceback
import functools
from collections import deque
from time import monotonic

MAX_INTERVAL = 30
RETRY_HISTORY = 3
# That is, stop after the 3rd failure in a 30 second moving window

def supervise(func, name=None, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Simple wrapper function that automatically tries to name tasks"""
    if name is None:
        if hasattr(func, '__name__'): # raw func
            name = func.__name__
        elif hasattr(func, 'func'): # partial
            name = func.func.__name__
    return asyncio.create_task(supervisor(func, retry_history, max_interval), name=name)


async def supervisor(func, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Takes a noargs function that creates a coroutine, and repeatedly tries
        to run it. It stops is if it thinks the coroutine is failing too often or
        too fast.
    """
    start_times = deque([float('-inf')], maxlen=retry_history)
    while True:
        start_times.append(monotonic())
        try:
            return await func()
        except Exception:
            if min(start_times) > monotonic() - max_interval:
                print(
                    f'Failure in task {asyncio.current_task().get_name()!r}.'
                    ' Is it in a restart loop?'
                )
                # we tried our best, this coroutine really isn't working.
                # We should try to shutdown gracefully by setting a global flag
                # that other coroutines should periodically check and stop if they
                # see that it is set. However, here we just reraise the exception.
                raise
            else:
                print(func.__name__, 'failed, will retry. Failed because:')
                traceback.print_exc()


async def a():
    await asyncio.sleep(2)
    raise ValueError


async def b(greeting):
    for i in range(15):
        print(greeting, i)
        await asyncio.sleep(0.5)


async def main_async():
    tasks = [
        supervise(a),
        # passing repeated argument to coroutine (or use lambda)
        supervise(functools.partial(b, 'hello'))
    ]
    await asyncio.wait(
        tasks,
        # Only stop when all coroutines have completed
        # -- this allows for a graceful shutdown
        # Alternatively use FIRST_EXCEPTION to stop immediately 
        return_when=asyncio.ALL_COMPLETED,
    )
    return tasks


def main():
    # we run outside of the event loop, so we can carry out a post-mortem
    # without needing the event loop to be running.
    done = asyncio.run(main_async())
    for task in done:
        if task.cancelled():
            print(task, 'was cancelled')
        elif task.exception():
            print(task, 'failed with:')
            # we use a try/except here to reconstruct the traceback for logging purposes
            try:
                task.result()
            except:
                # we can use a bare-except as we are not trying to block
                # the exception -- just record all that may have happened.
                traceback.print_exc()

main()

And this will result in output like:

hello 0
hello 1
hello 2
hello 3
a failed, will retry. Failed because:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError
hello 4
hello 5
hello 6
hello 7
a failed, will retry. Failed because:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError
hello 8
hello 9
hello 10
hello 11
Failure in task 'a'. Is it in a restart loop?
hello 12
hello 13
hello 14
 exception=ValueError()> failed with:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 84, in main
    task.result()
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError
6
  • Thanks to both above comments. I managed to get a hybrid solution going with both above ideas coming in handy
    – Clive
    Commented Sep 16, 2020 at 20:29
  • interesting solution. Do you have a version that also deals with coroutines that have multiple parameters?
    – LeanMan
    Commented Aug 12, 2021 at 5:38
  • 1
    Use a lambda or partial to wrap the coroutine function, as suggested in the main_async function.
    – Dunes
    Commented Aug 13, 2021 at 16:56
  • Thanks for the response! I was just reviewing your code and didn't know what the partial meant but this comment also helped me understand better. TY
    – LeanMan
    Commented Aug 14, 2021 at 2:40
  • I'm not too certain but is it possible there is a bug in the placement of start_times.append(monotonic()). If you place it in the while true scope it will potentially capture times that aren't exceptions. Shouldn't it be within the except Exception: scope?
    – LeanMan
    Commented Aug 15, 2021 at 3:52

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.