diff --git a/pyproject.toml b/pyproject.toml index 37131b99..370230c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,12 +26,11 @@ exclude = ["tests", "tests.*"] [project.optional-dependencies] testing = [ - "pytest>=7.1.2", - "pytest-asyncio>=0.19.0", + "pytest>=7.3.1", + "pytest-asyncio>=0.21.0", "pytest-timeout>=2.1.0", - "pytest-mock>=3.8.2", - "pytest-cov>=3.0.0", - "coveralls", + "pytest-mock>=3.10.0", + "pytest-cov>=4.1.0", ] [tool.setuptools-git-versioning] diff --git a/tests/application/test_requests.py b/tests/application/test_requests.py index 555bd79f..1e9ba5a7 100644 --- a/tests/application/test_requests.py +++ b/tests/application/test_requests.py @@ -534,6 +534,9 @@ def set_route_discovered(req): await was_route_discovered await zdo_req + # 6 accounts for the loopback requests + assert sum(c.value for c in app.state.counters["Retry_NONE"].values()) == 6 + 1 + await app.shutdown() @@ -602,6 +605,9 @@ def set_route_discovered(req): ) await was_route_discovered + assert ( + sum(c.value for c in app.state.counters["Retry_RouteDiscovery"].values()) == 1 + ) await app.shutdown() @@ -666,6 +672,7 @@ def data_confirm_replier(req): ) assert was_ieee_addr_used + assert sum(c.value for c in app.state.counters["Retry_IEEEAddress"].values()) == 1 await app.shutdown() diff --git a/tests/conftest.py b/tests/conftest.py index 530e9dba..1a2f730f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,7 @@ +import gc +import sys import json +import typing import asyncio import inspect import logging @@ -7,6 +10,7 @@ import pytest import zigpy.types +import zigpy.config import zigpy.device try: @@ -42,6 +46,46 @@ def pytest_collection_modifyitems(session, config, items): item.add_marker(pytest.mark.filterwarnings("error::RuntimeWarning")) +@pytest.hookimpl(trylast=True) +def pytest_fixture_post_finalizer(fixturedef, request) -> None: + """Called after fixture teardown""" + if fixturedef.argname != "event_loop": + return + + policy = asyncio.get_event_loop_policy() + try: + loop = policy.get_event_loop() + except RuntimeError: + loop = None + if loop is not None: + # Cleanup code based on the implementation of asyncio.run() + try: + if not loop.is_closed(): + asyncio.runners._cancel_all_tasks(loop) # type: ignore[attr-defined] + loop.run_until_complete(loop.shutdown_asyncgens()) + if sys.version_info >= (3, 9): + loop.run_until_complete(loop.shutdown_default_executor()) + finally: + loop.close() + new_loop = policy.new_event_loop() # Replace existing event loop + # Ensure subsequent calls to get_event_loop() succeed + policy.set_event_loop(new_loop) + + +@pytest.fixture +def event_loop( + request: pytest.FixtureRequest, +) -> typing.Iterator[asyncio.AbstractEventLoop]: + """Create an instance of the default event loop for each test case.""" + yield asyncio.get_event_loop_policy().new_event_loop() + # Call the garbage collector to trigger ResourceWarning's as soon + # as possible (these are triggered in various __del__ methods). + # Without this, resources opened in one test can fail other tests + # when the warning is generated. + gc.collect() + # Event loop cleanup handled by pytest_fixture_post_finalizer + + class ForwardingSerialTransport: """ Serial transport that hooks directly into a protocol @@ -86,7 +130,12 @@ def __repr__(self): def config_for_port_path(path): - return conf.CONFIG_SCHEMA({conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path}}) + return conf.CONFIG_SCHEMA( + { + conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path}, + zigpy.config.CONF_NWK_BACKUP_ENABLED: False, + } + ) @pytest.fixture diff --git a/zigpy_znp/zigbee/application.py b/zigpy_znp/zigbee/application.py index 898d9b02..1e519ed5 100644 --- a/zigpy_znp/zigbee/application.py +++ b/zigpy_znp/zigbee/application.py @@ -68,6 +68,14 @@ LOGGER = logging.getLogger(__name__) +class RetryMethod(t.enum_flag_uint8): + NONE = 0 + AssocRemove = 2 << 0 + RouteDiscovery = 2 << 1 + LastGoodRoute = 2 << 2 + IEEEAddress = 2 << 3 + + class ControllerApplication(zigpy.application.ControllerApplication): SCHEMA = conf.CONFIG_SCHEMA SCHEMA_DEVICE = conf.SCHEMA_DEVICE @@ -809,19 +817,19 @@ def _find_endpoint(self, dst_ep: int, profile: int, cluster: int) -> int: async def _send_request_raw( self, - dst_addr, - dst_ep, - src_ep, - profile, - cluster, - sequence, - options, - radius, - data, + dst_addr: t.AddrModeAddress, + dst_ep: int, + src_ep: int, + profile: int, + cluster: int, + sequence: int, + options: c.af.TransmitOptions, + radius: int, + data: bytes, *, - relays=None, - extended_timeout=False, - ): + relays: list[int] | None = None, + extended_timeout: bool = False, + ) -> None: """ Used by `request`/`mrequest`/`broadcast` to send a request. Picks the correct request sending mechanism and fixes endpoint information. @@ -922,9 +930,7 @@ async def _send_request_raw( if dst_ep == ZDO_ENDPOINT or dst_addr.mode == t.AddrMode.Broadcast: # Broadcasts and ZDO requests will not receive a confirmation - response = await self._znp.request( - request=request, RspStatus=t.Status.SUCCESS - ) + await self._znp.request(request=request, RspStatus=t.Status.SUCCESS) else: async with async_timeout.timeout( EXTENDED_DATA_CONFIRM_TIMEOUT @@ -956,8 +962,6 @@ async def _send_request_raw( response, ) - return response - @combine_concurrent_calls async def _discover_route(self, nwk: t.NWK) -> None: """ @@ -1006,18 +1010,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: dst_addr = t.AddrModeAddress.from_zigpy_type(packet.dst) - status = None - response = None + succeeded = False association = None force_relays = None if packet.source_route is not None: force_relays = packet.source_route - tried_assoc_remove = False - tried_route_discovery = False - tried_last_good_route = False - tried_ieee_address = False + retry_methods = RetryMethod.NONE + last_retry_method = RetryMethod.NONE # Don't release the concurrency-limiting semaphore until we are done trying. # There is no point in allowing requests to take turns getting buffer errors. @@ -1047,7 +1048,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: if route_status.Status != c.zdo.RoutingStatus.SUCCESS: await self._discover_route(dst_addr.address) - response = await self._send_request_raw( + await self._send_request_raw( dst_addr=dst_addr, dst_ep=packet.dst_ep, src_ep=packet.src_ep, @@ -1060,7 +1061,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: relays=force_relays, extended_timeout=packet.extended_timeout, ) - status = response.Status + succeeded = True break except InvalidCommandResponse as e: status = e.response.Status @@ -1078,23 +1079,27 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: or dst_addr.mode not in (t.AddrMode.NWK, t.AddrMode.IEEE) ): LOGGER.debug( - "Request failed (%s), retry attempt %s of %s", + "Request failed (%s), retry attempt %s of %s (%s)", e, attempt + 1, REQUEST_MAX_RETRIES, + retry_methods.name, ) await asyncio.sleep(3 * REQUEST_ERROR_RETRY_DELAY) continue # If we can't contact the device by forcing a specific route, - # there is not point in trying this more than once. - if tried_last_good_route and force_relays is not None: + # there is no point in trying this more than once. + if ( + retry_methods & RetryMethod.LastGoodRoute + and force_relays is not None + ): force_relays = None # If we fail to contact the device with its IEEE address, don't # try again. if ( - tried_ieee_address + retry_methods & RetryMethod.IEEEAddress and dst_addr.mode == t.AddrMode.IEEE and device is not None ): @@ -1111,7 +1116,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: status == t.Status.MAC_TRANSACTION_EXPIRED and device is not None and association is None - and not tried_assoc_remove + and not retry_methods & RetryMethod.AssocRemove and self._znp.version >= 3.30 ): association = await self._znp.request( @@ -1129,7 +1134,8 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: await self._znp.request( c.UTIL.AssocRemove.Req(IEEE=device.ieee) ) - tried_assoc_remove = True + retry_methods |= RetryMethod.AssocRemove + last_retry_method = RetryMethod.AssocRemove # Route discovery must be performed right after await self._discover_route(device.nwk) @@ -1138,39 +1144,46 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: "The UTIL.AssocRemove command is available only" " in Z-Stack 3 releases built after 20201017" ) - elif not tried_last_good_route and device is not None: + elif ( + not retry_methods & RetryMethod.LastGoodRoute + and device is not None + ): # `ZDO.SrcRtgInd` callbacks tell us the last path taken by # messages from the device back to the coordinator. Sending # packets backwards via this same route may work. force_relays = (device.relays or [])[::-1] - tried_last_good_route = True + retry_methods |= RetryMethod.LastGoodRoute + last_retry_method = RetryMethod.LastGoodRoute elif ( - not tried_route_discovery + not retry_methods & RetryMethod.RouteDiscovery and dst_addr.mode == t.AddrMode.NWK ): # If that doesn't work, try re-discovering the route. # While we can in theory poll and wait until it is fixed, # letting the retry mechanism deal with it simpler. await self._discover_route(dst_addr.address) - tried_route_discovery = True + retry_methods |= RetryMethod.RouteDiscovery + last_retry_method = RetryMethod.RouteDiscovery elif ( - not tried_ieee_address + not retry_methods & RetryMethod.IEEEAddress and device is not None and dst_addr.mode == t.AddrMode.NWK ): # Try using the device's IEEE address instead of its NWK. # If it works, the NWK will be updated when relays arrive. - tried_ieee_address = True + retry_methods |= RetryMethod.IEEEAddress + last_retry_method = RetryMethod.IEEEAddress dst_addr = t.AddrModeAddress( mode=t.AddrMode.IEEE, address=device.ieee, ) LOGGER.debug( - "Request failed (%s), retry attempt %s of %s", + "Request failed (%s), retry attempt %s of %s (%s)", e, attempt + 1, REQUEST_MAX_RETRIES, + retry_methods.name, ) # We've tried everything already so at this point just wait @@ -1181,11 +1194,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: f" {status!r}", status=status, ) + + self.state.counters[f"Retry_{last_retry_method.name}"][ + attempt + ].increment() finally: # We *must* re-add the device association if we previously removed it but # the request still failed. Otherwise, it may be a direct child and we will # not be able to find it again. - if tried_assoc_remove and response is None: + if not succeeded and retry_methods & RetryMethod.AssocRemove: await self._znp.request( c.UTIL.AssocAdd.Req( NWK=association.Device.shortAddr,