Skip to content

Add typing annotations for slcan interface #832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ jobs:
can/broadcastmanager.py
can/bus.py
can/interface.py
can/interfaces/slcan.py
can/interfaces/socketcan/**.py
can/interfaces/virtual.py
can/listener.py
Expand Down
77 changes: 42 additions & 35 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

"""

from typing import Any, Optional, Tuple
from can import typechecking

import time
import logging

Expand Down Expand Up @@ -52,38 +55,39 @@ class slcanBus(BusABC):

def __init__(
self,
channel,
ttyBaudrate=115200,
bitrate=None,
btr=None,
sleep_after_open=_SLEEP_AFTER_SERIAL_OPEN,
rtscts=False,
**kwargs
):
channel: typechecking.ChannelStr,
ttyBaudrate: int = 115200,
bitrate: Optional[int] = None,
btr: Optional[str] = None,
sleep_after_open: float = _SLEEP_AFTER_SERIAL_OPEN,
rtscts: bool = False,
**kwargs: Any
) -> None:
"""
:raise ValueError: if both *bitrate* and *btr* are set

:param str channel:
:param channel:
port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...)
Must not be empty.
:param int ttyBaudrate:
:param ttyBaudrate:
baudrate of underlying serial or usb device
:param int bitrate:
:param bitrate:
Bitrate in bit/s
:param str btr:
:param btr:
BTR register value to set custom can speed
:param float poll_interval:
:param poll_interval:
Poll interval in seconds when reading messages
:param float sleep_after_open:
:param sleep_after_open:
Time to wait in seconds after opening serial connection
:param bool rtscts:
:param rtscts:
turn hardware handshake (RTS/CTS) on and off
"""

if not channel: # if None or empty
raise TypeError("Must specify a serial port.")
if "@" in channel:
(channel, ttyBaudrate) = channel.split("@")
(channel, baudrate) = channel.split("@")
ttyBaudrate = int(baudrate)
self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts
)
Expand All @@ -104,36 +108,38 @@ def __init__(
channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs
)

def set_bitrate(self, bitrate):
def set_bitrate(self, bitrate: int) -> None:
"""
:raise ValueError: if both *bitrate* is not among the possible values

:param int bitrate:
:param bitrate:
Bitrate in bit/s
"""
self.close()
if bitrate in self._BITRATES:
self._write(self._BITRATES[bitrate])
else:
raise ValueError(
"Invalid bitrate, choose one of " + (", ".join(self._BITRATES)) + "."
"Invalid bitrate, choose one of "
+ (", ".join(str(k) for k in self._BITRATES.keys()))
+ "."
)
self.open()

def set_bitrate_reg(self, btr):
def set_bitrate_reg(self, btr: str) -> None:
"""
:param str btr:
:param btr:
BTR register value to set custom can speed
"""
self.close()
self._write("s" + btr)
self.open()

def _write(self, string):
def _write(self, string: str) -> None:
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()

def _read(self, timeout):
def _read(self, timeout: Optional[float]) -> Optional[str]:

# first read what is already in receive buffer
while self.serialPortOrig.in_waiting:
Expand Down Expand Up @@ -165,18 +171,20 @@ def _read(self, timeout):
break
return string

def flush(self):
def flush(self) -> None:
del self._buffer[:]
while self.serialPortOrig.in_waiting:
self.serialPortOrig.read()

def open(self):
def open(self) -> None:
self._write("O")

def close(self):
def close(self) -> None:
self._write("C")

def _recv_internal(self, timeout):
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[Message], bool]:

canId = None
remote = False
Expand Down Expand Up @@ -223,7 +231,7 @@ def _recv_internal(self, timeout):
return msg, False
return None, False

def send(self, msg, timeout=None):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout
if msg.is_remote_frame:
Expand All @@ -239,20 +247,21 @@ def send(self, msg, timeout=None):
sendStr += "".join(["%02X" % b for b in msg.data])
self._write(sendStr)

def shutdown(self):
def shutdown(self) -> None:
self.close()
self.serialPortOrig.close()

def fileno(self):
def fileno(self) -> int:
if hasattr(self.serialPortOrig, "fileno"):
return self.serialPortOrig.fileno()
# Return an invalid file descriptor on Windows
return -1

def get_version(self, timeout):
def get_version(
self, timeout: Optional[float]
) -> Tuple[Optional[int], Optional[int]]:
"""Get HW and SW version of the slcan interface.

:type timeout: int or None
:param timeout:
seconds to wait for version or None to wait indefinitely

Expand Down Expand Up @@ -288,14 +297,12 @@ def get_version(self, timeout):
else:
return None, None

def get_serial_number(self, timeout):
def get_serial_number(self, timeout: Optional[float]) -> Optional[str]:
"""Get serial number of the slcan interface.

:type timeout: int or None
:param timeout:
seconds to wait for serial number or None to wait indefinitely

:rtype str or None
:return:
None on timeout or a str object.
"""
Expand Down
4 changes: 3 additions & 1 deletion can/typechecking.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
CanData = typing.Union[bytes, bytearray, int, typing.Iterable[int]]

# Used for the Abstract Base Class
Channel = typing.Union[int, str]
ChannelStr = str
ChannelInt = int
Channel = typing.Union[ChannelInt, ChannelStr]

# Used by the IO module
FileLike = typing.IO[typing.Any]
Expand Down