@@ -35,6 +35,19 @@ def _task_get_name(task: "asyncio.Task") -> str:
3535 return "Task-%d" % id (task )
3636
3737
38+ def _call_init_asyncio (asyncio : ModuleType ) -> None :
39+ from asyncio import tasks as asyncio_tasks
40+
41+ if sys .hexversion >= 0x030C0000 :
42+ scheduled_tasks = asyncio_tasks ._scheduled_tasks .data # type: ignore[attr-defined]
43+ eager_tasks = asyncio_tasks ._eager_tasks # type: ignore[attr-defined]
44+ else :
45+ scheduled_tasks = asyncio_tasks ._all_tasks .data # type: ignore[attr-defined]
46+ eager_tasks = None
47+
48+ stack_v2 .init_asyncio (asyncio_tasks ._current_tasks , scheduled_tasks , eager_tasks ) # type: ignore[attr-defined]
49+
50+
3851@ModuleWatchdog .after_module_imported ("asyncio" )
3952def _ (asyncio ):
4053 # type: (ModuleType) -> None
@@ -57,7 +70,15 @@ def _(asyncio):
5770 if THREAD_LINK is None :
5871 THREAD_LINK = _threading ._ThreadLink ()
5972
60- init_stack_v2 = config .stack .v2_enabled and stack_v2 .is_available
73+ init_stack_v2 : bool = config .stack .v2_enabled and stack_v2 .is_available
74+
75+ if init_stack_v2 :
76+ try :
77+ running_loop = asyncio .get_running_loop ()
78+ stack_v2 .track_asyncio_loop (typing .cast (int , ddtrace_threading .current_thread ().ident ), running_loop )
79+ except RuntimeError :
80+ # No existing loop to track, continue
81+ pass
6182
6283 @partial (wrap , sys .modules ["asyncio.events" ].BaseDefaultEventLoopPolicy .set_event_loop )
6384 def _ (f , args , kwargs ):
@@ -91,17 +112,28 @@ def _(f, args, kwargs):
91112 for child in children :
92113 stack_v2 .link_tasks (parent , child )
93114
94- if sys .hexversion >= 0x030C0000 :
95- scheduled_tasks = asyncio .tasks ._scheduled_tasks .data
96- eager_tasks = asyncio .tasks ._eager_tasks
97- else :
98- scheduled_tasks = asyncio .tasks ._all_tasks .data
99- eager_tasks = None
100-
101- stack_v2 .init_asyncio (asyncio .tasks ._current_tasks , scheduled_tasks , eager_tasks )
115+ _call_init_asyncio (asyncio )
102116
103117
104118def get_event_loop_for_thread (thread_id : int ) -> typing .Union ["asyncio.AbstractEventLoop" , None ]:
105119 global THREAD_LINK
106120
107121 return THREAD_LINK .get_object (thread_id ) if THREAD_LINK is not None else None
122+
123+
124+ def link_existing_loop_to_current_thread () -> None :
125+ import asyncio
126+
127+ running_loop : typing .Union ["asyncio.AbstractEventLoop" , None ] = None
128+ try :
129+ running_loop = asyncio .get_running_loop ()
130+ stack_v2 .track_asyncio_loop (typing .cast (int , ddtrace_threading .current_thread ().ident ), running_loop )
131+ except RuntimeError :
132+ # No existing loop to track, continue
133+ pass # nosec: we only want to detect there is no existing loop to track
134+ finally :
135+ assert THREAD_LINK is not None # nosec: assert is used for typing
136+ if running_loop is not None :
137+ THREAD_LINK .link_object (running_loop )
138+
139+ _call_init_asyncio (asyncio )
0 commit comments