Skip to content

Commit a9bc035

Browse files
Fix: Connection Pool race condition (livekit#3705)
1 parent 82d5a78 commit a9bc035

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

livekit-agents/livekit/agents/utils/connection_pool.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(
4141
self._connections: dict[T, float] = {} # conn -> connected_at timestamp
4242
self._available: set[T] = set()
4343
self._connect_timeout = connect_timeout
44+
self._connect_lock = asyncio.Lock()
4445

4546
# store connections to be reaped (closed) later.
4647
self._to_close: set[T] = set()
@@ -90,23 +91,24 @@ async def get(self, *, timeout: float) -> T:
9091
Returns:
9192
An active connection object
9293
"""
93-
await self._drain_to_close()
94-
now = time.time()
95-
96-
# try to reuse an available connection that hasn't expired
97-
while self._available:
98-
conn = self._available.pop()
99-
if (
100-
self._max_session_duration is None
101-
or now - self._connections[conn] <= self._max_session_duration
102-
):
103-
if self._mark_refreshed_on_get:
104-
self._connections[conn] = now
105-
return conn
106-
# connection expired; mark it for resetting.
107-
self.remove(conn)
108-
109-
return await self._connect(timeout)
94+
async with self._connect_lock:
95+
await self._drain_to_close()
96+
now = time.time()
97+
98+
# try to reuse an available connection that hasn't expired
99+
while self._available:
100+
conn = self._available.pop()
101+
if (
102+
self._max_session_duration is None
103+
or now - self._connections[conn] <= self._max_session_duration
104+
):
105+
if self._mark_refreshed_on_get:
106+
self._connections[conn] = now
107+
return conn
108+
# connection expired; mark it for resetting.
109+
self.remove(conn)
110+
111+
return await self._connect(timeout)
110112

111113
def put(self, conn: T) -> None:
112114
"""Mark a connection as available for reuse.
@@ -161,9 +163,10 @@ def prewarm(self) -> None:
161163
return
162164

163165
async def _prewarm_impl() -> None:
164-
if not self._connections:
165-
conn = await self._connect(timeout=self._connect_timeout)
166-
self._available.add(conn)
166+
async with self._connect_lock:
167+
if not self._connections:
168+
conn = await self._connect(timeout=self._connect_timeout)
169+
self._available.add(conn)
167170

168171
task = asyncio.create_task(_prewarm_impl())
169172
self._prewarm_task = weakref.ref(task)

0 commit comments

Comments
 (0)