
Context. I want to run a few LLM requests in parallel. I'm using Flask, LangChain, and asyncio. Flask is installed with the "async" extra.

Problem. Every second (!) time the app handles a request, I get the following error in the terminal, repeated as many times as there are items in the analyses list:

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-44' coro=<AsyncClient.aclose() done, defined at [path]\flask-server\venv\Lib\site-packages\httpx\_client.py:2011> exception=RuntimeError('Event loop is closed')>
[Traceback without references to application files]
RuntimeError: Event loop is closed

The issue doesn't seem to be critical, as the app just goes on to send a response as expected.

Questions: How can I prevent the RuntimeErrors? Also, what is happening here? Which event loops are getting closed at which point, and why is that a problem?

Code

# logic.py
import asyncio

from langchain_anthropic import ChatAnthropic

async def run_analyses(analyses):
    # Schedule one task per analysis so the LLM calls run concurrently.
    tasks = []
    for analysis in analyses:
        task = asyncio.create_task(run_analysis(analysis))
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    return results

async def run_analysis(analysis):
    messages = ...
    llm = ChatAnthropic(...)
    response = await llm.ainvoke(messages)
    result = response.content
    return result

# app.py using Flask
from flask import Flask, jsonify, request

from logic import run_analyses  # module shown above

app = Flask(__name__)

@app.route('/process', methods=['POST'])
async def process():
    data = request.json
    analyses = data['analyses']
    results = await run_analyses(analyses)
    return jsonify(results)

if __name__ == '__main__':
    app.run(debug=True, port=5000)

Things I tried

  • awaiting the main app.run
  • with and without asyncio.create_task
  • an hour of following GPT-4's suggestions, to no avail :)

2 Answers


TL;DR: Try

llm = ChatAnthropic(
    ...,
    default_headers={
        "Connection": "close",  # don't keep pooled connections alive between requests
    },
)

Details:

I was seeing a similar effect (non-fatal RuntimeError: Event loop is closed from httpx/asyncio) in a slightly different setup:

  • Streamlit instead of Flask
  • ChatOpenAI instead of ChatAnthropic
  • asyncio.run() instead of asyncio.create_task()/asyncio.gather()

However, I'm pretty sure the root cause is the same: httpx uses connection pooling, which doesn't play nicely with consecutive asyncio event loops. Source: https://github.com/encode/httpx/discussions/2959
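
A minimal sketch of that failure mode, independent of LangChain (it only assumes httpx is installed; the URL is a placeholder):

import asyncio
import httpx

client = httpx.AsyncClient()  # shared client with a connection pool

async def fetch():
    # Pooled connections get bound to whichever event loop is running.
    return await client.get("https://example.com")

asyncio.run(fetch())  # loop 1: opens a connection, then the loop is closed
asyncio.run(fetch())  # loop 2: cleanup/reuse of the loop-1 connection fails
                      # with RuntimeError: Event loop is closed

Flask with the async extra behaves much like the two asyncio.run() calls: each request gets a fresh event loop, while pooled connections from the previous request are still tied to the old, already-closed loop, which would explain the per-request errors in the question.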

The workaround described there consists of sending a custom Connection: close HTTP header with every request. In my case I could achieve that via the ChatOpenAI(default_headers=...) argument, as sketched below.
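
For the ChatOpenAI case, this is roughly what the workaround looks like (a sketch; the model name is a placeholder):

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",  # placeholder
    default_headers={
        "Connection": "close",  # ask the server not to keep the connection alive
    },
)

The trade-off is a new TCP/TLS handshake on every request, but no connection ever sits in the pool long enough to outlive its event loop.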


There are Flask-like frameworks dedicated to ASGI; I suggest trying Quart.

Quart is very similar to Flask, is async by design, and is compatible with much of the Flask ecosystem; see the sketch below.
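
For illustration, here is the process route from the question ported to Quart (a sketch; it assumes run_analyses can be imported from the logic module above):

# app.py using Quart
from quart import Quart, jsonify, request

from logic import run_analyses  # assumed module name

app = Quart(__name__)

@app.route('/process', methods=['POST'])
async def process():
    data = await request.get_json()  # request parsing is async in Quart
    analyses = data['analyses']
    results = await run_analyses(analyses)
    return jsonify(results)

if __name__ == '__main__':
    app.run(debug=True, port=5000)

Because Quart serves every request from one long-lived event loop, pooled httpx connections are never left behind by a loop that has already closed.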
