Skip to content

Commit fa5b133

Browse files
authored
improve ThreadBasedCyclicSendTask timing (hardbyte#1539)
Co-authored-by: zariiii9003 <[email protected]>
1 parent 4096817 commit fa5b133

File tree

1 file changed

+35
-18
lines changed

1 file changed

+35
-18
lines changed

can/broadcastmanager.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,39 @@
55
:meth:`can.BusABC.send_periodic`.
66
"""
77

8+
import abc
9+
import logging
10+
import sys
11+
import threading
12+
import time
813
from typing import Optional, Sequence, Tuple, Union, Callable, TYPE_CHECKING
914

15+
from typing_extensions import Final
16+
1017
from can import typechecking
18+
from can.message import Message
1119

1220
if TYPE_CHECKING:
1321
from can.bus import BusABC
1422

15-
from can.message import Message
16-
17-
import abc
18-
import logging
19-
import threading
20-
import time
2123

2224
# try to import win32event for event-based cyclic send task (needs the pywin32 package)
25+
USE_WINDOWS_EVENTS = False
2326
try:
2427
import win32event
2528

26-
HAS_EVENTS = True
29+
# Python 3.11 provides a more precise sleep implementation on Windows, so this is not necessary.
30+
# Put version check here, so mypy does not complain about `win32event` not being defined.
31+
if sys.version_info < (3, 11):
32+
USE_WINDOWS_EVENTS = True
2733
except ImportError:
28-
HAS_EVENTS = False
34+
pass
2935

3036
log = logging.getLogger("can.bcm")
3137

38+
NANOSECONDS_IN_SECOND: Final[int] = 1_000_000_000
39+
NANOSECONDS_IN_MILLISECOND: Final[int] = 1_000_000
40+
3241

3342
class CyclicTask(abc.ABC):
3443
"""
@@ -64,6 +73,7 @@ def __init__(
6473
# Take the Arbitration ID of the first element
6574
self.arbitration_id = messages[0].arbitration_id
6675
self.period = period
76+
self.period_ns = int(round(period * 1e9))
6777
self.messages = messages
6878

6979
@staticmethod
@@ -246,7 +256,7 @@ def __init__(
246256
)
247257
self.on_error = on_error
248258

249-
if HAS_EVENTS:
259+
if USE_WINDOWS_EVENTS:
250260
self.period_ms = int(round(period * 1000, 0))
251261
try:
252262
self.event = win32event.CreateWaitableTimerEx(
@@ -261,7 +271,7 @@ def __init__(
261271
self.start()
262272

263273
def stop(self) -> None:
264-
if HAS_EVENTS:
274+
if USE_WINDOWS_EVENTS:
265275
win32event.CancelWaitableTimer(self.event.handle)
266276
self.stopped = True
267277

@@ -272,7 +282,7 @@ def start(self) -> None:
272282
self.thread = threading.Thread(target=self._run, name=name)
273283
self.thread.daemon = True
274284

275-
if HAS_EVENTS:
285+
if USE_WINDOWS_EVENTS:
276286
win32event.SetWaitableTimer(
277287
self.event.handle, 0, self.period_ms, None, None, False
278288
)
@@ -281,10 +291,11 @@ def start(self) -> None:
281291

282292
def _run(self) -> None:
283293
msg_index = 0
294+
msg_due_time_ns = time.perf_counter_ns()
295+
284296
while not self.stopped:
285297
# Prevent calling bus.send from multiple threads
286298
with self.send_lock:
287-
started = time.perf_counter()
288299
try:
289300
self.bus.send(self.messages[msg_index])
290301
except Exception as exc: # pylint: disable=broad-except
@@ -294,13 +305,19 @@ def _run(self) -> None:
294305
break
295306
else:
296307
break
308+
msg_due_time_ns += self.period_ns
297309
if self.end_time is not None and time.perf_counter() >= self.end_time:
298310
break
299311
msg_index = (msg_index + 1) % len(self.messages)
300312

301-
if HAS_EVENTS:
302-
win32event.WaitForSingleObject(self.event.handle, self.period_ms)
303-
else:
304-
# Compensate for the time it takes to send the message
305-
delay = self.period - (time.perf_counter() - started)
306-
time.sleep(max(0.0, delay))
313+
# Compensate for the time it takes to send the message
314+
delay_ns = msg_due_time_ns - time.perf_counter_ns()
315+
316+
if delay_ns > 0:
317+
if USE_WINDOWS_EVENTS:
318+
win32event.WaitForSingleObject(
319+
self.event.handle,
320+
int(round(delay_ns / NANOSECONDS_IN_MILLISECOND)),
321+
)
322+
else:
323+
time.sleep(delay_ns / NANOSECONDS_IN_SECOND)

0 commit comments

Comments
 (0)