-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Expand file tree
/
Copy pathtest_crossloop_client_cache.py
More file actions
186 lines (147 loc) · 7.65 KB
/
test_crossloop_client_cache.py
File metadata and controls
186 lines (147 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""Tests for cross-loop client cache isolation fix (#2681).
Verifies that _get_cached_client() returns different AsyncOpenAI clients
when called from different event loops, preventing the httpx deadlock
that occurs when a cached async client bound to loop A is reused on loop B.
This test file is self-contained and does not import the full tool chain,
so it can run without optional dependencies like firecrawl.
"""
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import patch, MagicMock
from types import SimpleNamespace
import pytest
# ---------------------------------------------------------------------------
# Minimal stub so _get_cached_client can be exercised without building real provider clients
# ---------------------------------------------------------------------------
def _stub_resolve_provider_client(provider, model, async_mode, **kw):
"""Return a unique mock client each time, simulating AsyncOpenAI creation."""
client = MagicMock(name=f"client-{provider}-async={async_mode}")
client.api_key = "test"
client.base_url = kw.get("explicit_base_url", "http://localhost:8081/v1")
return client, model or "test-model"
@pytest.fixture(autouse=True)
def _clean_client_cache():
    """Clear the client cache before and after each test.

    Runs automatically for every test in this module (``autouse=True``) so
    that a client cached by one test can never be returned to another.
    """
    import agent.auxiliary_client as ac

    ac._client_cache.clear()
    yield
    # Teardown: drop whatever the test left behind in the cache.
    ac._client_cache.clear()
class TestCrossLoopCacheIsolation:
    """Verify async clients are cached per-event-loop, not globally.

    Every test patches ``agent.auxiliary_client.resolve_provider_client``
    with ``_stub_resolve_provider_client``, which returns a fresh mock on
    each call. Object identity of the returned clients therefore tells a
    cache hit (same object) from a cache miss (new object).

    Clients and loops are compared with ``is`` / ``is not`` on live objects
    rather than via ``id()``: an ``id()`` value may be reused once the
    original object has been garbage-collected, which can make such
    comparisons pass or fail for the wrong reason.
    """

    def test_same_loop_reuses_client(self):
        """Within a single event loop, the same client should be returned."""
        from agent.auxiliary_client import _get_cached_client

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client1, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")
                client2, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")
            assert client1 is client2, (
                "Same loop should return the same cached client"
            )
        finally:
            # Release the loop even if the assertion fails, and do not leave
            # a closed loop installed as this thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()

    def test_different_loops_get_different_clients(self):
        """Different event loops must get separate client instances."""
        from agent.auxiliary_client import _get_cached_client

        results = {}
        created_loops = []

        def _get_client_on_new_loop(name):
            loop = asyncio.new_event_loop()
            created_loops.append(loop)
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")
            # Keep the objects themselves so both stay alive until compared.
            results[name] = (client, loop)
            # Don't close loop here — simulates real usage where loops persist

        t1 = threading.Thread(target=_get_client_on_new_loop, args=("a",))
        t2 = threading.Thread(target=_get_client_on_new_loop, args=("b",))
        try:
            # Run the threads one after the other so the module-level patch
            # applied inside each of them never overlaps.
            t1.start()
            t1.join()
            t2.start()
            t2.join()

            client_a, loop_a = results["a"]
            client_b, loop_b = results["b"]
            assert loop_a is not loop_b, (
                "Test setup error: same loop on both threads"
            )
            assert client_a is not client_b, (
                "Different event loops got the SAME cached client — this causes "
                "httpx cross-loop deadlocks in gateway mode (#2681)"
            )
        finally:
            # Both worker threads have finished; release their loops now.
            for loop in created_loops:
                loop.close()

    def test_sync_clients_not_affected(self):
        """Sync clients (async_mode=False) should still be cached globally,
        since httpx.Client (sync) doesn't bind to an event loop."""
        from agent.auxiliary_client import _get_cached_client

        results = {}
        created_loops = []

        def _get_sync_client(name):
            loop = asyncio.new_event_loop()
            created_loops.append(loop)
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client(
                    "custom", "m1", async_mode=False,
                    base_url="http://localhost:8081/v1")
            # Store the object, not id(client): if nothing else retained the
            # first client, a second one could reuse its id and make the
            # equality check below pass by accident.
            results[name] = client

        t1 = threading.Thread(target=_get_sync_client, args=("a",))
        t2 = threading.Thread(target=_get_sync_client, args=("b",))
        try:
            t1.start()
            t1.join()
            t2.start()
            t2.join()

            assert results["a"] is results["b"], (
                "Sync clients should be shared across threads (no loop binding)"
            )
        finally:
            for loop in created_loops:
                loop.close()

    def test_gateway_simulation_no_deadlock(self):
        """Simulate gateway mode: _run_async spawns a thread with asyncio.run(),
        which creates a new loop. The cached client must be created on THAT loop,
        not reused from a different one."""
        from agent.auxiliary_client import _get_cached_client

        # Simulate: first call on "gateway loop"
        gateway_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(gateway_loop)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                gateway_client, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")

            # Simulate: _run_async spawns a thread with asyncio.run()
            worker_client = [None]

            def _worker():
                async def _inner():
                    with patch("agent.auxiliary_client.resolve_provider_client",
                               side_effect=_stub_resolve_provider_client):
                        client, _ = _get_cached_client(
                            "custom", "m1", async_mode=True,
                            base_url="http://localhost:8081/v1")
                    worker_client[0] = client

                asyncio.run(_inner())

            t = threading.Thread(target=_worker)
            t.start()
            t.join()

            # An exception inside the worker thread is not propagated here;
            # without this guard the identity check below would pass
            # vacuously against the initial None.
            assert worker_client[0] is not None, (
                "Worker thread did not obtain a client"
            )
            assert worker_client[0] is not gateway_client, (
                "Worker thread (asyncio.run) got the gateway's cached client — "
                "this is the exact cross-loop scenario that causes httpx deadlocks. "
                "The cache key must include the event loop identity (#2681)"
            )
        finally:
            asyncio.set_event_loop(None)
            gateway_loop.close()

    def test_closed_loop_client_discarded(self):
        """A cached client whose loop has closed should be replaced."""
        from agent.auxiliary_client import _get_cached_client

        loop1 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop1)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client1, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")
        finally:
            # Closing loop1 is part of the scenario, not just cleanup.
            loop1.close()

        # New loop on same thread
        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)
        try:
            with patch("agent.auxiliary_client.resolve_provider_client",
                       side_effect=_stub_resolve_provider_client):
                client2, _ = _get_cached_client(
                    "custom", "m1", async_mode=True,
                    base_url="http://localhost:8081/v1")
            assert client1 is not client2, (
                "Client from closed loop should not be reused"
            )
        finally:
            asyncio.set_event_loop(None)
            loop2.close()