3
3
import re
4
4
import sys
5
5
from datetime import datetime
6
- from typing import TYPE_CHECKING , Any , Dict , List , Sequence , Tuple , Union
6
+ from typing import (
7
+ TYPE_CHECKING ,
8
+ Any ,
9
+ Dict ,
10
+ List ,
11
+ Optional ,
12
+ Sequence ,
13
+ Tuple ,
14
+ Union ,
15
+ )
7
16
8
17
import can
18
+ from can import Bus , BusState , Logger , SizedRotatingLogger
19
+ from can .typechecking import TAdditionalCliArgs
9
20
from can .util import cast_from_string
10
21
11
- from . import Bus , BusState , Logger , SizedRotatingLogger
12
- from .typechecking import CanFilter , CanFilters
13
-
14
22
if TYPE_CHECKING :
15
23
from can .io import BaseRotatingLogger
16
24
from can .io .generic import MessageWriter
25
+ from can .typechecking import CanFilter
17
26
18
27
19
28
def _create_base_argument_parser (parser : argparse .ArgumentParser ) -> None :
@@ -60,10 +69,7 @@ def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:
60
69
61
70
62
71
def _append_filter_argument (
63
- parser : Union [
64
- argparse .ArgumentParser ,
65
- argparse ._ArgumentGroup ,
66
- ],
72
+ parser : Union [argparse .ArgumentParser , argparse ._ArgumentGroup ],
67
73
* args : str ,
68
74
** kwargs : Any ,
69
75
) -> None :
@@ -78,16 +84,17 @@ def _append_filter_argument(
78
84
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
79
85
" can_id & mask)"
80
86
"\n Fx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
81
- "\n python -m can.viewer -f 100:7FC 200:7F0"
87
+ "\n python -m can.viewer --filter 100:7FC 200:7F0"
82
88
"\n Note that the ID and mask are always interpreted as hex values" ,
83
89
metavar = "{<can_id>:<can_mask>,<can_id>~<can_mask>}" ,
84
90
nargs = argparse .ONE_OR_MORE ,
85
- default = "" ,
91
+ action = _CanFilterAction ,
92
+ dest = "can_filters" ,
86
93
** kwargs ,
87
94
)
88
95
89
96
90
- def _create_bus (parsed_args : Any , ** kwargs : Any ) -> can .BusABC :
97
+ def _create_bus (parsed_args : argparse . Namespace , ** kwargs : Any ) -> can .BusABC :
91
98
logging_level_names = ["critical" , "error" , "warning" , "info" , "debug" , "subdebug" ]
92
99
can .set_logging_level (logging_level_names [min (5 , parsed_args .verbosity )])
93
100
@@ -100,16 +107,27 @@ def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC:
100
107
config ["fd" ] = True
101
108
if parsed_args .data_bitrate :
102
109
config ["data_bitrate" ] = parsed_args .data_bitrate
110
+ if getattr (parsed_args , "can_filters" , None ):
111
+ config ["can_filters" ] = parsed_args .can_filters
103
112
104
113
return Bus (parsed_args .channel , ** config )
105
114
106
115
107
- def _parse_filters (parsed_args : Any ) -> CanFilters :
108
- can_filters : List [CanFilter ] = []
116
+ class _CanFilterAction (argparse .Action ):
117
+ def __call__ (
118
+ self ,
119
+ parser : argparse .ArgumentParser ,
120
+ namespace : argparse .Namespace ,
121
+ values : Union [str , Sequence [Any ], None ],
122
+ option_string : Optional [str ] = None ,
123
+ ) -> None :
124
+ if not isinstance (values , list ):
125
+ raise argparse .ArgumentError (None , "Invalid filter argument" )
126
+
127
+ print (f"Adding filter(s): { values } " )
128
+ can_filters : List [CanFilter ] = []
109
129
110
- if parsed_args .filter :
111
- print (f"Adding filter(s): { parsed_args .filter } " )
112
- for filt in parsed_args .filter :
130
+ for filt in values :
113
131
if ":" in filt :
114
132
parts = filt .split (":" )
115
133
can_id = int (parts [0 ], base = 16 )
@@ -122,12 +140,10 @@ def _parse_filters(parsed_args: Any) -> CanFilters:
122
140
raise argparse .ArgumentError (None , "Invalid filter argument" )
123
141
can_filters .append ({"can_id" : can_id , "can_mask" : can_mask })
124
142
125
- return can_filters
143
+ setattr ( namespace , self . dest , can_filters )
126
144
127
145
128
- def _parse_additional_config (
129
- unknown_args : Sequence [str ],
130
- ) -> Dict [str , Union [str , int , float , bool ]]:
146
+ def _parse_additional_config (unknown_args : Sequence [str ]) -> TAdditionalCliArgs :
131
147
for arg in unknown_args :
132
148
if not re .match (r"^--[a-zA-Z\-]*?=\S*?$" , arg ):
133
149
raise ValueError (f"Parsing argument { arg } failed" )
@@ -142,12 +158,18 @@ def _split_arg(_arg: str) -> Tuple[str, str]:
142
158
return args
143
159
144
160
145
- def main () -> None :
161
+ def _parse_logger_args (
162
+ args : List [str ],
163
+ ) -> Tuple [argparse .Namespace , TAdditionalCliArgs ]:
164
+ """Parse command line arguments for logger script."""
165
+
146
166
parser = argparse .ArgumentParser (
147
167
description = "Log CAN traffic, printing messages to stdout or to a "
148
168
"given file." ,
149
169
)
150
170
171
+ # Generate the standard arguments:
172
+ # Channel, bitrate, data_bitrate, interface, app_name, CAN-FD support
151
173
_create_base_argument_parser (parser )
152
174
153
175
parser .add_argument (
@@ -200,13 +222,18 @@ def main() -> None:
200
222
)
201
223
202
224
# print help message when no arguments were given
203
- if len ( sys . argv ) < 2 :
225
+ if not args :
204
226
parser .print_help (sys .stderr )
205
227
raise SystemExit (errno .EINVAL )
206
228
207
- results , unknown_args = parser .parse_known_args ()
229
+ results , unknown_args = parser .parse_known_args (args )
208
230
additional_config = _parse_additional_config ([* results .extra_args , * unknown_args ])
209
- bus = _create_bus (results , can_filters = _parse_filters (results ), ** additional_config )
231
+ return results , additional_config
232
+
233
+
234
+ def main () -> None :
235
+ results , additional_config = _parse_logger_args (sys .argv [1 :])
236
+ bus = _create_bus (results , ** additional_config )
210
237
211
238
if results .active :
212
239
bus .state = BusState .ACTIVE
0 commit comments