13
13
from multiprocessing .dummy import Pool as ThreadPool
14
14
15
15
import pytest
16
+ import random
16
17
17
18
import can
18
19
@@ -168,8 +169,6 @@ def test_broadcast_channel(self):
168
169
169
170
170
171
class TestThreadSafeBus (Back2BackTestCase ):
171
- """Does some testing that is better than nothing.
172
- """
173
172
174
173
def setUp (self ):
175
174
self .bus1 = can .ThreadSafeBus (channel = self .CHANNEL_1 ,
@@ -190,6 +189,7 @@ def test_concurrent_writes(self):
190
189
191
190
message = can .Message (
192
191
arbitration_id = 0x123 ,
192
+ channel = self .CHANNEL_1 ,
193
193
is_extended_id = True ,
194
194
timestamp = 121334.365 ,
195
195
data = [254 , 255 , 1 , 2 ]
@@ -200,12 +200,57 @@ def sender(msg):
200
200
self .bus1 .send (msg )
201
201
202
202
def receiver (_ ):
203
- result = self .bus2 .recv (timeout = 2.0 )
204
- self .assertIsNotNone (result )
205
- self .assertEqual (result , message )
203
+ return self .bus2 .recv (timeout = 2.0 )
206
204
207
205
sender_pool .map_async (sender , workload )
208
- receiver_pool .map_async (receiver , len (workload ) * [None ])
206
+ for msg in receiver_pool .map (receiver , len (workload ) * [None ]):
207
+ self .assertIsNotNone (msg )
208
+ self .assertEqual (message .arbitration_id , msg .arbitration_id )
209
+ self .assertTrue (message .equals (msg , timestamp_delta = None ))
210
+
211
+ sender_pool .close ()
212
+ sender_pool .join ()
213
+ receiver_pool .close ()
214
+ receiver_pool .join ()
215
+
216
+ @pytest .mark .timeout (5.0 )
217
+ def test_filtered_bus (self ):
218
+ sender_pool = ThreadPool (100 )
219
+ receiver_pool = ThreadPool (100 )
220
+
221
+ included_message = can .Message (
222
+ arbitration_id = 0x123 ,
223
+ channel = self .CHANNEL_1 ,
224
+ is_extended_id = True ,
225
+ timestamp = 121334.365 ,
226
+ data = [254 , 255 , 1 , 2 ]
227
+ )
228
+ excluded_message = can .Message (
229
+ arbitration_id = 0x02 ,
230
+ channel = self .CHANNEL_1 ,
231
+ is_extended_id = True ,
232
+ timestamp = 121334.300 ,
233
+ data = [1 , 2 , 3 ]
234
+ )
235
+ workload = 500 * [included_message ] + 500 * [excluded_message ]
236
+ random .shuffle (workload )
237
+
238
+ self .bus2 .set_filters ([{"can_id" : 0x123 , "can_mask" : 0xff , "extended" : True }])
239
+
240
+ def sender (msg ):
241
+ self .bus1 .send (msg )
242
+
243
+ def receiver (_ ):
244
+ return self .bus2 .recv (timeout = 2.0 )
245
+
246
+ sender_pool .map_async (sender , workload )
247
+ received_msgs = receiver_pool .map (receiver , 500 * [None ])
248
+
249
+ for msg in received_msgs :
250
+ self .assertIsNotNone (msg )
251
+ self .assertEqual (msg .arbitration_id , included_message .arbitration_id )
252
+ self .assertTrue (included_message .equals (msg , timestamp_delta = None ))
253
+ self .assertEqual (len (received_msgs ), 500 )
209
254
210
255
sender_pool .close ()
211
256
sender_pool .join ()
0 commit comments