1
1
"""
2
- Interface for isCAN from Thorsis Technologies GmbH, former ifak system GmbH.
2
+ Interface for isCAN from * Thorsis Technologies GmbH* , former * ifak system GmbH* .
3
3
"""
4
4
5
5
import ctypes
6
6
import time
7
7
import logging
8
+ from typing import Optional , Tuple , Union
8
9
9
- from can import CanError , BusABC , Message
10
+ from can import BusABC , Message
11
+ from can import (
12
+ CanError ,
13
+ CanInterfaceNotImplementedError ,
14
+ CanInitializationError ,
15
+ CanOperationError ,
16
+ )
10
17
11
18
logger = logging .getLogger (__name__ )
12
19
@@ -23,9 +30,15 @@ class MessageExStruct(ctypes.Structure):
23
30
]
24
31
25
32
26
- def check_status (result , function , arguments ):
33
+ def check_status_initialization (result : int , function , arguments ) -> int :
27
34
if result > 0 :
28
- raise IscanError (function , result , arguments )
35
+ raise IscanInitializationError (function , result , arguments )
36
+ return result
37
+
38
+
39
+ def check_status (result : int , function , arguments ) -> int :
40
+ if result > 0 :
41
+ raise IscanOperationError (function , result , arguments )
29
42
return result
30
43
31
44
@@ -36,12 +49,15 @@ def check_status(result, function, arguments):
36
49
logger .warning ("Failed to load IS-CAN driver: %s" , e )
37
50
else :
38
51
iscan .isCAN_DeviceInitEx .argtypes = [ctypes .c_ubyte , ctypes .c_ubyte ]
39
- iscan .isCAN_DeviceInitEx .errcheck = check_status
52
+ iscan .isCAN_DeviceInitEx .errcheck = check_status_initialization
40
53
iscan .isCAN_DeviceInitEx .restype = ctypes .c_ubyte
54
+
41
55
iscan .isCAN_ReceiveMessageEx .errcheck = check_status
42
56
iscan .isCAN_ReceiveMessageEx .restype = ctypes .c_ubyte
57
+
43
58
iscan .isCAN_TransmitMessageEx .errcheck = check_status
44
59
iscan .isCAN_TransmitMessageEx .restype = ctypes .c_ubyte
60
+
45
61
iscan .isCAN_CloseDevice .errcheck = check_status
46
62
iscan .isCAN_CloseDevice .restype = ctypes .c_ubyte
47
63
@@ -62,24 +78,29 @@ class IscanBus(BusABC):
62
78
1000000 : 9 ,
63
79
}
64
80
65
- def __init__ (self , channel , bitrate = 500000 , poll_interval = 0.01 , ** kwargs ):
81
+ def __init__ (
82
+ self ,
83
+ channel : Union [str , int ],
84
+ bitrate : int = 500000 ,
85
+ poll_interval : float = 0.01 ,
86
+ ** kwargs ,
87
+ ) -> None :
66
88
"""
67
- :param int channel:
89
+ :param channel:
68
90
Device number
69
- :param int bitrate:
91
+ :param bitrate:
70
92
Bitrate in bits/s
71
- :param float poll_interval:
93
+ :param poll_interval:
72
94
Poll interval in seconds when reading messages
73
95
"""
74
96
if iscan is None :
75
- raise ImportError ("Could not load isCAN driver" )
97
+ raise CanInterfaceNotImplementedError ("Could not load isCAN driver" )
76
98
77
99
self .channel = ctypes .c_ubyte (int (channel ))
78
- self .channel_info = "IS-CAN: %s" % channel
100
+ self .channel_info = f "IS-CAN: { self . channel } "
79
101
80
102
if bitrate not in self .BAUDRATES :
81
- valid_bitrates = ", " .join (str (bitrate ) for bitrate in self .BAUDRATES )
82
- raise ValueError ("Invalid bitrate, choose one of " + valid_bitrates )
103
+ raise ValueError (f"Invalid bitrate, choose one of { set (self .BAUDRATES )} " )
83
104
84
105
self .poll_interval = poll_interval
85
106
iscan .isCAN_DeviceInitEx (self .channel , self .BAUDRATES [bitrate ])
@@ -88,14 +109,16 @@ def __init__(self, channel, bitrate=500000, poll_interval=0.01, **kwargs):
88
109
channel = channel , bitrate = bitrate , poll_interval = poll_interval , ** kwargs
89
110
)
90
111
91
- def _recv_internal (self , timeout ):
112
+ def _recv_internal (
113
+ self , timeout : Optional [float ]
114
+ ) -> Tuple [Optional [Message ], bool ]:
92
115
raw_msg = MessageExStruct ()
93
116
end_time = time .time () + timeout if timeout is not None else None
94
117
while True :
95
118
try :
96
119
iscan .isCAN_ReceiveMessageEx (self .channel , ctypes .byref (raw_msg ))
97
120
except IscanError as e :
98
- if e .error_code != 8 :
121
+ if e .error_code != 8 : # "No message received"
99
122
# An error occurred
100
123
raise
101
124
if end_time is not None and time .time () > end_time :
@@ -118,7 +141,7 @@ def _recv_internal(self, timeout):
118
141
)
119
142
return msg , False
120
143
121
- def send (self , msg , timeout = None ):
144
+ def send (self , msg : Message , timeout : Optional [ float ] = None ) -> None :
122
145
raw_msg = MessageExStruct (
123
146
msg .arbitration_id ,
124
147
bool (msg .is_extended_id ),
@@ -128,14 +151,14 @@ def send(self, msg, timeout=None):
128
151
)
129
152
iscan .isCAN_TransmitMessageEx (self .channel , ctypes .byref (raw_msg ))
130
153
131
- def shutdown (self ):
154
+ def shutdown (self ) -> None :
132
155
iscan .isCAN_CloseDevice (self .channel )
133
156
134
157
135
158
class IscanError (CanError ):
136
- # TODO: document
137
159
138
160
ERROR_CODES = {
161
+ 0 : "Success" ,
139
162
1 : "No access to device" ,
140
163
2 : "Device with ID not found" ,
141
164
3 : "Driver operation failed" ,
@@ -161,17 +184,28 @@ class IscanError(CanError):
161
184
40 : "Need a licence number under NT4" ,
162
185
}
163
186
164
- def __init__ (self , function , error_code , arguments ):
165
- super ().__init__ ()
166
- # :Status code
187
+ def __init__ (self , function , error_code : int , arguments ) -> None :
188
+ try :
189
+ description = ": " + self .ERROR_CODES [self .error_code ]
190
+ except KeyError :
191
+ description = ""
192
+
193
+ super ().__init__ (
194
+ f"Function { self .function .__name__ } failed{ description } " ,
195
+ error_code = error_code ,
196
+ )
197
+
198
+ #: Status code
167
199
self .error_code = error_code
168
- # : Function that failed
200
+ #: Function that failed
169
201
self .function = function
170
- # : Arguments passed to function
202
+ #: Arguments passed to function
171
203
self .arguments = arguments
172
204
173
- def __str__ (self ):
174
- description = self .ERROR_CODES .get (
175
- self .error_code , "Error code %d" % self .error_code
176
- )
177
- return "Function %s failed: %s" % (self .function .__name__ , description )
205
+
206
+ class IscanOperationError (IscanError , CanOperationError ):
207
+ pass
208
+
209
+
210
+ class IscanInitializationError (IscanError , CanInitializationError ):
211
+ pass
0 commit comments