Skip to content

Commit 23d9955

Browse files
authored
Merge branch 'develop' into fix_scripts
2 parents 7155dc4 + c83ee71 commit 23d9955

File tree

11 files changed

+241
-18
lines changed

11 files changed

+241
-18
lines changed

can/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class CanError(IOError):
2323
pass
2424

2525
from .listener import Listener, BufferedReader, RedirectReader
26+
try:
27+
from .listener import AsyncBufferedReader
28+
except ImportError:
29+
pass
2630

2731
from .io import Logger, Printer, LogReader, MessageSync
2832
from .io import ASCWriter, ASCReader

can/interfaces/serial/serial_can.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,9 @@ def _recv_internal(self, timeout):
155155

156156
else:
157157
return None, False
158+
159+
def fileno(self):
160+
if hasattr(self.ser, 'fileno'):
161+
return self.ser.fileno()
162+
# Return an invalid file descriptor on Windows
163+
return -1

can/interfaces/slcan.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
import time
1616
import logging
1717

18-
import serial
19-
2018
from can import BusABC, Message
2119

2220
logger = logging.getLogger(__name__)
2321

22+
try:
23+
import serial
24+
except ImportError:
25+
logger.warning("You won't be able to use the slcan can backend without "
26+
"the serial module installed!")
27+
serial = None
28+
2429

2530
class slcanBus(BusABC):
2631
"""
@@ -43,7 +48,7 @@ class slcanBus(BusABC):
4348

4449
_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds
4550

46-
def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
51+
def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
4752
rtscts=False, **kwargs):
4853
"""
4954
:param str channel:
@@ -55,8 +60,6 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
5560
Bitrate in bit/s
5661
:param float poll_interval:
5762
Poll interval in seconds when reading messages
58-
:param float timeout:
59-
timeout in seconds when reading message
6063
:param bool rtscts:
6164
turn hardware handshake (RTS/CTS) on and off
6265
"""
@@ -68,7 +71,7 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
6871
(channel, ttyBaudrate) = channel.split('@')
6972

7073
self.serialPortOrig = serial.serial_for_url(
71-
channel, baudrate=ttyBaudrate, timeout=timeout, rtscts=rtscts)
74+
channel, baudrate=ttyBaudrate, rtscts=rtscts)
7275

7376
time.sleep(self._SLEEP_AFTER_SERIAL_OPEN)
7477

@@ -81,7 +84,7 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
8184

8285
self.open()
8386

84-
super(slcanBus, self).__init__(channel, ttyBaudrate=115200, timeout=1,
87+
super(slcanBus, self).__init__(channel, ttyBaudrate=115200,
8588
bitrate=None, rtscts=False, **kwargs)
8689

8790
def write(self, string):
@@ -97,7 +100,7 @@ def close(self):
97100
self.write('C')
98101

99102
def _recv_internal(self, timeout):
100-
if timeout is not None:
103+
if timeout != self.serialPortOrig.timeout:
101104
self.serialPortOrig.timeout = timeout
102105

103106
canId = None
@@ -145,7 +148,10 @@ def _recv_internal(self, timeout):
145148
else:
146149
return None, False
147150

148-
def send(self, msg, timeout=None):
151+
def send(self, msg, timeout=0):
152+
if timeout != self.serialPortOrig.write_timeout:
153+
self.serialPortOrig.write_timeout = timeout
154+
149155
if msg.is_remote_frame:
150156
if msg.is_extended_id:
151157
sendStr = "R%08X0" % (msg.arbitration_id)
@@ -163,3 +169,9 @@ def send(self, msg, timeout=None):
163169

164170
def shutdown(self):
165171
self.close()
172+
173+
def fileno(self):
174+
if hasattr(self.serialPortOrig, 'fileno'):
175+
return self.serialPortOrig.fileno()
176+
# Return an invalid file descriptor on Windows
177+
return -1

can/interfaces/socketcan/socketcan.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,9 @@ def _apply_filters(self, filters):
604604
else:
605605
self._is_filtered = True
606606

607+
def fileno(self):
608+
return self.socket.fileno()
609+
607610
@staticmethod
608611
def _detect_available_configs():
609612
return [{'interface': 'socketcan', 'channel': channel}

can/listener.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
# Python 2
1919
from Queue import Queue as SimpleQueue, Empty
2020

21+
try:
22+
import asyncio
23+
except ImportError:
24+
asyncio = None
25+
2126

2227
class Listener(object):
2328
"""The basic listener that can be called directly to handle some
@@ -47,6 +52,12 @@ def on_message_received(self, msg):
4752
def __call__(self, msg):
4853
return self.on_message_received(msg)
4954

55+
def on_error(self, exc):
56+
"""This method is called to handle any exception in the receive thread.
57+
58+
:param Exception exc: The exception causing the thread to stop
59+
"""
60+
5061
def stop(self):
5162
"""
5263
Override to cleanup any open resources.
@@ -116,3 +127,44 @@ def stop(self):
116127
"""Prohibits any more additions to this reader.
117128
"""
118129
self.is_stopped = True
130+
131+
132+
if asyncio is not None:
133+
class AsyncBufferedReader(Listener):
134+
"""A message buffer for use with :mod:`asyncio`.
135+
136+
See :ref:`asyncio` for how to use with :class:`can.Notifier`.
137+
138+
Can also be used as an asynchronous iterator::
139+
140+
async for msg in reader:
141+
print(msg)
142+
"""
143+
144+
def __init__(self, loop=None):
145+
# set to "infinite" size
146+
self.buffer = asyncio.Queue(loop=loop)
147+
148+
def on_message_received(self, msg):
149+
"""Append a message to the buffer.
150+
151+
Must only be called inside an event loop!
152+
"""
153+
self.buffer.put_nowait(msg)
154+
155+
def get_message(self):
156+
"""
157+
Retrieve the latest message when awaited for::
158+
159+
msg = await reader.get_message()
160+
161+
:rtype: can.Message
162+
:return: The CAN message.
163+
"""
164+
return self.buffer.get()
165+
166+
def __aiter__(self):
167+
return self
168+
169+
def __anext__(self):
170+
return self.buffer.get()

can/notifier.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,30 @@
88
import threading
99
import logging
1010
import time
11+
try:
12+
import asyncio
13+
except ImportError:
14+
asyncio = None
1115

1216
logger = logging.getLogger('can.Notifier')
1317

1418

1519
class Notifier(object):
1620

17-
def __init__(self, bus, listeners, timeout=1.0):
21+
def __init__(self, bus, listeners, timeout=1.0, loop=None):
1822
"""Manages the distribution of **Messages** from a given bus/buses to a
1923
list of listeners.
2024
2125
:param can.BusABC bus: A :ref:`bus` or a list of buses to listen to.
2226
:param list listeners: An iterable of :class:`~can.Listener`
2327
:param float timeout: An optional maximum number of seconds to wait for any message.
28+
:param asyncio.AbstractEventLoop loop:
29+
An :mod:`asyncio` event loop to schedule listeners in.
2430
"""
2531
self.listeners = listeners
2632
self.bus = bus
2733
self.timeout = timeout
34+
self._loop = loop
2835

2936
#: Exception raised in thread
3037
self.exception = None
@@ -35,11 +42,24 @@ def __init__(self, bus, listeners, timeout=1.0):
3542
self._readers = []
3643
buses = self.bus if isinstance(self.bus, list) else [self.bus]
3744
for bus in buses:
45+
self.add_bus(bus)
46+
47+
def add_bus(self, bus):
48+
"""Add a bus for notification.
49+
50+
:param can.BusABC bus:
51+
CAN bus instance.
52+
"""
53+
if self._loop is not None and hasattr(bus, 'fileno') and bus.fileno() >= 0:
54+
# Use file descriptor to watch for messages
55+
reader = bus.fileno()
56+
self._loop.add_reader(reader, self._on_message_available, bus)
57+
else:
3858
reader = threading.Thread(target=self._rx_thread, args=(bus,),
39-
name='can.notifier for bus "{}"'.format(bus.channel_info))
59+
name='can.notifier for bus "{}"'.format(bus.channel_info))
4060
reader.daemon = True
4161
reader.start()
42-
self._readers.append(reader)
62+
self._readers.append(reader)
4363

4464
def stop(self, timeout=5):
4565
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
@@ -52,25 +72,54 @@ def stop(self, timeout=5):
5272
self._running = False
5373
end_time = time.time() + timeout
5474
for reader in self._readers:
55-
now = time.time()
56-
if now < end_time:
57-
reader.join(end_time - now)
75+
if isinstance(reader, threading.Thread):
76+
now = time.time()
77+
if now < end_time:
78+
reader.join(end_time - now)
79+
else:
80+
# reader is a file descriptor
81+
self._loop.remove_reader(reader)
5882
for listener in self.listeners:
59-
listener.stop()
83+
if hasattr(listener, 'stop'):
84+
listener.stop()
6085

6186
def _rx_thread(self, bus):
6287
msg = None
6388
try:
6489
while self._running:
6590
if msg is not None:
6691
with self._lock:
67-
for callback in self.listeners:
68-
callback(msg)
92+
if self._loop is not None:
93+
self._loop.call_soon_threadsafe(
94+
self._on_message_received, msg)
95+
else:
96+
self._on_message_received(msg)
6997
msg = bus.recv(self.timeout)
7098
except Exception as exc:
7199
self.exception = exc
100+
if self._loop is not None:
101+
self._loop.call_soon_threadsafe(self._on_error, exc)
102+
else:
103+
self._on_error(exc)
72104
raise
73105

106+
def _on_message_available(self, bus):
107+
msg = bus.recv(0)
108+
if msg is not None:
109+
self._on_message_received(msg)
110+
111+
def _on_message_received(self, msg):
112+
for callback in self.listeners:
113+
res = callback(msg)
114+
if self._loop is not None and asyncio.iscoroutine(res):
115+
# Schedule coroutine
116+
self._loop.create_task(res)
117+
118+
def _on_error(self, exc):
119+
for listener in self.listeners:
120+
if hasattr(listener, 'on_error'):
121+
listener.on_error(exc)
122+
74123
def add_listener(self, listener):
75124
"""Add new Listener to the notification list.
76125
If it is already present, it will be called two times

doc/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ A form of CAN interface is also required.
1515
bus
1616
message
1717
listeners
18+
asyncio
1819
bcm
1920

2021

doc/asyncio.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
.. _asyncio:
2+
3+
Asyncio support
4+
===============
5+
6+
The :mod:`asyncio` module built into Python 3.4 and later can be used to write
7+
asynchronos code in a single thread. This library supports receiving messages
8+
asynchronosly in an event loop using the :class:`can.Notifier` class.
9+
There will still be one thread per CAN bus but the user application will execute
10+
entirely in the event loop, allowing simpler concurrency without worrying about
11+
threading issues. Interfaces that have a valid file descriptor will however be
12+
supported natively without a thread.
13+
14+
You can also use the :class:`can.AsyncBufferedReader` listener if you prefer
15+
to write coroutine based code instead of using callbacks.
16+
17+
18+
Example
19+
-------
20+
21+
Here is an example using both callback and coroutine based code:
22+
23+
.. literalinclude:: ../examples/asyncio_demo.py
24+
:language: python

doc/listeners.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ BufferedReader
2626
.. autoclass:: can.BufferedReader
2727
:members:
2828

29+
.. autoclass:: can.AsyncBufferedReader
30+
:members:
31+
2932

3033
Logger
3134
------

examples/asyncio_demo.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import asyncio
2+
import can
3+
4+
def print_message(msg):
5+
"""Regular callback function. Can also be a coroutine."""
6+
print(msg)
7+
8+
async def main():
9+
can0 = can.Bus('vcan0', bustype='virtual', receive_own_messages=True)
10+
reader = can.AsyncBufferedReader()
11+
logger = can.Logger('logfile.asc')
12+
13+
listeners = [
14+
print_message, # Callback function
15+
reader, # AsyncBufferedReader() listener
16+
logger # Regular Listener object
17+
]
18+
# Create Notifier with an explicit loop to use for scheduling of callbacks
19+
loop = asyncio.get_event_loop()
20+
notifier = can.Notifier(can0, listeners, loop=loop)
21+
# Start sending first message
22+
can0.send(can.Message(arbitration_id=0))
23+
24+
print('Bouncing 10 messages...')
25+
for _ in range(10):
26+
# Wait for next message from AsyncBufferedReader
27+
msg = await reader.get_message()
28+
# Delay response
29+
await asyncio.sleep(0.5)
30+
msg.arbitration_id += 1
31+
can0.send(msg)
32+
# Wait for last message to arrive
33+
await reader.get_message()
34+
print('Done!')
35+
36+
# Clean-up
37+
notifier.stop()
38+
can0.shutdown()
39+
40+
# Get the default event loop
41+
loop = asyncio.get_event_loop()
42+
# Run until main coroutine finishes
43+
loop.run_until_complete(main())
44+
loop.close()

0 commit comments

Comments
 (0)