
Commit 3263680

chore: limit symdb uploaders under spawn
We use file-based IPC to ensure that Symbol DB has at most 2 active uploader processes under a broader range of process-creation scenarios than fork alone, such as spawn.
Parent: aeb5df4 · Commit: 3263680
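
For context, the sketch below shows the general shape of the mechanism the commit message describes: a shared temporary file acts as a registry of uploader PIDs, and a process only becomes an uploader if, while holding an exclusive lock on that file, it sees fewer than the cap already registered. This is a hedged, stdlib-only illustration (POSIX-only, since it uses fcntl), not the ddtrace implementation; the REGISTRY path, the try_become_uploader helper and the hard-coded cap of 2 are assumptions made for the sketch. The actual changes follow in the diffs below.

import fcntl
import os
import tempfile

MAX_UPLOADERS = 2  # assumed cap for the sketch: parent + one child
REGISTRY = os.path.join(tempfile.gettempdir(), "symdb-uploader-pids")  # hypothetical path


def try_become_uploader() -> bool:
    """Register this PID unless MAX_UPLOADERS distinct PIDs are already registered."""
    with open(REGISTRY, "a+b") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # exclusive lock makes the check-and-register atomic
        try:
            f.seek(0)
            pids = {p for p in f.read().decode().split("\x00") if p}
            if len(pids) >= MAX_UPLOADERS:
                return False  # enough uploaders already; this process stays passive
            f.write(f"{os.getpid()}\x00".encode())  # "a+b" appends regardless of position
            return True
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)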

File tree: 3 files changed (+111, -23 lines)

ddtrace/internal/ipc.py

Lines changed: 41 additions & 11 deletions
@@ -99,32 +99,38 @@ def open_file(path, mode):
 class SharedStringFile:
     """A simple shared-file implementation for multiprocess communication."""
 
-    def __init__(self) -> None:
-        self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8))
+    def __init__(self, name: typing.Optional[str] = None) -> None:
+        self.filename: typing.Optional[str] = str(TMPDIR / (name or secrets.token_hex(8)))
+
+    def put_unlocked(self, f: typing.BinaryIO, data: str) -> None:
+        f.seek(0, os.SEEK_END)
+        dt = (data + "\x00").encode()
+        if f.tell() + len(dt) <= MAX_FILE_SIZE:
+            f.write(dt)
 
     def put(self, data: str) -> None:
         """Put a string into the file."""
         if self.filename is None:
             return
 
         try:
-            with open_file(self.filename, "ab") as f, WriteLock(f):
-                f.seek(0, os.SEEK_END)
-                dt = (data + "\x00").encode()
-                if f.tell() + len(dt) <= MAX_FILE_SIZE:
-                    f.write(dt)
+            with self.lock_exclusive() as f:
+                self.put_unlocked(f, data)
         except Exception:  # nosec
             pass
 
+    def peekall_unlocked(self, f: typing.BinaryIO) -> typing.List[str]:
+        f.seek(0)
+        return f.read().strip(b"\x00").decode().split("\x00")
+
     def peekall(self) -> typing.List[str]:
         """Peek at all strings from the file."""
         if self.filename is None:
             return []
 
         try:
-            with open_file(self.filename, "r+b") as f, ReadLock(f):
-                f.seek(0)
-                return f.read().strip(b"\x00").decode().split("\x00")
+            with self.lock_shared() as f:
+                return self.peekall_unlocked(f)
         except Exception:  # nosec
             return []
 
@@ -134,7 +140,7 @@ def snatchall(self) -> typing.List[str]:
             return []
 
         try:
-            with open_file(self.filename, "r+b") as f, WriteLock(f):
+            with self.lock_exclusive() as f:
                 f.seek(0)
                 strings = f.read().strip(b"\x00").decode().split("\x00")
 
@@ -144,3 +150,27 @@ def snatchall(self) -> typing.List[str]:
             return strings
         except Exception:  # nosec
             return []
+
+    def clear(self) -> None:
+        """Clear all strings from the file."""
+        if self.filename is None:
+            return
+
+        try:
+            with self.lock_exclusive() as f:
+                f.seek(0)
+                f.truncate()
+        except Exception:  # nosec
+            pass
+
+    @contextmanager
+    def lock_shared(self):
+        """Context manager to acquire a shared/read lock on the file."""
+        with open_file(self.filename, "r+b") as f, ReadLock(f):
+            yield f
+
+    @contextmanager
+    def lock_exclusive(self):
+        """Context manager to acquire an exclusive/write lock on the file."""
+        with open_file(self.filename, "r+b") as f, WriteLock(f):
+            yield f
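
A note on the shape of this refactor: the public operations keep their locked forms (put, peekall, snatchall), while the new *_unlocked variants together with the lock_shared/lock_exclusive context managers let a caller compose several operations under a single lock. Below is a hedged usage sketch of hypothetical caller code, not part of the commit; it assumes the backing file is created lazily by open_file and reuses the cap of two uploaders introduced elsewhere in this commit.

import os

from ddtrace.internal.ipc import SharedStringFile

MAX_UPLOADERS = 2
registry = SharedStringFile("example-pids")  # fixed name so cooperating processes share one file

# Check-and-register has to be atomic: both steps run under a single exclusive
# lock, which is exactly what the *_unlocked variants make possible.
with registry.lock_exclusive() as f:  # assumes open_file() creates the file on first use
    if len(set(registry.peekall_unlocked(f))) < MAX_UPLOADERS:
        registry.put_unlocked(f, str(os.getpid()))

This is the same pattern _rc_callback adopts in the next file.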

ddtrace/internal/symbol_db/remoteconfig.py

Lines changed: 28 additions & 12 deletions
@@ -1,7 +1,9 @@
 import os
+import sys
 import typing as t
 
 from ddtrace.internal.forksafe import has_forked
+from ddtrace.internal.ipc import SharedStringFile
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.products import manager as product_manager
 from ddtrace.internal.remoteconfig import Payload
@@ -18,20 +20,34 @@
 
 log = get_logger(__name__)
 
+# Use a shared file to keep track of which PIDs have Symbol DB enabled. This way
+# we can ensure that at most two processes are emitting symbols under a large
+# range of scenarios.
+shared_pid_file = SharedStringFile(f"{str(hash(tuple(sys.argv)))[:8]}-symdb-pids")
+
+MAX_SYMDB_UPLOADERS = 2  # parent + 1 child
+
 
 def _rc_callback(data: t.Sequence[Payload]):
-    if get_ancestor_runtime_id() is not None and has_forked():
-        log.debug("[PID %d] SymDB: Disabling Symbol DB in forked process", os.getpid())
-        # We assume that forking is being used for spawning child worker
-        # processes. Therefore, we avoid uploading the same symbols from each
-        # child process. We restrict the enablement of Symbol DB to just the
-        # parent process and the first fork child.
-        remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
-
-        if SymbolDatabaseUploader.is_installed():
-            SymbolDatabaseUploader.uninstall()
-
-        return
+    with shared_pid_file.lock_exclusive() as f:
+        if (get_ancestor_runtime_id() is not None and has_forked()) or len(
+            set(shared_pid_file.peekall_unlocked(f))
+        ) >= MAX_SYMDB_UPLOADERS:
+            log.debug("[PID %d] SymDB: Disabling Symbol DB in child process", os.getpid())
+            # We assume that forking is being used for spawning child worker
+            # processes. Therefore, we avoid uploading the same symbols from each
+            # child process. We restrict the enablement of Symbol DB to just the
+            # parent process and the first fork child.
+            remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
+
+            if SymbolDatabaseUploader.is_installed():
+                SymbolDatabaseUploader.uninstall()
+
+            return
+
+        # Store the PID of the current process so that we know which processes
+        # have Symbol DB enabled.
+        shared_pid_file.put_unlocked(f, str(os.getpid()))
 
     for payload in data:
         if payload.metadata is None:

tests/internal/symbol_db/test_symbols.py

Lines changed: 42 additions & 0 deletions
@@ -15,6 +15,15 @@
 from ddtrace.internal.symbol_db.symbols import SymbolType
 
 
+@pytest.fixture(autouse=True, scope="function")
+def pid_file_teardown():
+    from ddtrace.internal.symbol_db.remoteconfig import shared_pid_file
+
+    yield
+
+    shared_pid_file.clear()
+
+
 def test_symbol_from_code():
     def foo(a, b, c=None):
         loc = 42
@@ -320,3 +329,36 @@ def test_symbols_fork_uploads():
 
     for pid in pids:
         os.waitpid(pid, 0)
+
+
+def spawn_target(results):
+    from ddtrace.internal.remoteconfig import ConfigMetadata
+    from ddtrace.internal.remoteconfig import Payload
+    from ddtrace.internal.symbol_db.remoteconfig import _rc_callback
+    from ddtrace.internal.symbol_db.symbols import SymbolDatabaseUploader
+
+    SymbolDatabaseUploader.install()
+
+    rc_data = [Payload(ConfigMetadata("test", "symdb", "hash", 0, 0), "test", None)]
+    _rc_callback(rc_data)
+    results.append(SymbolDatabaseUploader.is_installed())
+
+
+def test_symbols_spawn_uploads():
+    import multiprocessing
+
+    multiprocessing.set_start_method("spawn", force=True)
+    mc_context = multiprocessing.get_context("spawn")
+    manager = multiprocessing.Manager()
+    returns = manager.list()
+    jobs = []
+
+    for _ in range(10):
+        p = mc_context.Process(target=spawn_target, args=(returns,))
+        p.start()
+        jobs.append(p)
+
+    for p in jobs:
+        p.join()
+
+    assert sum(returns) == 2
