Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep Track of Active Notifiers. Make Notifier usable as ContextManager. #1890

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 165 additions & 19 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,21 @@
import logging
import threading
import time
from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union
from contextlib import AbstractContextManager
from types import TracebackType
from typing import (
Any,
Awaitable,
Callable,
Final,
Iterable,
List,
NamedTuple,
Optional,
Tuple,
Type,
Union,
)

from can.bus import BusABC
from can.listener import Listener
Expand All @@ -18,7 +32,85 @@
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]


class Notifier:
class _BusNotifierPair(NamedTuple):
bus: "BusABC"
notifier: "Notifier"


class _NotifierRegistry:
"""A registry to manage the association between CAN buses and Notifiers.

This class ensures that a bus is not added to multiple active Notifiers.
"""

def __init__(self) -> None:
"""Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
self.pairs: List[_BusNotifierPair] = []
self.lock = threading.Lock()

def register(self, bus: BusABC, notifier: "Notifier") -> None:
"""Register a bus and its associated notifier.

Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances.

:param bus:
The CAN bus to register.
:param notifier:
The :class:`~can.Notifier` instance associated with the bus.
:raises ValueError:
If the bus is already assigned to an active Notifier.
"""
with self.lock:
for pair in self.pairs:
if bus is pair.bus and not pair.notifier.stopped:
raise ValueError(
"A bus can not be added to multiple active Notifier instances."
)
self.pairs.append(_BusNotifierPair(bus, notifier))

def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
"""Unregister a bus and its associated notifier.

Removes the bus-notifier pair from the registry.

:param bus:
The CAN bus to unregister.
:param notifier:
The :class:`~can.Notifier` instance associated with the bus.
"""
with self.lock:
registered_pairs_to_remove: List[_BusNotifierPair] = []
for pair in self.pairs:
if pair.bus is bus and pair.notifier is notifier:
registered_pairs_to_remove.append(pair)
for pair in registered_pairs_to_remove:
self.pairs.remove(pair)

def find_instances(self, bus: BusABC) -> Tuple["Notifier", ...]:
"""Find the :class:`~can.Notifier` instances associated with a given CAN bus.

This method searches the registry for the :class:`~can.Notifier`
that is linked to the specified bus. If the bus is found, the
corresponding :class:`~can.Notifier` instances are returned. If the bus is not
found in the registry, an empty tuple is returned.

:param bus:
The CAN bus for which to find the associated :class:`~can.Notifier` .
:return:
A tuple of :class:`~can.Notifier` instances associated with the given bus.
"""
instance_list = []
with self.lock:
for pair in self.pairs:
if bus is pair.bus:
instance_list.append(pair.notifier)
return tuple(instance_list)


class Notifier(AbstractContextManager):

_registry: Final = _NotifierRegistry()

def __init__(
self,
bus: Union[BusABC, List[BusABC]],
Expand All @@ -32,69 +124,89 @@ def __init__(

.. Note::

Remember to call `stop()` after all messages are received as
Remember to call :meth:`~can.Notifier.stop` after all messages are received as
many listeners carry out flush operations to persist data.


:param bus: A :ref:`bus` or a list of buses to listen to.
:param bus:
A :ref:`bus` or a list of buses to consume messages from.
:param listeners:
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
and return nothing.
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
:param timeout:
An optional maximum number of seconds to wait for any :class:`~can.Message`.
:param loop:
An :mod:`asyncio` event loop to schedule the ``listeners`` in.
:raises ValueError:
If a passed in *bus* is already assigned to an active :class:`~can.Notifier`.
"""
self.listeners: List[MessageRecipient] = list(listeners)
self.bus = bus
self._bus_list: List[BusABC] = []
self.timeout = timeout
self._loop = loop

#: Exception raised in thread
self.exception: Optional[Exception] = None

self._running = True
self._stopped = False
self._lock = threading.Lock()

self._readers: List[Union[int, threading.Thread]] = []
buses = self.bus if isinstance(self.bus, list) else [self.bus]
for each_bus in buses:
_bus_list: List[BusABC] = bus if isinstance(bus, list) else [bus]
for each_bus in _bus_list:
self.add_bus(each_bus)

@property
def bus(self) -> Union[BusABC, Tuple["BusABC", ...]]:
"""Return the associated bus or a tuple of buses."""
if len(self._bus_list) == 1:
return self._bus_list[0]
return tuple(self._bus_list)

def add_bus(self, bus: BusABC) -> None:
"""Add a bus for notification.

:param bus:
CAN bus instance.
:raises ValueError:
If the *bus* is already assigned to an active :class:`~can.Notifier`.
"""
reader: int = -1
# add bus to notifier registry
Notifier._registry.register(bus, self)

# add bus to internal bus list
self._bus_list.append(bus)

file_descriptor: int = -1
try:
reader = bus.fileno()
file_descriptor = bus.fileno()
zariiii9003 marked this conversation as resolved.
Show resolved Hide resolved
except NotImplementedError:
# Bus doesn't support fileno, we fall back to thread based reader
pass

if self._loop is not None and reader >= 0:
if self._loop is not None and file_descriptor >= 0:
# Use bus file descriptor to watch for messages
self._loop.add_reader(reader, self._on_message_available, bus)
self._readers.append(reader)
self._loop.add_reader(file_descriptor, self._on_message_available, bus)
self._readers.append(file_descriptor)
else:
reader_thread = threading.Thread(
target=self._rx_thread,
args=(bus,),
name=f'can.notifier for bus "{bus.channel_info}"',
name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"',
)
reader_thread.daemon = True
reader_thread.start()
self._readers.append(reader_thread)

def stop(self, timeout: float = 5) -> None:
def stop(self, timeout: float = 5.0) -> None:
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
and call :meth:`~can.Listener.stop` on each Listener.

:param timeout:
Max time in seconds to wait for receive threads to finish.
Should be longer than timeout given at instantiation.
"""
self._running = False
self._stopped = True
end_time = time.time() + timeout
for reader in self._readers:
if isinstance(reader, threading.Thread):
Expand All @@ -108,6 +220,10 @@ def stop(self, timeout: float = 5) -> None:
if hasattr(listener, "stop"):
listener.stop()

# remove bus from registry
for bus in self._bus_list:
Notifier._registry.unregister(bus, self)

def _rx_thread(self, bus: BusABC) -> None:
# determine message handling callable early, not inside while loop
if self._loop:
Expand All @@ -118,7 +234,7 @@ def _rx_thread(self, bus: BusABC) -> None:
else:
handle_message = self._on_message_received

while self._running:
while not self._stopped:
try:
if msg := bus.recv(self.timeout):
with self._lock:
Expand Down Expand Up @@ -183,3 +299,33 @@ def remove_listener(self, listener: MessageRecipient) -> None:
:raises ValueError: if `listener` was never added to this notifier
"""
self.listeners.remove(listener)

@property
def stopped(self) -> bool:
"""Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
return self._stopped

@staticmethod
def find_instances(bus: BusABC) -> Tuple["Notifier", ...]:
"""Find :class:`~can.Notifier` instances associated with a given CAN bus.

This method searches the registry for the :class:`~can.Notifier`
that is linked to the specified bus. If the bus is found, the
corresponding :class:`~can.Notifier` instances are returned. If the bus is not
found in the registry, an empty tuple is returned.

:param bus:
The CAN bus for which to find the associated :class:`~can.Notifier` .
:return:
A tuple of :class:`~can.Notifier` instances associated with the given bus.
"""
return Notifier._registry.find_instances(bus)

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
if not self._stopped:
self.stop()
42 changes: 20 additions & 22 deletions examples/asyncio_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
"""

import asyncio
from typing import List
from typing import TYPE_CHECKING, List

import can
from can.notifier import MessageRecipient

if TYPE_CHECKING:
from can.notifier import MessageRecipient


def print_message(msg: can.Message) -> None:
Expand All @@ -31,26 +33,22 @@ async def main() -> None:
logger, # Regular Listener object
]
# Create Notifier with an explicit loop to use for scheduling of callbacks
loop = asyncio.get_running_loop()
notifier = can.Notifier(bus, listeners, loop=loop)
# Start sending first message
bus.send(can.Message(arbitration_id=0))

print("Bouncing 10 messages...")
for _ in range(10):
# Wait for next message from AsyncBufferedReader
msg = await reader.get_message()
# Delay response
await asyncio.sleep(0.5)
msg.arbitration_id += 1
bus.send(msg)

# Wait for last message to arrive
await reader.get_message()
print("Done!")

# Clean-up
notifier.stop()
with can.Notifier(bus, listeners, loop=asyncio.get_running_loop()):
# Start sending first message
bus.send(can.Message(arbitration_id=0))

print("Bouncing 10 messages...")
for _ in range(10):
# Wait for next message from AsyncBufferedReader
msg = await reader.get_message()
# Delay response
await asyncio.sleep(0.5)
msg.arbitration_id += 1
bus.send(msg)

# Wait for last message to arrive
await reader.get_message()
print("Done!")


if __name__ == "__main__":
Expand Down
5 changes: 2 additions & 3 deletions examples/cyclic_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,5 @@ def compute_xbr_checksum(message: can.Message, counter: int) -> int:

if __name__ == "__main__":
with can.Bus(channel=0, interface="virtual", receive_own_messages=True) as _bus:
notifier = can.Notifier(bus=_bus, listeners=[print])
cyclic_checksum_send(_bus)
notifier.stop()
with can.Notifier(bus=_bus, listeners=[print]):
cyclic_checksum_send(_bus)
15 changes: 7 additions & 8 deletions examples/print_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
def main():
with can.Bus(interface="virtual", receive_own_messages=True) as bus:
print_listener = can.Printer()
notifier = can.Notifier(bus, [print_listener])

bus.send(can.Message(arbitration_id=1, is_extended_id=True))
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
bus.send(can.Message(arbitration_id=1, is_extended_id=False))

time.sleep(1.0)
notifier.stop()
with can.Notifier(bus, listeners=[print_listener]):
# using Notifier as a context manager automatically calls `Notifier.stop()`
# at the end of the `with` block
bus.send(can.Message(arbitration_id=1, is_extended_id=True))
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
bus.send(can.Message(arbitration_id=1, is_extended_id=False))
time.sleep(1.0)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion examples/send_multiple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
This demo creates multiple processes of producers to spam a socketcan bus.
"""

from time import sleep
from concurrent.futures import ProcessPoolExecutor
from time import sleep

import can

Expand Down
2 changes: 1 addition & 1 deletion examples/serial_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
com0com: http://com0com.sourceforge.net/
"""

import time
import threading
import time

import can

Expand Down
13 changes: 5 additions & 8 deletions examples/vcan_filtered.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ def main():
# print all incoming messages, which includes the ones sent,
# since we set receive_own_messages to True
# assign to some variable so it does not garbage collected
notifier = can.Notifier(bus, [can.Printer()]) # pylint: disable=unused-variable

bus.send(can.Message(arbitration_id=1, is_extended_id=True))
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
bus.send(can.Message(arbitration_id=1, is_extended_id=False))

time.sleep(1.0)
notifier.stop()
with can.Notifier(bus, [can.Printer()]): # pylint: disable=unused-variable
bus.send(can.Message(arbitration_id=1, is_extended_id=True))
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
bus.send(can.Message(arbitration_id=1, is_extended_id=False))
time.sleep(1.0)


if __name__ == "__main__":
Expand Down
Loading
Loading