Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 89 additions & 69 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from typing import Tuple
from typing import Type

import wrapt

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
Expand All @@ -40,40 +38,62 @@ def _current_thread() -> Tuple[int, str]:
return thread_id, _threading.get_thread_name(thread_id)


# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrapper functions will
# appear in the stack trace and we need to hide them.
WRAPT_C_EXT: bool
if os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
WRAPT_C_EXT = False
else:
try:
import wrapt._wrappers as _w # noqa: F401
except ImportError:
WRAPT_C_EXT = False
else:
WRAPT_C_EXT = True
del _w
class _ProfiledLock:
"""
Lightweight lock wrapper that profiles lock acquire/release operations.
It intercepts lock methods without the overhead of a full proxy object.
"""

__slots__ = (
"__wrapped__",
"tracer",
"max_nframes",
"capture_sampler",
"init_location",
"acquired_time",
"name",
)

class _ProfiledLock(wrapt.ObjectProxy):
def __init__(
self,
wrapped: Any,
tracer: Optional[Tracer],
max_nframes: int,
capture_sampler: collector.CaptureSampler,
endpoint_collection_enabled: bool,
) -> None:
wrapt.ObjectProxy.__init__(self, wrapped)
self._self_tracer: Optional[Tracer] = tracer
self._self_max_nframes: int = max_nframes
self._self_capture_sampler: collector.CaptureSampler = capture_sampler
self._self_endpoint_collection_enabled: bool = endpoint_collection_enabled
frame: FrameType = sys._getframe(2 if WRAPT_C_EXT else 3)
self.__wrapped__: Any = wrapped
self.tracer: Optional[Tracer] = tracer
self.max_nframes: int = max_nframes
self.capture_sampler: collector.CaptureSampler = capture_sampler
# Frame depth: 0=__init__, 1=_profiled_allocate_lock, 2=_LockAllocatorWrapper.__call__, 3=caller
frame: FrameType = sys._getframe(3)
code: CodeType = frame.f_code
self._self_init_loc: str = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)
self._self_acquired_at: int = 0
self._self_name: Optional[str] = None
self.init_location: str = f"{os.path.basename(code.co_filename)}:{frame.f_lineno}"
self.acquired_time: int = 0
self.name: Optional[str] = None

### DUNDER methods ###

def __eq__(self, other: Any) -> bool:
if isinstance(other, _ProfiledLock):
return self.__wrapped__ == other.__wrapped__
return self.__wrapped__ == other

def __getattr__(self, name: str) -> Any:
# Delegates acquire_lock, release_lock, locked_lock, and any future methods
return getattr(self.__wrapped__, name)

def __hash__(self) -> int:
return hash(self.__wrapped__)

def __repr__(self) -> str:
return f"<_ProfiledLock({self.__wrapped__!r}) at {self.init_location}>"

### Regular methods ###

def locked(self) -> bool:
"""Return True if lock is currently held."""
return self.__wrapped__.locked()

def acquire(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)
Expand All @@ -85,17 +105,17 @@ def __aenter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs)

def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
if not self._self_capture_sampler.capture():
if not self.capture_sampler.capture():
return inner_func(*args, **kwargs)

start: int = time.monotonic_ns()
try:
return inner_func(*args, **kwargs)
finally:
end: int = time.monotonic_ns()
self._self_acquired_at = end
self.acquired_time = end
try:
self._maybe_update_self_name()
self._update_name()
self._flush_sample(start, end, is_acquire=True)
except AssertionError:
if config.enable_asserts:
Expand All @@ -115,22 +135,16 @@ def __aexit__(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.__aexit__, *args, **kwargs)

def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
# The underlying threading.Lock class is implemented using C code, and
# it doesn't have the __dict__ attribute. So we can't do
# self.__dict__.pop("_self_acquired_at", None) to remove the attribute.
# Instead, we need to use the following workaround to retrieve and
# remove the attribute.
start: Optional[int] = getattr(self, "_self_acquired_at", None)
start: Optional[int] = getattr(self, "acquired_time", None)
try:
# Though it should generally be avoided to call release() from
# multiple threads, it is possible to do so. In that scenario, the
# following statement will raise an AttributeError. This should
# not be propagated to the caller and to the users. The inner_func
# will raise a RuntimeError as the threads are trying to release()
# an unlocked lock, and the expected behavior is to propagate that.
del self._self_acquired_at
del self.acquired_time
except AttributeError:
# We just ignore the error, if the attribute is not found.
pass

try:
Expand All @@ -151,7 +165,7 @@ def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:

handle.push_monotonic_ns(end)

lock_name: str = f"{self._self_init_loc}:{self._self_name}" if self._self_name else self._self_init_loc
lock_name: str = f"{self.init_location}:{self.name}" if self.name else self.init_location
handle.push_lock_name(lock_name)

duration_ns: int = end - start
Expand All @@ -175,36 +189,39 @@ def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:
thread_native_id: int = _threading.get_thread_native_id(thread_id)
handle.push_threadinfo(thread_id, thread_native_id, thread_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span())
if self.tracer is not None:
handle.push_span(self.tracer.current_span())

# If we can't get the task frame, we use the caller frame.
# Call stack: 0: _flush_sample, 1: _acquire/_release, 2: acquire/release/__enter__/__exit__, 3: caller
frame: FrameType = task_frame or sys._getframe(3)
frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
frames, _ = _traceback.pyframe_to_frames(frame, self.max_nframes)
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)

handle.flush_sample()

def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
def _find_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
for name, value in var_dict.items():
if name.startswith("__") or isinstance(value, ModuleType):
continue
if value is self:
return name
if config.lock.name_inspect_dir:
for attribute in dir(value):
if not attribute.startswith("__") and getattr(value, attribute) is self:
self._self_name = attribute
return attribute
try:
if not attribute.startswith("__") and getattr(value, attribute) is self:
return attribute
except AttributeError:
# Accessing unset attributes in __slots__ raises AttributeError.
pass
return None

# Get lock acquire/release call location and variable name the lock is assigned to
# This function propagates ValueError if the frame depth is <= 3.
def _maybe_update_self_name(self) -> None:
if self._self_name is not None:
def _update_name(self) -> None:
if self.name is not None:
return
# We expect the call stack to be like this:
# 0: this
Expand All @@ -222,14 +239,22 @@ def _maybe_update_self_name(self) -> None:

# First, look at the local variables of the caller frame, and then the global variables
frame = sys._getframe(3)
self._self_name = self._find_self_name(frame.f_locals) or self._find_self_name(frame.f_globals) or ""
self.name = self._find_name(frame.f_locals) or self._find_name(frame.f_globals) or ""


class _LockAllocatorWrapper:
"""Wrapper for lock allocator functions that prevents method binding."""

__slots__ = ("_func",)

def __init__(self, func: Callable[..., Any]) -> None:
self._func: Callable[..., Any] = func

def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self._func(*args, **kwargs)

class FunctionWrapper(wrapt.FunctionWrapper):
# Override the __get__ method: whatever happens, _allocate_lock is always treated by Python like a "static"
# method, even when used as a class attribute. Python never tries to "bind" it to a method, because it sees that it is a
# builtin function. Override the default wrapt behavior here, which tries to detect bound methods.
def __get__(self, instance: Any, owner: Optional[Type] = None) -> FunctionWrapper: # type: ignore
def __get__(self, instance: Any, owner: Optional[Type] = None) -> _LockAllocatorWrapper:
# Prevent automatic method binding (e.g., Foo.lock_class = threading.Lock)
return self


Expand All @@ -241,16 +266,14 @@ class LockCollector(collector.CaptureSamplerCollector):
def __init__(
self,
nframes: int = config.max_frames,
endpoint_collection_enabled: bool = config.endpoint_collection,
tracer: Optional[Tracer] = None,
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.nframes: int = nframes
self.endpoint_collection_enabled: bool = endpoint_collection_enabled
self.tracer: Optional[Tracer] = tracer
self._original: Optional[Any] = None
self._original_lock: Any = None

@abc.abstractmethod
def _get_patch_target(self) -> Callable[..., Any]:
Expand All @@ -272,23 +295,20 @@ def _stop_service(self) -> None:

def patch(self) -> None:
"""Patch the module for tracking lock allocation."""
# We only patch the lock from the `threading` module.
# Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
self._original = self._get_patch_target()
self._original_lock = self._get_patch_target()
original_lock: Any = self._original_lock # Capture non-None value

# TODO: `instance` is unused
def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock:
lock: Any = wrapped(*args, **kwargs)
def _profiled_allocate_lock(*args: Any, **kwargs: Any) -> _ProfiledLock:
"""Simple wrapper that returns profiled locks."""
return self.PROFILED_LOCK_CLASS(
lock,
self.tracer,
self.nframes,
self._capture_sampler,
self.endpoint_collection_enabled,
wrapped=original_lock(*args, **kwargs),
tracer=self.tracer,
max_nframes=self.nframes,
capture_sampler=self._capture_sampler,
)

self._set_patch_target(FunctionWrapper(self._original, _allocate_lock))
self._set_patch_target(_LockAllocatorWrapper(_profiled_allocate_lock))

def unpatch(self) -> None:
"""Unpatch the threading module for tracking lock allocation."""
self._set_patch_target(self._original)
self._set_patch_target(self._original_lock)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
other:
- |
profiling: This removes the ``wrapt`` library dependency from the Lock Profiler implementation, improving performance and reducing overhead during lock instrumentation.
4 changes: 1 addition & 3 deletions tests/profiling_v2/collector/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
def test_repr():
test_collector._test_repr(
collector_asyncio.AsyncioLockCollector,
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, "
"capture_pct=1.0, nframes=64, "
"endpoint_collection_enabled=True, tracer=None)",
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
)


Expand Down
Loading
Loading