1
+
2
+
3
+ import pytest
4
+ from pytest_mock import MockerFixture
5
+
6
+ from uds .uds_communications .TransportProtocols .Can .CanTp import CanTp
7
+ from uds .uds_communications .TransportProtocols .Can .CanTpTypes import CanTpAddressingTypes
8
+ from uds .config import Config , IsoTpConfig
9
+
10
+
11
+ @pytest .fixture
12
+ def can_tp_inst (mocker : MockerFixture ):
13
+ Config .isotp = IsoTpConfig (
14
+ req_id = 0x12 ,
15
+ res_id = 0x21 ,
16
+ addressing_type = "NORMAL" ,
17
+ n_ae = 0 ,
18
+ n_sa = 0 ,
19
+ n_ta = 0 ,
20
+ m_type = 'DIAGNOSTICS' ,
21
+ discard_neg_resp = False
22
+ )
23
+ CanTp .PADDING_PATTERN = 0xCC
24
+ return CanTp (is_fd = False )
25
+
26
+
27
+ @pytest .fixture
28
+ def can_fd_tp_inst (mocker : MockerFixture ):
29
+ Config .isotp = IsoTpConfig (
30
+ req_id = 0x12 ,
31
+ res_id = 0x21 ,
32
+ addressing_type = "NORMAL" ,
33
+ n_ae = 0 ,
34
+ n_sa = 0 ,
35
+ n_ta = 0 ,
36
+ m_type = 'DIAGNOSTICS' ,
37
+ discard_neg_resp = False
38
+ )
39
+ CanTp .PADDING_PATTERN = 0xCC
40
+ return CanTp (is_fd = True )
41
+
42
+
43
+ @pytest .mark .parametrize (
44
+ "expected_stmin_value, expected_stmin_time" ,
45
+ [
46
+ (0x01 , 1e-3 ), (0x7f , 127e-3 ), (0xF1 , 1e-4 ), (0xF5 , 5e-4 ), (0xF9 , 9e-4 ),
47
+ ]
48
+ )
49
+ def test_stmin_decode_encode (expected_stmin_value , expected_stmin_time ):
50
+ stmin_value = CanTp .encode_stMin (expected_stmin_time )
51
+ assert stmin_value == expected_stmin_value
52
+
53
+ stmin_time = CanTp .decode_stMin (expected_stmin_value )
54
+ assert stmin_time == expected_stmin_time
55
+
56
+
57
+ @pytest .mark .parametrize (
58
+ "payload_to_send, expected_single_frame" ,
59
+ # less than 7 payload bytes -> 1st byte = 0x0 + MDL | payload | padding (reach 8 bytes)
60
+ [
61
+ (
62
+ [0x22 , 0x01 , 0x00 , 0x00 , 0x00 ],
63
+ [0x05 , 0x22 , 0x01 , 0x00 , 0x00 , 0x00 , 0xCC , 0xCC ]
64
+ ),
65
+ (
66
+ [0x22 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 ],
67
+ [0x07 , 0x22 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 ]
68
+ ),
69
+ # more than 7 payload bytes -> 1st byte = 0 | 2nd byte = MDL | payload | padding (reach 12 bytes)
70
+ (
71
+ [0x22 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 ],
72
+ [0x00 , 0x08 , 0x22 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0xCC , 0xCC ]
73
+ ),
74
+ ],
75
+ ids = ["less than 8 bytes" , "exactly 8 bytes" , "more than 8 bytes" ]
76
+ )
77
+ def test_make_single_frame_fd (can_fd_tp_inst : CanTp , payload_to_send , expected_single_frame ):
78
+ single_frame = can_fd_tp_inst .make_single_frame (payload_to_send )
79
+ assert single_frame == expected_single_frame
80
+
81
+
82
+ @pytest .mark .parametrize (
83
+ "payload_to_send, expected_transmit_calls, expected_recv_call_count" ,
84
+ [
85
+ pytest .param (
86
+ [0x12 ] * 5 ,
87
+ [[0x05 ] + [0x12 ] * 5 + [0xCC ] * 2 ],
88
+ 0 ,
89
+ id = "PDU smaller than 8 bytes -> send 8 bytes with padding"
90
+ ),
91
+ pytest .param (
92
+ [0x12 ] * 7 ,
93
+ [[0x07 ] + [0x12 ] * 7 ],
94
+ 0 ,
95
+ id = "PDU exactly 8 bytes -> send 8 bytes without padding"
96
+ ),
97
+ pytest .param (
98
+ [0x12 ] * 8 ,
99
+ [[0x00 , 8 ] + [0x12 ] * 8 + [0xCC ] * 2 ],
100
+ 0 ,
101
+ id = "PDU greater than 8 bytes -> send 12 bytes with padding"
102
+ ),
103
+ pytest .param (
104
+ [0x12 ] * 11 ,
105
+ [([0x00 , 11 ]) + ([0x12 ] * 11 ) + ([0xCC ] * (16 - 11 - 2 ))],
106
+ 0 ,
107
+ id = "PDU greater than 12 bytes -> send 16 bytes with padding"
108
+ ),
109
+ pytest .param (
110
+ [0x12 ] * 15 ,
111
+ [[0x00 , 15 ] + [0x12 ] * 15 + [0xCC ] * (20 - 15 - 2 )],
112
+ 0 ,
113
+ id = "PDU greater than 16 bytes -> send 20 bytes with padding"
114
+ ),
115
+ pytest .param (
116
+ [0x12 ] * 64 ,
117
+ [
118
+ ([0x10 , 64 ] + [0x12 ] * (64 - 2 )), # first frame
119
+ ([0x21 ] + [0x12 ] * 2 + [0xCC ] * 5 ) # single consecutive frame
120
+ ],
121
+ 1 , # received 'flow control - continue to send'
122
+ id = "PDU greater than 64 bytes -> flow control"
123
+ ),
124
+ ]
125
+ )
126
+ def test_encode_isotp_canfd (can_fd_tp_inst : CanTp , mocker : MockerFixture , payload_to_send , expected_transmit_calls , expected_recv_call_count ):
127
+ # the only case where this is called is for flow control -> return a 'continue to send' frame
128
+ mock_recv = mocker .patch .object (can_fd_tp_inst , "getNextBufferedMessage" , return_value = [0x30 , 0x10 , 0x14 ])
129
+ mock_send = mocker .patch .object (can_fd_tp_inst , "transmit" )
130
+ mocker .patch ("time.sleep" )
131
+
132
+ can_fd_tp_inst .encode_isotp (payload_to_send )
133
+
134
+ assert mock_recv .call_count == expected_recv_call_count
135
+ assert mock_send .call_count == len (expected_transmit_calls )
136
+ for call , expected_sent_data in zip (mock_send .call_args_list , expected_transmit_calls ):
137
+ assert call .args [0 ] == expected_sent_data
138
+
139
+
140
+ FIRST_FRAME_HEADER_LEN = 2
141
+ CAN_FRAME_LEN = 8
142
+
143
+ @pytest .mark .parametrize (
144
+ "payload_to_send, expected_transmit_calls, expected_recv_call_count" ,
145
+ [
146
+ # 12 12 12 12 12
147
+ # -> 05 12 12 12 12 CC CC CC
148
+ pytest .param (
149
+ [0x12 ] * 5 ,
150
+ [[0x05 ] + [0x12 ] * 5 + [0xCC ] * 2 ],
151
+ 0 ,
152
+ id = "PDU smaller than 8 bytes -> send 8 bytes with padding"
153
+ ),
154
+ pytest .param (
155
+ [0x12 ] * 7 ,
156
+ [[0x07 ] + [0x12 ] * 7 ],
157
+ 0 ,
158
+ id = "PDU exactly 8 bytes -> send 8 bytes without padding"
159
+ ),
160
+ # 12 12 12 12 12 12 12 12
161
+ # -> 10 08 12 12 12 12 12 12 First frame
162
+ # <- 30 10 14 Continue to send frame
163
+ # -> 21 12 12 CC CC CC CC CC Consecutive frame
164
+ pytest .param (
165
+ [0x12 ] * 8 ,
166
+ [
167
+ ([0x10 , 8 ] + [0x12 ] * (CAN_FRAME_LEN - FIRST_FRAME_HEADER_LEN )), # first frame
168
+ ([0x21 ] + [0x12 ] * FIRST_FRAME_HEADER_LEN + [0xCC ] * 5 ) # single consecutive frame
169
+ ],
170
+ 1 ,
171
+ id = "PDU greater than 8 bytes -> send 12 bytes with padding"
172
+ ),
173
+ pytest .param (
174
+ [0x12 ] * 11 ,
175
+ [
176
+ ([0x10 , 11 ] + [0x12 ] * (CAN_FRAME_LEN - FIRST_FRAME_HEADER_LEN )), # first frame
177
+ ([0x21 ] + [0x12 ] * 5 + [0xCC ] * 2 ) # single consecutive frame
178
+ ],
179
+ 1 ,
180
+ id = "PDU greater than 12 bytes -> send 16 bytes with padding"
181
+ ),
182
+ pytest .param (
183
+ [0x12 ] * 64 ,
184
+ [
185
+ ([0x10 , 64 ] + [0x12 ] * 6 ), # first frame 6 bytes
186
+ ([0x21 ] + [0x12 ] * 7 ), # consecutive frame 13 bytes
187
+ ([0x22 ] + [0x12 ] * 7 ), # consecutive frame 20 bytes
188
+ ([0x23 ] + [0x12 ] * 7 ), # consecutive frame 27 bytes
189
+ ([0x24 ] + [0x12 ] * 7 ), # consecutive frame 34 bytes
190
+ ([0x25 ] + [0x12 ] * 7 ), # consecutive frame 41 bytes
191
+ ([0x26 ] + [0x12 ] * 7 ), # consecutive frame 48 bytes
192
+ ([0x27 ] + [0x12 ] * 7 ), # consecutive frame 55 bytes
193
+ ([0x28 ] + [0x12 ] * 7 ), # consecutive frame 62 bytes
194
+ ([0x29 ] + [0x12 ] * 2 + [0xCC ] * 5 ), # consecutive frame 64 bytes + padding
195
+ ],
196
+ 1 , # received 'flow control - continue to send'
197
+ id = "PDU greater than 64 bytes -> flow control"
198
+ ),
199
+ ]
200
+ )
201
+ def test_encode_isotp_can (can_tp_inst : CanTp , mocker : MockerFixture , payload_to_send , expected_transmit_calls , expected_recv_call_count ):
202
+ # the only case where this is called is for flow control -> return a 'continue to send' frame
203
+ mock_recv = mocker .patch .object (can_tp_inst , "getNextBufferedMessage" , return_value = [0x30 , 0x10 , 0x14 ])
204
+ mock_send = mocker .patch .object (can_tp_inst , "transmit" )
205
+ mocker .patch ("time.sleep" )
206
+
207
+ can_tp_inst .encode_isotp (payload_to_send )
208
+
209
+ assert mock_recv .call_count == expected_recv_call_count
210
+ assert mock_send .call_count == len (expected_transmit_calls )
211
+ for call , expected_sent_data in zip (mock_send .call_args_list , expected_transmit_calls ):
212
+ assert call .args [0 ] == expected_sent_data
0 commit comments