Skip to content

CANtact Support #853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions can/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"canalystii": ("can.interfaces.canalystii", "CANalystIIBus"),
"systec": ("can.interfaces.systec", "UcanBus"),
"seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"),
"cantact": ("can.interfaces.cantact", "CantactBus"),
}

BACKENDS.update(
Expand Down
152 changes: 152 additions & 0 deletions can/interfaces/cantact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
Interface for CANtact devices from Linklayer Labs
"""

import time
import logging
from unittest.mock import Mock

from can import BusABC, Message

logger = logging.getLogger(__name__)

try:
import cantact
except ImportError:
logger.warning(
"The CANtact module is not installed. Install it using `python3 -m pip install cantact`"
)


class CantactBus(BusABC):
"""CANtact interface"""

@staticmethod
def _detect_available_configs():
try:
interface = cantact.Interface()
except NameError:
# couldn't import cantact, so no configurations are available
return []

channels = []
for i in range(0, interface.channel_count()):
channels.append({"interface": "cantact", "channel": "ch:%d" % i})
return channels

def __init__(
self,
channel,
bitrate=500000,
poll_interval=0.01,
monitor=False,
bit_timing=None,
_testing=False,
**kwargs
):
"""
:param int channel:
Channel number (zero indexed, labeled on multi-channel devices)
:param int bitrate:
Bitrate in bits/s
:param bool monitor:
If true, operate in listen-only monitoring mode
:param BitTiming bit_timing
Optional BitTiming to use for custom bit timing setting. Overrides bitrate if not None.
"""

if _testing:
self.interface = MockInterface()
else:
self.interface = cantact.Interface()

self.channel = int(channel)
self.channel_info = "CANtact: ch:%s" % channel

# configure the interface
if bit_timing is None:
# use bitrate
self.interface.set_bitrate(int(channel), int(bitrate))
else:
# use custom bit timing
self.interface.set_bit_timing(
int(channel),
int(bit_timing.brp),
int(bit_timing.tseg1),
int(bit_timing.tseg2),
int(bit_timing.sjw),
)
self.interface.set_enabled(int(channel), True)
self.interface.set_monitor(int(channel), monitor)
self.interface.start()

super().__init__(
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
)

def _recv_internal(self, timeout):
frame = self.interface.recv(int(timeout * 1000))
if frame is None:
# timeout occured
return None, False

msg = Message(
arbitration_id=frame["id"],
is_extended_id=frame["extended"],
timestamp=frame["timestamp"],
is_remote_frame=frame["rtr"],
dlc=frame["dlc"],
data=frame["data"][: frame["dlc"]],
channel=frame["channel"],
is_rx=(not frame["loopback"]), # received if not loopback frame
)
return msg, False

def send(self, msg, timeout=None):
self.interface.send(
self.channel,
msg.arbitration_id,
bool(msg.is_extended_id),
bool(msg.is_remote_frame),
msg.dlc,
msg.data,
)

def shutdown(self):
self.interface.stop()


def mock_recv(timeout):
if timeout > 0:
frame = {}
frame["id"] = 0x123
frame["extended"] = False
frame["timestamp"] = time.time()
frame["loopback"] = False
frame["rtr"] = False
frame["dlc"] = 8
frame["data"] = [1, 2, 3, 4, 5, 6, 7, 8]
frame["channel"] = 0
return frame
else:
# simulate timeout when timeout = 0
return None


class MockInterface:
"""
Mock interface to replace real interface when testing.
This allows for tests to run without actual hardware.
"""

start = Mock()
set_bitrate = Mock()
set_bit_timing = Mock()
set_enabled = Mock()
set_monitor = Mock()
start = Mock()
stop = Mock()
send = Mock()
channel_count = Mock(return_value=1)

recv = Mock(side_effect=mock_recv)
14 changes: 14 additions & 0 deletions doc/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ To install ``python-can`` using the XL Driver Library as the backend:

3. Use Vector Hardware Configuration to assign a channel to your application.

CANtact
~~~~~~~

CANtact is supported on Linux, Windows, and macOS.
To install ``python-can`` using the CANtact driver backend:

``python3 -m pip install "python-can[cantact]"``

If ``python-can`` is already installed, the CANtact backend can be installed seperately:

``python3 -m pip install cantact``

Additional CANtact documentation is available at https://cantact.io.

Installing python-can in development mode
-----------------------------------------

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"seeedstudio": ["pyserial>=3.0"],
"serial": ["pyserial~=3.0"],
"neovi": ["python-ics>=2.12"],
"cantact": ["cantact>=0.0.7"],
}

setup(
Expand Down
1 change: 1 addition & 0 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def test_stopping_perodic_tasks(self):

bus.shutdown()

@unittest.skipIf(IS_CI, "fails randomly when run on CI server")
def test_thread_based_cyclic_send_task(self):
bus = can.ThreadSafeBus(bustype="virtual")
msg = can.Message(
Expand Down
64 changes: 64 additions & 0 deletions test/test_cantact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python
# coding: utf-8

"""
Tests for CANtact interfaces
"""

import time
import logging
import unittest
from unittest.mock import Mock, patch

import pytest

import can
from can.interfaces import cantact


class CantactTest(unittest.TestCase):
def test_bus_creation(self):
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
self.assertIsInstance(bus, cantact.CantactBus)
cantact.MockInterface.set_bitrate.assert_called()
cantact.MockInterface.set_bit_timing.assert_not_called()
cantact.MockInterface.set_enabled.assert_called()
cantact.MockInterface.set_monitor.assert_called()
cantact.MockInterface.start.assert_called()

def test_bus_creation_bittiming(self):
cantact.MockInterface.set_bitrate.reset_mock()

bt = can.BitTiming(tseg1=13, tseg2=2, brp=6, sjw=1)
bus = can.Bus(channel=0, bustype="cantact", bit_timing=bt, _testing=True)
self.assertIsInstance(bus, cantact.CantactBus)
cantact.MockInterface.set_bitrate.assert_not_called()
cantact.MockInterface.set_bit_timing.assert_called()
cantact.MockInterface.set_enabled.assert_called()
cantact.MockInterface.set_monitor.assert_called()
cantact.MockInterface.start.assert_called()

def test_transmit(self):
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
msg = can.Message(
arbitration_id=0xC0FFEF, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=True
)
bus.send(msg)
cantact.MockInterface.send.assert_called()

def test_recv(self):
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
frame = bus.recv(timeout=0.5)
cantact.MockInterface.recv.assert_called()
self.assertIsInstance(frame, can.Message)

def test_recv_timeout(self):
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
frame = bus.recv(timeout=0.0)
cantact.MockInterface.recv.assert_called()
self.assertIsNone(frame)

def test_shutdown(self):
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
bus.shutdown()
cantact.MockInterface.stop.assert_called()