I am running multiple asyncio.Task objects to serve user requests faster on my FastAPI websocket server. However, occasionally (about once per week) the whole event loop freezes on the line where I check whether one of my tasks is done (task.done()). As a result, the whole FastAPI server hangs until it is restarted.

After some research, I understand that done() only checks an in-memory flag, so the done() method itself should not be the fundamental cause. I have already tried wrapping the task in asyncio.wait_for and added proper try/except handlers. However, I did not see any error log or anything else suspicious.
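To illustrate that expectation, here is a minimal sketch. It assumes a hypothetical `slow_job` coroutine standing in for the real pipeline and a much shorter timeout than the 30 seconds used on the server. It shows `done()` returning immediately on every poll, and the `wait_for` timeout ending up as an exception stored on the task rather than as a hang:

```python
import asyncio
from typing import Optional, Tuple


async def slow_job() -> str:
    # Hypothetical stand-in for the real pipeline: it never finishes in time.
    await asyncio.sleep(10)
    return "finished"


async def poll_until_done() -> Tuple[int, Optional[BaseException]]:
    task = asyncio.create_task(asyncio.wait_for(slow_job(), timeout=0.2))
    polls = 0
    while not task.done():  # done() only reads the task's state; it does not block
        polls += 1
        await asyncio.sleep(0.05)  # yield so the event loop can run the task
    # The timeout is stored on the finished task; done() itself never raises it.
    return polls, task.exception()


if __name__ == "__main__":
    polls, exc = asyncio.run(poll_until_done())
    print(polls, type(exc).__name__)
```

In this sketch the loop only makes progress because of the `await asyncio.sleep(...)` between polls; the `done()` call itself does no waiting.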

Right now I am considering moving the code that checks whether the other task is done() to a separate thread and waiting on that thread instead. But the root cause is still bugging me.

Below is code that is not exactly my server code but is functionally identical to it. One of the biggest problems is that the freeze happens only occasionally, which makes it hard to reproduce.

staller_task = asyncio.create_task(self.staller(state))
pipeline_state = {}
# db_search_pipeline connects to Weaviate and generates the LLM answer. It is also
# wrapped in a "try ... except Exception as e" block that logs every exception to the console.
db_search_task = asyncio.create_task(
    asyncio.wait_for(
        self.db_search_pipeline(state, pipeline_state),
        timeout=30,
    )
)

total_resp = ""
use_db_answer = False
async for token in self.should_call_db_or_respond_directly(state):
    total_resp += token
    if '"search_db": true,' in total_resp:
        use_db_answer = True
        break
    elif '"answer_to_user": "' in total_resp:
        state.send_to_user(token)

if use_db_answer:
    stop_now = False
    while not stop_now:
        stop_now = True
        stop_now = db_search_task.done()  # This is the line (src/agent/react_cruzr.py:638)
        if len(pipeline_state["output_buffer"]) > 0:
            state.send_to_user(pipeline_state["output_buffer"])
            pipeline_state["output_buffer"] = ""
        await asyncio.sleep(0.1)
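For anyone who wants to experiment, the polling pattern above can be approximated by the following self-contained sketch. Everything in it is an assumption made for illustration: `fake_db_search_pipeline` is a hypothetical stand-in for `db_search_pipeline`, a plain list replaces `state.send_to_user`, `output_buffer` is initialised up front, and the sleep intervals are shortened. It is not the production code and is not known to reproduce the freeze.

```python
import asyncio


async def fake_db_search_pipeline(pipeline_state: dict) -> None:
    # Hypothetical stand-in for db_search_pipeline: emits a few chunks over time.
    for chunk in ("alpha ", "beta ", "gamma"):
        await asyncio.sleep(0.05)
        pipeline_state["output_buffer"] += chunk


async def check_knowledge() -> str:
    sent = []  # stands in for state.send_to_user
    pipeline_state = {"output_buffer": ""}  # pre-initialised so the key always exists
    db_search_task = asyncio.create_task(
        asyncio.wait_for(fake_db_search_pipeline(pipeline_state), timeout=30)
    )
    while True:
        finished = db_search_task.done()  # read the flag before flushing the buffer
        if pipeline_state["output_buffer"]:
            sent.append(pipeline_state["output_buffer"])
            pipeline_state["output_buffer"] = ""
        if finished:
            break
        await asyncio.sleep(0.02)  # yield to the event loop between polls
    db_search_task.result()  # re-raises a timeout or pipeline error, if there was one
    return "".join(sent)


if __name__ == "__main__":
    print(asyncio.run(check_knowledge()))
```

Reading `done()` before flushing the buffer means the last chunk written by the pipeline is still sent after the task finishes.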

The following is the py-spy dump taken while the server was frozen:

Process 9: /opt/conda/bin/python3.11 /opt/conda/bin/fastapi run --workers 1 src/channels/websocket_call/serve.py --port 8766
Python v3.11.10 (/opt/conda/bin/python3.11)

Thread 9 (active+gil): "MainThread"
    check_knowledge (src/agent/react_cruzr.py:638)  -> This line is where I check 'task.done()'
    run (asyncio/runners.py:118)
    run (asyncio/runners.py:190)
    run (uvicorn/server.py:66)
    run (uvicorn/main.py:580)
    _run (fastapi_cli/cli.py:162)
    run (fastapi_cli/cli.py:334)
    wrapper (typer/main.py:697)
    invoke (click/core.py:783)
    invoke (click/core.py:1434)
    invoke (click/core.py:1688)
    _main (typer/core.py:195)
    main (typer/core.py:740)
    __call__ (click/core.py:1157)
    __call__ (typer/main.py:322)
    main (fastapi_cli/cli.py:348)
    <module> (fastapi:8)
Thread 394 (idle): "pymongo_server_monitor_thread"
    receive_data (pymongo/network_layer.py:389)
    receive_message (pymongo/synchronous/network.py:315)
    receive_message (pymongo/synchronous/pool.py:588)
    _next_reply (pymongo/synchronous/pool.py:469)
    _check_with_socket (pymongo/synchronous/monitor.py:362)
    _check_once (pymongo/synchronous/monitor.py:327)
    _check_server (pymongo/synchronous/monitor.py:254)
    _run (pymongo/synchronous/monitor.py:207)
    target (pymongo/synchronous/monitor.py:76)
    _run (pymongo/periodic_executor.py:230)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 395 (idle): "pymongo_kill_cursors_thread"
    _run (pymongo/periodic_executor.py:245)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 396 (idle): "pymongo_server_rtt_thread"
    _run (pymongo/periodic_executor.py:245)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 1036 (idle): "LogProcessingWorker"
    wait (threading.py:331)
    wait (threading.py:629)
    _delay_processing (logstash_async/worker.py:203)
    _fetch_events (logstash_async/worker.py:148)
    run (logstash_async/worker.py:92)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 1037 (idle): "ThreadPoolExecutor-1_0"
    _worker (concurrent/futures/thread.py:81)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 1039 (idle): "ThreadPoolExecutor-1_1"
    _worker (concurrent/futures/thread.py:81)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
  • Sorry - please post enough code to be a Minimal Reproducible Example, otherwise no one will ever be able to help you. This top-level code is not even syntactically valid async code.
    – jsbueno
    Commented Apr 22 at 13:42
  • To be fair: if this problem is answerable at all, it likely involves a deadlock caused by code running in other threads, not only pure asyncio code. Without an MRE, there is no way this can be answered.
    – jsbueno
    Commented 2 days ago
