7
7
8
8
"""
9
9
10
+ from typing import Any , Optional , Tuple
11
+ from can import typechecking
12
+
10
13
import time
11
14
import logging
12
15
@@ -52,38 +55,39 @@ class slcanBus(BusABC):
52
55
53
56
def __init__ (
54
57
self ,
55
- channel ,
56
- ttyBaudrate = 115200 ,
57
- bitrate = None ,
58
- btr = None ,
59
- sleep_after_open = _SLEEP_AFTER_SERIAL_OPEN ,
60
- rtscts = False ,
61
- ** kwargs
62
- ):
58
+ channel : typechecking . ChannelStr ,
59
+ ttyBaudrate : int = 115200 ,
60
+ bitrate : Optional [ int ] = None ,
61
+ btr : Optional [ str ] = None ,
62
+ sleep_after_open : float = _SLEEP_AFTER_SERIAL_OPEN ,
63
+ rtscts : bool = False ,
64
+ ** kwargs : Any
65
+ ) -> None :
63
66
"""
64
67
:raise ValueError: if both *bitrate* and *btr* are set
65
68
66
- :param str channel:
69
+ :param channel:
67
70
port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...)
68
71
Must not be empty.
69
- :param int ttyBaudrate:
72
+ :param ttyBaudrate:
70
73
baudrate of underlying serial or usb device
71
- :param int bitrate:
74
+ :param bitrate:
72
75
Bitrate in bit/s
73
- :param str btr:
76
+ :param btr:
74
77
BTR register value to set custom can speed
75
- :param float poll_interval:
78
+ :param poll_interval:
76
79
Poll interval in seconds when reading messages
77
- :param float sleep_after_open:
80
+ :param sleep_after_open:
78
81
Time to wait in seconds after opening serial connection
79
- :param bool rtscts:
82
+ :param rtscts:
80
83
turn hardware handshake (RTS/CTS) on and off
81
84
"""
82
85
83
86
if not channel : # if None or empty
84
87
raise TypeError ("Must specify a serial port." )
85
88
if "@" in channel :
86
- (channel , ttyBaudrate ) = channel .split ("@" )
89
+ (channel , baudrate ) = channel .split ("@" )
90
+ ttyBaudrate = int (baudrate )
87
91
self .serialPortOrig = serial .serial_for_url (
88
92
channel , baudrate = ttyBaudrate , rtscts = rtscts
89
93
)
@@ -104,36 +108,38 @@ def __init__(
104
108
channel , ttyBaudrate = 115200 , bitrate = None , rtscts = False , ** kwargs
105
109
)
106
110
107
- def set_bitrate (self , bitrate ) :
111
+ def set_bitrate (self , bitrate : int ) -> None :
108
112
"""
109
113
:raise ValueError: if both *bitrate* is not among the possible values
110
114
111
- :param int bitrate:
115
+ :param bitrate:
112
116
Bitrate in bit/s
113
117
"""
114
118
self .close ()
115
119
if bitrate in self ._BITRATES :
116
120
self ._write (self ._BITRATES [bitrate ])
117
121
else :
118
122
raise ValueError (
119
- "Invalid bitrate, choose one of " + (", " .join (self ._BITRATES )) + "."
123
+ "Invalid bitrate, choose one of "
124
+ + (", " .join (str (k ) for k in self ._BITRATES .keys ()))
125
+ + "."
120
126
)
121
127
self .open ()
122
128
123
- def set_bitrate_reg (self , btr ) :
129
+ def set_bitrate_reg (self , btr : str ) -> None :
124
130
"""
125
- :param str btr:
131
+ :param btr:
126
132
BTR register value to set custom can speed
127
133
"""
128
134
self .close ()
129
135
self ._write ("s" + btr )
130
136
self .open ()
131
137
132
- def _write (self , string ) :
138
+ def _write (self , string : str ) -> None :
133
139
self .serialPortOrig .write (string .encode () + self .LINE_TERMINATOR )
134
140
self .serialPortOrig .flush ()
135
141
136
- def _read (self , timeout ) :
142
+ def _read (self , timeout : Optional [ float ]) -> Optional [ str ] :
137
143
138
144
# first read what is already in receive buffer
139
145
while self .serialPortOrig .in_waiting :
@@ -165,18 +171,20 @@ def _read(self, timeout):
165
171
break
166
172
return string
167
173
168
- def flush (self ):
174
+ def flush (self ) -> None :
169
175
del self ._buffer [:]
170
176
while self .serialPortOrig .in_waiting :
171
177
self .serialPortOrig .read ()
172
178
173
- def open (self ):
179
+ def open (self ) -> None :
174
180
self ._write ("O" )
175
181
176
- def close (self ):
182
+ def close (self ) -> None :
177
183
self ._write ("C" )
178
184
179
- def _recv_internal (self , timeout ):
185
+ def _recv_internal (
186
+ self , timeout : Optional [float ]
187
+ ) -> Tuple [Optional [Message ], bool ]:
180
188
181
189
canId = None
182
190
remote = False
@@ -223,7 +231,7 @@ def _recv_internal(self, timeout):
223
231
return msg , False
224
232
return None , False
225
233
226
- def send (self , msg , timeout = None ):
234
+ def send (self , msg : Message , timeout : Optional [ float ] = None ) -> None :
227
235
if timeout != self .serialPortOrig .write_timeout :
228
236
self .serialPortOrig .write_timeout = timeout
229
237
if msg .is_remote_frame :
@@ -239,20 +247,21 @@ def send(self, msg, timeout=None):
239
247
sendStr += "" .join (["%02X" % b for b in msg .data ])
240
248
self ._write (sendStr )
241
249
242
- def shutdown (self ):
250
+ def shutdown (self ) -> None :
243
251
self .close ()
244
252
self .serialPortOrig .close ()
245
253
246
- def fileno (self ):
254
+ def fileno (self ) -> int :
247
255
if hasattr (self .serialPortOrig , "fileno" ):
248
256
return self .serialPortOrig .fileno ()
249
257
# Return an invalid file descriptor on Windows
250
258
return - 1
251
259
252
- def get_version (self , timeout ):
260
+ def get_version (
261
+ self , timeout : Optional [float ]
262
+ ) -> Tuple [Optional [int ], Optional [int ]]:
253
263
"""Get HW and SW version of the slcan interface.
254
264
255
- :type timeout: int or None
256
265
:param timeout:
257
266
seconds to wait for version or None to wait indefinitely
258
267
@@ -288,14 +297,12 @@ def get_version(self, timeout):
288
297
else :
289
298
return None , None
290
299
291
- def get_serial_number (self , timeout ) :
300
+ def get_serial_number (self , timeout : Optional [ float ]) -> Optional [ str ] :
292
301
"""Get serial number of the slcan interface.
293
302
294
- :type timeout: int or None
295
303
:param timeout:
296
304
seconds to wait for serial number or None to wait indefinitely
297
305
298
- :rtype str or None
299
306
:return:
300
307
None on timeout or a str object.
301
308
"""
0 commit comments