Skip to content

Commit 2040719

Browse files
committed
Simplify + pertially type usb2can; use new exceptions
1 parent 8e6f05a commit 2040719

File tree

3 files changed

+78
-108
lines changed

3 files changed

+78
-108
lines changed

can/interfaces/usb2can/serial_selector.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"""
33

44
import logging
5+
from typing import List
56

67
try:
78
import win32com.client
@@ -10,7 +11,7 @@
1011
raise
1112

1213

13-
def WMIDateStringToDate(dtmDate):
14+
def WMIDateStringToDate(dtmDate) -> str:
1415
if dtmDate[4] == 0:
1516
strDateTime = dtmDate[5] + "/"
1617
else:
@@ -39,14 +40,12 @@ def WMIDateStringToDate(dtmDate):
3940
return strDateTime
4041

4142

42-
def find_serial_devices(serial_matcher="ED"):
43+
def find_serial_devices(serial_matcher: str = "ED") -> List[str]:
4344
"""
4445
Finds a list of USB devices where the serial number (partially) matches the given string.
4546
46-
:param str serial_matcher (optional):
47+
:param serial_matcher:
4748
only device IDs starting with this string are returned
48-
49-
:rtype: List[str]
5049
"""
5150
objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator")
5251
objSWbemServices = objWMIService.ConnectServer(".", "root\\cimv2")

can/interfaces/usb2can/usb2canInterface.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
"""
2-
This interface is for Windows only, otherwise use socketCAN.
2+
This interface is for Windows only, otherwise use SocketCAN.
33
"""
44

55
import logging
66
from ctypes import byref
7+
from typing import Optional
78

8-
from can import BusABC, Message, CanError
9-
from .usb2canabstractionlayer import *
9+
from can import BusABC, Message, CanInitializationError, CanOperationError
10+
from .usb2canabstractionlayer import Usb2CanAbstractionLayer, CanalMsg, CanalError
11+
from .usb2canabstractionlayer import flags_t, IS_ERROR_FRAME, IS_REMOTE_FRAME, IS_ID_TYPE
1012
from .serial_selector import find_serial_devices
1113

1214
# Set up logging
@@ -102,15 +104,15 @@ def __init__(
102104
if not device_id:
103105
devices = find_serial_devices()
104106
if not devices:
105-
raise CanError("could not automatically find any device")
107+
raise CanInitializationError("could not automatically find any device")
106108
device_id = devices[0]
107109

108110
# convert to kb/s and cap: max rate is 1000 kb/s
109111
baudrate = min(int(bitrate // 1000), 1000)
110112

111-
self.channel_info = "USB2CAN device {}".format(device_id)
113+
self.channel_info = f"USB2CAN device {device_id}"
112114

113-
connector = "{}; {}".format(device_id, baudrate)
115+
connector = f"{device_id}; {baudrate}"
114116
self.handle = self.can.open(connector, flags_t)
115117

116118
super().__init__(
@@ -126,7 +128,7 @@ def send(self, msg, timeout=None):
126128
status = self.can.send(self.handle, byref(tx))
127129

128130
if status != CanalError.SUCCESS:
129-
raise CanError("could not send message: status == {}".format(status))
131+
raise CanOperationError("could not send message", error_code=status)
130132

131133
def _recv_internal(self, timeout):
132134

@@ -148,37 +150,36 @@ def _recv_internal(self, timeout):
148150
):
149151
rx = None
150152
else:
151-
log.error("Canal Error %s", status)
152-
rx = None
153+
raise CanOperationError("could not receive message", error_code=status)
153154

154155
return rx, False
155156

156157
def shutdown(self):
157158
"""
158159
Shuts down connection to the device safely.
159160
160-
:raise cam.CanError: is closing the connection did not work
161+
:raise cam.CanOperationError: is closing the connection did not work
161162
"""
162163
status = self.can.close(self.handle)
163164

164165
if status != CanalError.SUCCESS:
165-
raise CanError("could not shut down bus: status == {}".format(status))
166+
raise CanOperationError("could not shut down bus", error_code=status)
166167

167168
@staticmethod
168169
def _detect_available_configs():
169170
return Usb2canBus.detect_available_configs()
170171

171172
@staticmethod
172-
def detect_available_configs(serial_matcher=None):
173+
def detect_available_configs(serial_matcher: Optional[str] = None):
173174
"""
174-
Uses the Windows Management Instrumentation to identify serial devices.
175+
Uses the *Windows Management Instrumentation* to identify serial devices.
175176
176-
:param str serial_matcher (optional):
177+
:param serial_matcher:
177178
search string for automatic detection of the device serial
178179
"""
179-
if serial_matcher:
180-
channels = find_serial_devices(serial_matcher)
181-
else:
180+
if serial_matcher is None:
182181
channels = find_serial_devices()
182+
else:
183+
channels = find_serial_devices(serial_matcher)
183184

184185
return [{"interface": "usb2can", "channel": c} for c in channels]

can/interfaces/usb2can/usb2canabstractionlayer.py

Lines changed: 56 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
"""
55

66
from ctypes import *
7-
from struct import *
8-
from enum import Enum
7+
from enum import IntEnum
98
import logging
9+
from contextlib import contextmanager
1010

1111
import can
1212

@@ -25,7 +25,7 @@
2525
IS_ID_TYPE = 1
2626

2727

28-
class CanalError(Enum):
28+
class CanalError(IntEnum):
2929
SUCCESS = 0
3030
BAUDRATE = 1
3131
BUS_OFF = 2
@@ -102,6 +102,14 @@ class CanalMsg(Structure):
102102
]
103103

104104

105+
@contextmanager
106+
def error_check(error_message: str) -> None:
107+
try:
108+
yield
109+
except Exception as error:
110+
raise can.CanOperationError(error_message) from error
111+
112+
105113
class Usb2CanAbstractionLayer:
106114
"""A low level wrapper around the usb2can library.
107115
@@ -112,21 +120,26 @@ def __init__(self, dll="usb2can.dll"):
112120
"""
113121
:type dll: str or path-like
114122
:param dll (optional): the path to the usb2can DLL to load
115-
:raises OSError: if the DLL could not be loaded
123+
124+
:raises can.CanInterfaceNotImplementedError: if the DLL could not be loaded
116125
"""
117-
self.__m_dllBasic = windll.LoadLibrary(dll)
126+
try:
127+
self.__m_dllBasic = windll.LoadLibrary(dll)
128+
if self.__m_dllBasic is None:
129+
raise Exception("__m_dllBasic is None")
118130

119-
if self.__m_dllBasic is None:
120-
log.warning("DLL failed to load at path: {}".format(dll))
131+
except Exception as error:
132+
message = f"DLL failed to load at path: {dll}"
133+
raise can.CanInterfaceNotImplementedError(message) from error
121134

122-
def open(self, configuration, flags):
135+
def open(self, configuration: str, flags: int):
123136
"""
124137
Opens a CAN connection using `CanalOpen()`.
125138
126-
:param str configuration: the configuration: "device_id; baudrate"
127-
:param int flags: the flags to be set
139+
:param configuration: the configuration: "device_id; baudrate"
140+
:param flags: the flags to be set
128141
129-
:raises can.CanError: if any error occurred
142+
:raises can.CanInitializationError: if any error occurred
130143
:returns: Valid handle for CANAL API functions on success
131144
"""
132145
try:
@@ -136,100 +149,57 @@ def open(self, configuration, flags):
136149
result = self.__m_dllBasic.CanalOpen(config_ascii, flags)
137150
except Exception as ex:
138151
# catch any errors thrown by this call and re-raise
139-
raise can.CanError(
140-
'CanalOpen() failed, configuration: "{}", error: {}'.format(
141-
configuration, ex
142-
)
152+
raise can.CanInitializationError(
153+
f'CanalOpen() failed, configuration: "{configuration}", error: {ex}'
143154
)
144155
else:
145156
# any greater-than-zero return value indicates a success
146157
# (see https://grodansparadis.gitbooks.io/the-vscp-daemon/canal_interface_specification.html)
147158
# raise an error if the return code is <= 0
148159
if result <= 0:
149-
raise can.CanError(
150-
'CanalOpen() failed, configuration: "{}", return code: {}'.format(
151-
configuration, result
152-
)
160+
raise can.CanInitializationError(
161+
f'CanalOpen() failed, configuration: "{configuration}"',
162+
error_code=result,
153163
)
154164
else:
155165
return result
156166

157-
def close(self, handle):
158-
try:
159-
res = self.__m_dllBasic.CanalClose(handle)
160-
return CanalError(res)
161-
except:
162-
log.warning("Failed to close")
163-
raise
167+
def close(self, handle) -> CanalError:
168+
with error_check("Failed to close"):
169+
return CanalError(self.__m_dllBasic.CanalClose(handle))
164170

165-
def send(self, handle, msg):
166-
try:
167-
res = self.__m_dllBasic.CanalSend(handle, msg)
168-
return CanalError(res)
169-
except:
170-
log.warning("Sending error")
171-
raise can.CanError("Failed to transmit frame")
171+
def send(self, handle, msg) -> CanalError:
172+
with error_check("Failed to transmit frame"):
173+
return CanalError(self.__m_dllBasic.CanalSend(handle, msg))
172174

173-
def receive(self, handle, msg):
174-
try:
175-
res = self.__m_dllBasic.CanalReceive(handle, msg)
176-
return CanalError(res)
177-
except:
178-
log.warning("Receive error")
179-
raise
175+
def receive(self, handle, msg) -> CanalError:
176+
with error_check("Receive error"):
177+
return CanalError(self.__m_dllBasic.CanalReceive(handle, msg))
180178

181-
def blocking_send(self, handle, msg, timeout):
182-
try:
183-
res = self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout)
184-
return CanalError(res)
185-
except:
186-
log.warning("Blocking send error")
187-
raise
179+
def blocking_send(self, handle, msg, timeout) -> CanalError:
180+
with error_check("Blocking send error"):
181+
return CanalError(self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout))
188182

189-
def blocking_receive(self, handle, msg, timeout):
190-
try:
191-
res = self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout)
192-
return CanalError(res)
193-
except:
194-
log.warning("Blocking Receive Failed")
195-
raise
183+
def blocking_receive(self, handle, msg, timeout) -> CanalError:
184+
with error_check("Blocking Receive Failed"):
185+
return CanalError(self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout))
196186

197-
def get_status(self, handle, status):
198-
try:
199-
res = self.__m_dllBasic.CanalGetStatus(handle, status)
200-
return CanalError(res)
201-
except:
202-
log.warning("Get status failed")
203-
raise
187+
def get_status(self, handle, status) -> CanalError:
188+
with error_check("Get status failed"):
189+
return CanalError(self.__m_dllBasic.CanalGetStatus(handle, status))
204190

205-
def get_statistics(self, handle, statistics):
206-
try:
207-
res = self.__m_dllBasic.CanalGetStatistics(handle, statistics)
208-
return CanalError(res)
209-
except:
210-
log.warning("Get Statistics failed")
211-
raise
191+
def get_statistics(self, handle, statistics) -> CanalError:
192+
with error_check("Get Statistics failed"):
193+
return CanalError(self.__m_dllBasic.CanalGetStatistics(handle, statistics))
212194

213195
def get_version(self):
214-
try:
215-
res = self.__m_dllBasic.CanalGetVersion()
216-
return res
217-
except:
218-
log.warning("Failed to get version info")
219-
raise
196+
with error_check("Failed to get version info"):
197+
return self.__m_dllBasic.CanalGetVersion()
220198

221199
def get_library_version(self):
222-
try:
223-
res = self.__m_dllBasic.CanalGetDllVersion()
224-
return res
225-
except:
226-
log.warning("Failed to get DLL version")
227-
raise
200+
with error_check("Failed to get DLL version"):
201+
return self.__m_dllBasic.CanalGetDllVersion()
228202

229203
def get_vendor_string(self):
230-
try:
231-
res = self.__m_dllBasic.CanalGetVendorString()
232-
return res
233-
except:
234-
log.warning("Failed to get vendor string")
235-
raise
204+
with error_check("Failed to get vendor string"):
205+
return self.__m_dllBasic.CanalGetVendorString()

0 commit comments

Comments
 (0)