I am running multiple asyncio.Task objects to serve user requests faster on my FastAPI WebSocket server. However, occasionally (about once per week), my whole event loop freezes on the line where I check whether one of my tasks is done (task.done()). As a result, the whole FastAPI server hangs until it is restarted.
From some research, I understand that done() only checks an in-memory flag, so the done() method itself should not be able to cause the issue. I have already tried wrapping the task in asyncio.wait_for and added proper try/except handlers. However, I did not get any error log or anything else suspicious.
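To make the claim about done() and asyncio.wait_for concrete, here is a minimal sketch. It is not my server code: slow_job and demo are hypothetical names, and the delays are arbitrary. It shows that done() returns immediately without waiting, and that a task wrapped in wait_for ends with a TimeoutError once the loop gets a chance to run.

```python
import asyncio


async def slow_job(delay: float) -> str:
    # Hypothetical stand-in for a slow pipeline call.
    await asyncio.sleep(delay)
    return "finished"


async def demo() -> dict:
    # Same shape as in the question: create_task(wait_for(...)).
    task = asyncio.create_task(asyncio.wait_for(slow_job(5.0), timeout=0.2))

    # done() only reads the task's state; it never blocks or waits.
    observed = {"done_immediately": task.done()}

    # Yield to the event loop long enough for the timeout to fire.
    await asyncio.sleep(0.5)

    observed["done_after_timeout"] = task.done()
    observed["timed_out"] = isinstance(task.exception(), asyncio.TimeoutError)
    return observed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the timeout can only fire while the event loop is running; wait_for cannot interrupt code that never reaches an await.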
Right now I am thinking of moving the code that checks whether the other task is done() to a different thread and waiting on it there. But the root cause is still bugging me.
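One diagnostic that may help narrow down the root cause is asyncio's debug mode, which logs a warning on the "asyncio" logger whenever a single callback or task step runs longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below is a standalone illustration, not my server code: blocking_step, ListHandler, and run_with_debug are hypothetical names, and the time.sleep call is a deliberate stand-in for any code that blocks the loop.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_step() -> None:
    # Deliberately blocks the event loop: time.sleep never yields to asyncio.
    time.sleep(0.3)


async def main() -> None:
    await asyncio.sleep(0)
    await blocking_step()
    await asyncio.sleep(0)


def run_with_debug() -> list:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True makes the loop time every step it executes.
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)
```

The warning is emitted only after the slow step returns, so this catches long stalls but would stay silent during a freeze that never ends.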
Here is code that is not exact but is functionally identical to my server's. One of the biggest problems is that it freezes only occasionally, which makes the issue hard to reproduce.

    staller_task = asyncio.create_task(self.staller(state))
    pipeline_state = {}
    # db_search_pipeline connects to Weaviate and generates the LLM answer. Its body is
    # wrapped in a "try ... except Exception as e" block that logs every exception to the console.
    db_search_task = asyncio.create_task(
        asyncio.wait_for(
            self.db_search_pipeline(state, pipeline_state),
            timeout=30,
        )
    )

    total_resp = ""
    use_db_answer = False
    async for token in self.should_call_db_or_respond_directly(state):
        total_resp += token
        if '"search_db": true,' in total_resp:
            use_db_answer = True
            break
        elif '"answer_to_user": "' in total_resp:
            state.send_to_user(token)

    if use_db_answer:
        stop_now = False
        while not stop_now:
            stop_now = True
            stop_now = db_search_task.done()  # This is the line (src/agent/react_cruzr.py:638)
            if len(pipeline_state["output_buffer"]) > 0:
                state.send_to_user(pipeline_state["output_buffer"])
                pipeline_state["output_buffer"] = ""
            await asyncio.sleep(0.1)

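For comparison, the polling loop at the end of the code above could also be written without done() at all, by having the pipeline push its output into an asyncio.Queue and letting the consumer await it. This is only a sketch of that alternative under stated assumptions: fake_pipeline is a hypothetical stand-in for db_search_pipeline, forward_output stands in for the while loop, and the `send` argument plays the role of state.send_to_user.

```python
import asyncio

_DONE = object()  # sentinel that marks the end of the output stream


async def fake_pipeline(queue: asyncio.Queue) -> None:
    """Hypothetical producer: pushes output chunks, then the sentinel."""
    try:
        for chunk in ("Hello", ", ", "world"):
            await asyncio.sleep(0.01)  # simulated work
            await queue.put(chunk)
    finally:
        # put_nowait never suspends, so the sentinel is delivered even if
        # the producer is cancelled (for example by a wait_for timeout).
        queue.put_nowait(_DONE)


async def forward_output(queue: asyncio.Queue, send) -> None:
    """Consumer: suspends on queue.get() instead of polling done()."""
    while True:
        chunk = await queue.get()  # the event loop runs other tasks here
        if chunk is _DONE:
            break
        send(chunk)


async def demo() -> list:
    sent = []
    queue = asyncio.Queue()
    producer = asyncio.create_task(
        asyncio.wait_for(fake_pipeline(queue), timeout=30)
    )
    await forward_output(queue, sent.append)
    await producer  # surfaces any exception raised by the pipeline
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this shape the consumer always yields to the event loop while it waits, so there is no iteration of the loop that can run without reaching an await.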
The following is the py-spy dump taken while the server is frozen:

    Process 9: /opt/conda/bin/python3.11 /opt/conda/bin/fastapi run --workers 1 src/channels/websocket_call/serve.py --port 8766
    Python v3.11.10 (/opt/conda/bin/python3.11)

    Thread 9 (active+gil): "MainThread"
        check_knowledge (src/agent/react_cruzr.py:638) -> This line is where I check 'task.done()'
        run (asyncio/runners.py:118)
        run (asyncio/runners.py:190)
        run (uvicorn/server.py:66)
        run (uvicorn/main.py:580)
        _run (fastapi_cli/cli.py:162)
        run (fastapi_cli/cli.py:334)
        wrapper (typer/main.py:697)
        invoke (click/core.py:783)
        invoke (click/core.py:1434)
        invoke (click/core.py:1688)
        _main (typer/core.py:195)
        main (typer/core.py:740)
        __call__ (click/core.py:1157)
        __call__ (typer/main.py:322)
        main (fastapi_cli/cli.py:348)
        <module> (fastapi:8)
    Thread 394 (idle): "pymongo_server_monitor_thread"
        receive_data (pymongo/network_layer.py:389)
        receive_message (pymongo/synchronous/network.py:315)
        receive_message (pymongo/synchronous/pool.py:588)
        _next_reply (pymongo/synchronous/pool.py:469)
        _check_with_socket (pymongo/synchronous/monitor.py:362)
        _check_once (pymongo/synchronous/monitor.py:327)
        _check_server (pymongo/synchronous/monitor.py:254)
        _run (pymongo/synchronous/monitor.py:207)
        target (pymongo/synchronous/monitor.py:76)
        _run (pymongo/periodic_executor.py:230)
        run (threading.py:982)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
    Thread 395 (idle): "pymongo_kill_cursors_thread"
        _run (pymongo/periodic_executor.py:245)
        run (threading.py:982)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
    Thread 396 (idle): "pymongo_server_rtt_thread"
        _run (pymongo/periodic_executor.py:245)
        run (threading.py:982)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
    Thread 1036 (idle): "LogProcessingWorker"
        wait (threading.py:331)
        wait (threading.py:629)
        _delay_processing (logstash_async/worker.py:203)
        _fetch_events (logstash_async/worker.py:148)
        run (logstash_async/worker.py:92)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
    Thread 1037 (idle): "ThreadPoolExecutor-1_0"
        _worker (concurrent/futures/thread.py:81)
        run (threading.py:982)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
    Thread 1039 (idle): "ThreadPoolExecutor-1_1"
        _worker (concurrent/futures/thread.py:81)
        run (threading.py:982)
        _bootstrap_inner (threading.py:1045)
        _bootstrap (threading.py:1002)
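
For reference, a MainThread that is active, holds the GIL, and sits on the polling line is what one would expect to see if some iteration of a polling loop never reaches an await: the loop then spins on done() while the task it is waiting for never gets scheduled. The sketch below reproduces that effect in isolation. All names are hypothetical, and the spin count is capped so the example terminates. Whether any path in the real server actually skips the await is an assumption, not something the dump proves.

```python
import asyncio


async def worker() -> str:
    # Needs at least one turn of the event loop to finish.
    await asyncio.sleep(0)
    return "done"


async def poll_without_yielding(task: asyncio.Task, max_spins: int) -> bool:
    spins = 0
    # No await inside the loop: the event loop never gets control back,
    # so `task` cannot make progress while this loop is spinning.
    while not task.done() and spins < max_spins:
        spins += 1
    return task.done()


async def poll_with_yielding(task: asyncio.Task, max_spins: int) -> bool:
    spins = 0
    while not task.done() and spins < max_spins:
        spins += 1
        await asyncio.sleep(0)  # hands control back to the event loop
    return task.done()


async def demo() -> tuple:
    starved = asyncio.create_task(worker())
    starved_result = await poll_without_yielding(starved, 100_000)

    healthy = asyncio.create_task(worker())
    healthy_result = await poll_with_yielding(healthy, 100_000)

    await starved  # let the first worker finish before the loop closes
    return starved_result, healthy_result


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints (False, True)
```

Without the cap on spins, the first variant would never return, and a stack sample would keep pointing at the line that calls done().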