Skip to content

Commit

Permalink
implement Notifier.find_instance(bus)
Browse files Browse the repository at this point in the history
  • Loading branch information
zariiii9003 committed Nov 23, 2024
1 parent 6fc0d7d commit e35610a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
43 changes: 40 additions & 3 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ def __init__(self) -> None:
def register(self, bus: BusABC, notifier: "Notifier") -> None:
"""Register a bus and its associated notifier.
Ensures that a bus is not added to multiple active Notifier instances.
Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances.
:param bus:
The CAN bus to register.
:param notifier:
The Notifier instance associated with the bus.
The :class:`~can.Notifier` instance associated with the bus.
:raises ValueError:
If the bus is already assigned to an active Notifier.
"""
Expand All @@ -75,7 +75,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
:param bus:
The CAN bus to unregister.
:param notifier:
The Notifier instance associated with the bus.
The :class:`~can.Notifier` instance associated with the bus.
"""
with self.lock:
registered_pairs_to_remove: List[_BusNotifierPair] = []
Expand All @@ -85,6 +85,26 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
for pair in registered_pairs_to_remove:
self.pairs.remove(pair)

def find_instance(self, bus: BusABC) -> Optional["Notifier"]:
"""Find the :class:`~can.Notifier` instance associated with a given CAN bus.
This method searches the registry for the :class:`~can.Notifier`
that is linked to the specified bus. If the bus is found, the
corresponding :class:`~can.Notifier` instance is returned. If the bus is not
found in the registry, `None` is returned.
:param bus:
The CAN bus for which to find the associated :class:`~can.Notifier` .
:return:
The :class:`~can.Notifier` instance associated with the given bus,
or `None` if no such association exists.
"""
with self.lock:
for pair in self.pairs:
if bus is pair.bus:
return pair.notifier
return None


class Notifier(AbstractContextManager):

Expand Down Expand Up @@ -274,6 +294,23 @@ def stopped(self) -> bool:
"""Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
return self._stopped

@classmethod
def find_instance(cls, bus: BusABC) -> Optional["Notifier"]:
"""Find the :class:`~can.Notifier` instance associated with a given CAN bus.
This method searches the registry for the :class:`~can.Notifier`
that is linked to the specified bus. If the bus is found, the
corresponding :class:`~can.Notifier` instance is returned. If the bus is not
found in the registry, `None` is returned.
:param bus:
The CAN bus for which to find the associated :class:`~can.Notifier` .
:return:
The :class:`~can.Notifier` instance associated with the given bus,
or `None` if no such association exists.
"""
return cls._registry.find_instance(bus)

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
Expand Down
13 changes: 11 additions & 2 deletions test/notifier_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,24 @@ def test_context_manager(self):
def test_registry(self):
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
reader = can.BufferedReader()
with can.Notifier(bus, [reader], 0.1):
with can.Notifier(bus, [reader], 0.1) as notifier:
# creating a second notifier for the same bus must fail
self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1)

# find_instance must return the existing instance
self.assertIs(can.Notifier.find_instance(bus), notifier)

# Notifier is stopped, find instance must return None
self.assertIsNone(can.Notifier.find_instance(bus))

# now the first notifier is stopped, a new notifier can be created without error:
with can.Notifier(bus, [reader], 0.1):
with can.Notifier(bus, [reader], 0.1) as notifier:
# the next notifier call should fail again since there is an active notifier already
self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1)

# find_instance must return the existing instance
self.assertIs(can.Notifier.find_instance(bus), notifier)


class AsyncNotifierTest(unittest.TestCase):
def test_asyncio_notifier(self):
Expand Down

0 comments on commit e35610a

Please sign in to comment.