Skip to content

Commit c2a77fc

Browse files
committed
wip
1 parent f43bedb commit c2a77fc

File tree

9 files changed

+556
-246
lines changed

9 files changed

+556
-246
lines changed

can/cli.py

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
import argparse
2+
import re
3+
from collections.abc import Sequence
4+
from typing import Any, Optional, Union
5+
6+
import can
7+
from can.typechecking import CanFilter, TAdditionalCliArgs
8+
from can.util import _dict2timing, cast_from_string
9+
10+
11+
def add_bus_arguments(
12+
parser: argparse.ArgumentParser,
13+
*,
14+
filter_arg: bool = False,
15+
prefix: Optional[str] = None,
16+
group_title: str = "bus arguments",
17+
) -> None:
18+
"""Adds CAN bus configuration options to an argument parser.
19+
20+
:param parser:
21+
The argument parser to which the options will be added.
22+
:param filter_arg:
23+
Whether to include the filter argument.
24+
:param prefix:
25+
An optional prefix for the argument names, allowing configuration of multiple buses.
26+
:param group_title:
27+
The title of the argument group.
28+
"""
29+
if prefix:
30+
group_title += f" ({prefix})"
31+
group = parser.add_argument_group(group_title)
32+
33+
flags = [f"--{prefix}-channel"] if prefix else ["-c", "--channel"]
34+
dest = f"{prefix}_channel" if prefix else "channel"
35+
group.add_argument(
36+
*flags,
37+
dest=dest,
38+
default=argparse.SUPPRESS,
39+
metavar="CHANNEL",
40+
help=r"Most backend interfaces require some sort of channel. For "
41+
r"example with the serial interface the channel might be a rfcomm"
42+
r' device: "/dev/rfcomm0". With the socketcan interface valid '
43+
r'channel examples include: "can0", "vcan0".',
44+
)
45+
46+
flags = [f"--{prefix}-interface"] if prefix else ["-i", "--interface"]
47+
dest = f"{prefix}_interface" if prefix else "interface"
48+
group.add_argument(
49+
*flags,
50+
dest=dest,
51+
default=argparse.SUPPRESS,
52+
choices=sorted(can.VALID_INTERFACES),
53+
help="""Specify the backend CAN interface to use. If left blank,
54+
fall back to reading from configuration files.""",
55+
)
56+
57+
flags = [f"--{prefix}-bitrate"] if prefix else ["-b", "--bitrate"]
58+
dest = f"{prefix}_bitrate" if prefix else "bitrate"
59+
group.add_argument(
60+
*flags,
61+
dest=dest,
62+
type=int,
63+
default=argparse.SUPPRESS,
64+
metavar="BITRATE",
65+
help="Bitrate to use for the CAN bus.",
66+
)
67+
68+
flags = [f"--{prefix}-fd"] if prefix else ["--fd"]
69+
dest = f"{prefix}_fd" if prefix else "fd"
70+
group.add_argument(
71+
*flags,
72+
dest=dest,
73+
default=argparse.SUPPRESS,
74+
action="store_true",
75+
help="Activate CAN-FD support",
76+
)
77+
78+
flags = [f"--{prefix}-data-bitrate"] if prefix else ["--data-bitrate"]
79+
dest = f"{prefix}_data_bitrate" if prefix else "data_bitrate"
80+
group.add_argument(
81+
*flags,
82+
dest=dest,
83+
type=int,
84+
default=argparse.SUPPRESS,
85+
metavar="DATA_BITRATE",
86+
help="Bitrate to use for the data phase in case of CAN-FD.",
87+
)
88+
89+
flags = [f"--{prefix}-timing"] if prefix else ["--timing"]
90+
dest = f"{prefix}_timing" if prefix else "timing"
91+
group.add_argument(
92+
*flags,
93+
dest=dest,
94+
action=_BitTimingAction,
95+
nargs=argparse.ONE_OR_MORE,
96+
default=argparse.SUPPRESS,
97+
metavar="TIMING_ARG",
98+
help="Configure bit rate and bit timing. For example, use "
99+
"`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN "
100+
"or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 "
101+
"data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. "
102+
"Check the python-can documentation to verify whether your "
103+
"CAN interface supports the `timing` argument.",
104+
)
105+
106+
if filter_arg:
107+
flags = [f"--{prefix}-filter"] if prefix else ["--filter"]
108+
dest = f"{prefix}_can_filters" if prefix else "can_filters"
109+
group.add_argument(
110+
*flags,
111+
dest=dest,
112+
nargs=argparse.ONE_OR_MORE,
113+
action=_CanFilterAction,
114+
default=argparse.SUPPRESS,
115+
metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
116+
help="R|Space separated CAN filters for the given CAN interface:"
117+
"\n <can_id>:<can_mask> (matches when <received_can_id> & mask =="
118+
" can_id & mask)"
119+
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
120+
" can_id & mask)"
121+
"\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
122+
"\n python -m can.viewer --filter 100:7FC 200:7F0"
123+
"\nNote that the ID and mask are always interpreted as hex values",
124+
)
125+
126+
flags = [f"--{prefix}-bus-kwargs"] if prefix else ["--bus-kwargs"]
127+
dest = f"{prefix}_bus_kwargs" if prefix else "bus_kwargs"
128+
group.add_argument(
129+
*flags,
130+
dest=dest,
131+
action=_BusKwargsAction,
132+
nargs=argparse.ONE_OR_MORE,
133+
default=argparse.SUPPRESS,
134+
metavar="BUS_KWARG",
135+
help="Pass keyword arguments down to the instantiation of the bus class. "
136+
"For example, `-i vector -c 1 --bus-kwargs app_name=MyCanApp serial=1234` is equivalent "
137+
"to opening the bus with `can.Bus('vector', channel=1, app_name='MyCanApp', serial=1234)",
138+
)
139+
140+
141+
def create_bus_from_namespace(
142+
namespace: argparse.Namespace,
143+
*,
144+
prefix: Optional[str] = None,
145+
**kwargs: Any,
146+
) -> can.BusABC:
147+
"""Creates and returns a CAN bus instance based on the provided namespace and arguments.
148+
149+
:param namespace:
150+
The namespace containing parsed arguments.
151+
:param prefix:
152+
An optional prefix for the argument names, enabling support for multiple buses.
153+
:param kwargs:
154+
Additional keyword arguments to configure the bus.
155+
:return:
156+
A CAN bus instance.
157+
"""
158+
config: dict[str, Any] = {"single_handle": True, **kwargs}
159+
160+
for keyword in (
161+
"channel",
162+
"interface",
163+
"bitrate",
164+
"fd",
165+
"data_bitrate",
166+
"can_filters",
167+
"timing",
168+
"bus_kwargs",
169+
):
170+
prefixed_keyword = f"{prefix}_{keyword}" if prefix else keyword
171+
172+
if prefixed_keyword in namespace:
173+
value = getattr(namespace, prefixed_keyword)
174+
175+
if keyword == "bus_kwargs":
176+
config.update(value)
177+
else:
178+
config[keyword] = value
179+
180+
try:
181+
return can.Bus(**config)
182+
except Exception as exc:
183+
err_msg = f"Unable to instantiate bus from arguments {vars(namespace)}."
184+
raise argparse.ArgumentError(None, err_msg) from exc
185+
186+
187+
class _CanFilterAction(argparse.Action):
188+
def __call__(
189+
self,
190+
parser: argparse.ArgumentParser,
191+
namespace: argparse.Namespace,
192+
values: Union[str, Sequence[Any], None],
193+
option_string: Optional[str] = None,
194+
) -> None:
195+
if not isinstance(values, list):
196+
raise argparse.ArgumentError(self, "Invalid filter argument")
197+
198+
print(f"Adding filter(s): {values}")
199+
can_filters: list[CanFilter] = []
200+
201+
for filt in values:
202+
if ":" in filt:
203+
parts = filt.split(":")
204+
can_id = int(parts[0], base=16)
205+
can_mask = int(parts[1], base=16)
206+
elif "~" in filt:
207+
parts = filt.split("~")
208+
can_id = int(parts[0], base=16) | 0x20000000 # CAN_INV_FILTER
209+
can_mask = int(parts[1], base=16) & 0x20000000 # socket.CAN_ERR_FLAG
210+
else:
211+
raise argparse.ArgumentError(self, "Invalid filter argument")
212+
can_filters.append({"can_id": can_id, "can_mask": can_mask})
213+
214+
setattr(namespace, self.dest, can_filters)
215+
216+
217+
class _BitTimingAction(argparse.Action):
218+
def __call__(
219+
self,
220+
parser: argparse.ArgumentParser,
221+
namespace: argparse.Namespace,
222+
values: Union[str, Sequence[Any], None],
223+
option_string: Optional[str] = None,
224+
) -> None:
225+
if not isinstance(values, list):
226+
raise argparse.ArgumentError(self, "Invalid --timing argument")
227+
228+
timing_dict: dict[str, int] = {}
229+
for arg in values:
230+
try:
231+
key, value_string = arg.split("=")
232+
value = int(value_string)
233+
timing_dict[key] = value
234+
except ValueError:
235+
raise argparse.ArgumentError(
236+
self, f"Invalid timing argument: {arg}"
237+
) from None
238+
239+
if not (timing := _dict2timing(timing_dict)):
240+
err_msg = "Invalid --timing argument. Incomplete parameters."
241+
raise argparse.ArgumentError(self, err_msg)
242+
243+
setattr(namespace, self.dest, timing)
244+
print(timing)
245+
246+
247+
class _BusKwargsAction(argparse.Action):
248+
def __call__(
249+
self,
250+
parser: argparse.ArgumentParser,
251+
namespace: argparse.Namespace,
252+
values: Union[str, Sequence[Any], None],
253+
option_string: Optional[str] = None,
254+
) -> None:
255+
if not isinstance(values, list):
256+
raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument")
257+
258+
bus_kwargs: dict[str, Union[str, int, float, bool]] = {}
259+
260+
for arg in values:
261+
try:
262+
match = re.match(
263+
r"^(?P<name>[_a-zA-Z][_a-zA-Z0-9]*)=(?P<value>\S*?)$",
264+
arg,
265+
)
266+
if not match:
267+
raise ValueError
268+
key = match["name"].replace("-", "_")
269+
string_val = match["value"]
270+
bus_kwargs[key] = cast_from_string(string_val)
271+
except ValueError:
272+
raise argparse.ArgumentError(
273+
self,
274+
f"Unable to parse bus keyword argument '{arg}'",
275+
) from None
276+
277+
setattr(namespace, self.dest, bus_kwargs)
278+
279+
280+
def _add_extra_args(
281+
parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup],
282+
) -> None:
283+
parser.add_argument(
284+
"extra_args",
285+
nargs=argparse.REMAINDER,
286+
help="The remaining arguments will be used for logger/player initialisation. "
287+
"For example, `can_logger -i virtual -c test -f logfile.blf --compression-level=9` "
288+
"passes the keyword argument `compression_level=9` to the BlfWriter.",
289+
)
290+
291+
292+
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
293+
for arg in unknown_args:
294+
if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg):
295+
raise ValueError(f"Parsing argument {arg} failed")
296+
297+
def _split_arg(_arg: str) -> tuple[str, str]:
298+
left, right = _arg.split("=", 1)
299+
return left.lstrip("-").replace("-", "_"), right
300+
301+
args: dict[str, Union[str, int, float, bool]] = {}
302+
for key, string_val in map(_split_arg, unknown_args):
303+
args[key] = cast_from_string(string_val)
304+
return args
305+
306+
307+
def _set_logging_level_from_namespace(namespace: argparse.Namespace) -> None:
308+
if "verbosity" in namespace:
309+
logging_level_names = [
310+
"critical",
311+
"error",
312+
"warning",
313+
"info",
314+
"debug",
315+
"subdebug",
316+
]
317+
can.set_logging_level(logging_level_names[min(5, namespace.verbosity)])

0 commit comments

Comments
 (0)