0

I want to know if it's safe to create an asyncio event loop in one thread and run the loop in another while having the ability to cancel it from outside the thread in which the event loop is running. I want to run an async function in a thread while also having ability to cancel it from outside the thread.

Here is the code I am currently using. I want to know if this is the correct approach or if there is a simpler/better way to do this:

import asyncio
from concurrent.futures import ThreadPoolExecutor, CancelledError
import time

pool = ThreadPoolExecutor()
loop = asyncio.new_event_loop()


def start_event_loop(loop):
    asyncio.set_event_loop(loop)  # is this necessary?
    loop.run_forever()


pool.submit(start_event_loop, loop)


async def long_task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")


future = asyncio.run_coroutine_threadsafe(long_task(), loop)

time.sleep(5)
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()
except CancelledError:
    time.sleep(1)
    print("Future was cancelled")

loop.call_soon_threadsafe(loop.stop)  # can i reuse the loop?

while loop.is_running():
    print("Wating for loop to stop...")
    time.sleep(0.1)

loop.close() # is this necessary?
print("Loop stopped")

This is based on this article and this question.

Also, there are few more questions I'd like to ask:

  1. Is it necessary to use asyncio.set_event_loop? Both of them produce the same output.

  2. Is it okay to reuse the event loop after stopping it or should I create a new loop each time after stopping the loop?

  3. Is it necessary to close the loop after you are done with it?

3
  • 1
    1. run_forever calls set_event_loop. 3. all eventloops call close in their __del__, so it is not needed, just let it be garbage collected. you can reuse the loop from a technical point of view after you call stop, it is more like a pause than a stop ... but i'd recommend you create a new one to avoid any surprises
    – Ahmed AEK
    Commented Dec 14, 2024 at 9:49
  • @AhmedAEK I am getting an error when I don't close the loop. This occurs when using unit tests. It doesn't give any error when I close the loop, but I am kind of confused about how I should close the loop. Should I call it from the main thread since I created the loop there, or should I call it from the worker thread by adding loop.close() underneath loop.run_forever() in start_event_loop function? So once I call call_soon_threadsafe(loop.stop), it will close the loop inside the same thread that was running it. Is this the correct approach?
    – Jishnu
    Commented Dec 29, 2024 at 18:59
  • it should only be closed after it completely stops, i think stopping it right after run_forever is a safe step, if you are to close it anyway
    – Ahmed AEK
    Commented Dec 29, 2024 at 19:49

2 Answers 2

1

No, you do not need to call asyncio.set_event_loop nor do you need to call loop.close().

But the comment # can i reuse the loop? after you have stopped the loop is a bit baffling to me. Yes, you could reuse the loop after calling close but how would you reuse the loop in a way that is different than the way it is currently being used? I would simply not stop the loop at all. If it has no work to do then the thread it is running in should not be using any CPU cycles. I would just leave the loop "running" (albeit idle) so if you should again need to run a task in a thread other than the main one, you already the necessary running loop.

In fact, the code could be simplified to:

import asyncio
from concurrent.futures import CancelledError
import time
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def long_task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")


future = asyncio.run_coroutine_threadsafe(long_task(), loop)

time.sleep(5)
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()
except CancelledError:
    # Why is this call to time.sleep being made?
    #time.sleep(1)
    print("Future was cancelled")

Prints:

Running...
Running...
Running...
Running...
Running...
Future was cancelled
Task was cancelled

Update

First, after testing the above code on Linux (I had been testing only under Windows), it now seems that you do need to cancel a future from the thread that is running the event loop by using for example loop.call_soon_threadsafe(future.cancel).

If for some reason you need to close the loop, this has to be done after the loop has been stopped and the original invocation of asyncio.loop.run_forever() in the child thread has returned. To be completely safe, close should be called from the same thread in which the loop had been running. This is because close attempts to call shutdown on any default executor that may have been started in response to a call to asyncio.loop.run_in_executor(None, some_function) and this may hang if called from a different thread.

So now we need a new runner function that calls asyncio.loop.run_forever() and then call asyncio.loop.close(). Here are the changes we need to make:

  1. Save a reference to the child thread that is running the event loop. The child thread no longer needs to be a daemon thread since it will be terminating. In this thread we run worker function runner that will call asyncio.loop.run_forever followed by asyncio.loop.close. The asyncio.loop.run_forever call will terminate eventually in response to asyncio.loop.stop being called. Since the stop method only sets a flag to tell the loop to stop running, you should be able to call this function from any thread. But I have had an issue where the code hangs under Linux. So you should call stop in the thread running the event loop with loop.call_soon_threadsafe(loop.stop).
  2. After the call to close has completed, the child thread will terminate. We will know that the close has completed by joining the child thread.
import asyncio
from concurrent.futures import CancelledError
import time
import threading

def runner(loop):
    loop.run_forever()
    loop.close()

loop = asyncio.new_event_loop()
t = threading.Thread(target=runner, args=(loop,))
t.start()

def worker():
    print('worker starting')
    time.sleep(1)
    print('worker ending')

async def long_task():
    try:
        await asyncio.get_running_loop().run_in_executor(None, worker)
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")


future = asyncio.run_coroutine_threadsafe(long_task(), loop)

time.sleep(5)
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()
except CancelledError:
    print("Future was cancelled")

# Now there are no tasks running in the event loop that need awaiting
# Stop the event loop (call stop in the thread running the loop):
loop.call_soon_threadsafe(loop.stop)
t.join()
# The loop should be closed now:
print(loop)

Prints:

worker starting
worker ending
Running...
Running...
Running...
Running...
Future was cancelled
Task was cancelled
<ProactorEventLoop running=False closed=True debug=False>
5
  • Hey, I am getting an error when I don't close the loop. This occurs when using unit tests. It doesn't give any error when I close the loop, but I am kind of confused about how I should close the loop. Should I call it from the main thread since I created the loop there, or should I call it from the worker thread by adding loop.close() underneath loop.run_forever() in start_event_loop function? So once I call call_soon_threadsafe(loop.stop), it will close the loop inside the same thread that was running it. Is this the correct approach?
    – Jishnu
    Commented Dec 29, 2024 at 18:56
  • You can call stop and close from the main thread (you have to call close from the main thread since it can only be called after the loop has stopped). You can even cancel the future from the main thread. See the Update to the above answer.
    – Booboo
    Commented Dec 29, 2024 at 19:36
  • hello there are a few doubts i would like to clear is it not necessary to wrap loop.stop inside a callsoon threadsafe shouldn't we call stop inside the worker thread? is this method in Update safe if so, can you please elaborate on why it's safe, and regarding closing the loop so i should close the loop from main thread right if so can you please elaborate on why this is so.
    – Jishnu
    Commented Dec 30, 2024 at 6:16
  • 1
    I have updated the Update. To be completely safe, the call to asyncio.loop.close should be made from the same thread running the event loop. Since asyncio.loop.stop() consists of a single statement, self._stopping = True, which will cause the event loop to stop at the end of its current iteration when it sees this flag has been set, I can't see why it would matter what thread sets this flag.
    – Booboo
    Commented Dec 30, 2024 at 10:49
  • 1
    Notwithstanding what I said in the previous comment, I do find that from time to time that if I call stop from the main thread, then I get the following message when the program exits: Task was destroyed but it is pending!. So it is prudent to call stop from the thread in which the event loop is executing.
    – Booboo
    Commented Jan 2 at 10:38
1

Simpler Approach:

import asyncio
import threading
import time

def start_event_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def long_task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task cancelled")

# An event loop as a separate thread
loop = asyncio.new_event_loop()
thread = threading.Thread(target=start_event_loop, args=(loop,), daemon=True)
thread.start()

# Schedule task in event loop
future = asyncio.run_coroutine_threadsafe(long_task(), loop)

# Simulate some work & cancel after 5 seconds
time.sleep(5)
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()  # Wait for task to finish & check for exceptions
except asyncio.CancelledError:
    print("Future cancelled")

# Stop & close loop
loop.call_soon_threadsafe(loop.stop)
thread.join()  # Wait for thread to exit
loop.close()
print("Loop stopped")
  1. The line asyncio.get_event_loop() can be skipped.

  2. After stopping the loop with loop.stop, it is not safe to re-use it.You can create a new loop with asyncio.new_event_loop.

  3. Yes, Closing the loop is necessary to release the resource properly.

2
  • Hey, I am confused about how I should close the loop. Should I call it from the main thread since I created the loop there, or should I call it from the worker thread by calling loop.close() right after loop.run_forever() in start_event_loop function? So once I call call_soon_threadsafe(loop.stop), it will close the loop inside the same thread that was running it. Is this the correct approach?
    – Jishnu
    Commented Dec 30, 2024 at 5:05
  • @Jishnu You should close the loop by adding a call to loop.close() in start_event_loop following the statement loop.run_forever(). When stop is called against the loop (from the thread running the event loop to be safe as you are doing) the run_forever method will soon terminate and then close will be called from the same thread. See my updated Update.
    – Booboo
    Commented Dec 30, 2024 at 11:46

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.