Skip to content

Commit

Permalink
Keep track of what retry method actually works (#216)
Browse files Browse the repository at this point in the history
* For debug analytics, keep track of what retry method actually works

* Bump testing dependencies

* Add a global fixture to clean up asyncio tasks between tests

* Log the name of the retry methods attempted, not the numerical value
  • Loading branch information
puddly authored Jun 9, 2023
1 parent c4db8d1 commit e3a2ba5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 45 deletions.
9 changes: 4 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ exclude = ["tests", "tests.*"]

[project.optional-dependencies]
testing = [
"pytest>=7.1.2",
"pytest-asyncio>=0.19.0",
"pytest>=7.3.1",
"pytest-asyncio>=0.21.0",
"pytest-timeout>=2.1.0",
"pytest-mock>=3.8.2",
"pytest-cov>=3.0.0",
"coveralls",
"pytest-mock>=3.10.0",
"pytest-cov>=4.1.0",
]

[tool.setuptools-git-versioning]
Expand Down
7 changes: 7 additions & 0 deletions tests/application/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ def set_route_discovered(req):
await was_route_discovered
await zdo_req

# 6 accounts for the loopback requests
assert sum(c.value for c in app.state.counters["Retry_NONE"].values()) == 6 + 1

await app.shutdown()


Expand Down Expand Up @@ -602,6 +605,9 @@ def set_route_discovered(req):
)

await was_route_discovered
assert (
sum(c.value for c in app.state.counters["Retry_RouteDiscovery"].values()) == 1
)

await app.shutdown()

Expand Down Expand Up @@ -666,6 +672,7 @@ def data_confirm_replier(req):
)

assert was_ieee_addr_used
assert sum(c.value for c in app.state.counters["Retry_IEEEAddress"].values()) == 1

await app.shutdown()

Expand Down
51 changes: 50 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import gc
import sys
import json
import typing
import asyncio
import inspect
import logging
Expand All @@ -7,6 +10,7 @@

import pytest
import zigpy.types
import zigpy.config
import zigpy.device

try:
Expand Down Expand Up @@ -42,6 +46,46 @@ def pytest_collection_modifyitems(session, config, items):
item.add_marker(pytest.mark.filterwarnings("error::RuntimeWarning"))


@pytest.hookimpl(trylast=True)
def pytest_fixture_post_finalizer(fixturedef, request) -> None:
"""Called after fixture teardown"""
if fixturedef.argname != "event_loop":
return

policy = asyncio.get_event_loop_policy()
try:
loop = policy.get_event_loop()
except RuntimeError:
loop = None
if loop is not None:
# Cleanup code based on the implementation of asyncio.run()
try:
if not loop.is_closed():
asyncio.runners._cancel_all_tasks(loop) # type: ignore[attr-defined]
loop.run_until_complete(loop.shutdown_asyncgens())
if sys.version_info >= (3, 9):
loop.run_until_complete(loop.shutdown_default_executor())
finally:
loop.close()
new_loop = policy.new_event_loop() # Replace existing event loop
# Ensure subsequent calls to get_event_loop() succeed
policy.set_event_loop(new_loop)


@pytest.fixture
def event_loop(
request: pytest.FixtureRequest,
) -> typing.Iterator[asyncio.AbstractEventLoop]:
"""Create an instance of the default event loop for each test case."""
yield asyncio.get_event_loop_policy().new_event_loop()
# Call the garbage collector to trigger ResourceWarning's as soon
# as possible (these are triggered in various __del__ methods).
# Without this, resources opened in one test can fail other tests
# when the warning is generated.
gc.collect()
# Event loop cleanup handled by pytest_fixture_post_finalizer


class ForwardingSerialTransport:
"""
Serial transport that hooks directly into a protocol
Expand Down Expand Up @@ -86,7 +130,12 @@ def __repr__(self):


def config_for_port_path(path):
return conf.CONFIG_SCHEMA({conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path}})
return conf.CONFIG_SCHEMA(
{
conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path},
zigpy.config.CONF_NWK_BACKUP_ENABLED: False,
}
)


@pytest.fixture
Expand Down
95 changes: 56 additions & 39 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@
LOGGER = logging.getLogger(__name__)


class RetryMethod(t.enum_flag_uint8):
NONE = 0
AssocRemove = 2 << 0
RouteDiscovery = 2 << 1
LastGoodRoute = 2 << 2
IEEEAddress = 2 << 3


class ControllerApplication(zigpy.application.ControllerApplication):
SCHEMA = conf.CONFIG_SCHEMA
SCHEMA_DEVICE = conf.SCHEMA_DEVICE
Expand Down Expand Up @@ -809,19 +817,19 @@ def _find_endpoint(self, dst_ep: int, profile: int, cluster: int) -> int:

async def _send_request_raw(
self,
dst_addr,
dst_ep,
src_ep,
profile,
cluster,
sequence,
options,
radius,
data,
dst_addr: t.AddrModeAddress,
dst_ep: int,
src_ep: int,
profile: int,
cluster: int,
sequence: int,
options: c.af.TransmitOptions,
radius: int,
data: bytes,
*,
relays=None,
extended_timeout=False,
):
relays: list[int] | None = None,
extended_timeout: bool = False,
) -> None:
"""
Used by `request`/`mrequest`/`broadcast` to send a request.
Picks the correct request sending mechanism and fixes endpoint information.
Expand Down Expand Up @@ -922,9 +930,7 @@ async def _send_request_raw(

if dst_ep == ZDO_ENDPOINT or dst_addr.mode == t.AddrMode.Broadcast:
# Broadcasts and ZDO requests will not receive a confirmation
response = await self._znp.request(
request=request, RspStatus=t.Status.SUCCESS
)
await self._znp.request(request=request, RspStatus=t.Status.SUCCESS)
else:
async with async_timeout.timeout(
EXTENDED_DATA_CONFIRM_TIMEOUT
Expand Down Expand Up @@ -956,8 +962,6 @@ async def _send_request_raw(
response,
)

return response

@combine_concurrent_calls
async def _discover_route(self, nwk: t.NWK) -> None:
"""
Expand Down Expand Up @@ -1006,18 +1010,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:

dst_addr = t.AddrModeAddress.from_zigpy_type(packet.dst)

status = None
response = None
succeeded = False
association = None
force_relays = None

if packet.source_route is not None:
force_relays = packet.source_route

tried_assoc_remove = False
tried_route_discovery = False
tried_last_good_route = False
tried_ieee_address = False
retry_methods = RetryMethod.NONE
last_retry_method = RetryMethod.NONE

# Don't release the concurrency-limiting semaphore until we are done trying.
# There is no point in allowing requests to take turns getting buffer errors.
Expand Down Expand Up @@ -1047,7 +1048,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
if route_status.Status != c.zdo.RoutingStatus.SUCCESS:
await self._discover_route(dst_addr.address)

response = await self._send_request_raw(
await self._send_request_raw(
dst_addr=dst_addr,
dst_ep=packet.dst_ep,
src_ep=packet.src_ep,
Expand All @@ -1060,7 +1061,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
relays=force_relays,
extended_timeout=packet.extended_timeout,
)
status = response.Status
succeeded = True
break
except InvalidCommandResponse as e:
status = e.response.Status
Expand All @@ -1078,23 +1079,27 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
or dst_addr.mode not in (t.AddrMode.NWK, t.AddrMode.IEEE)
):
LOGGER.debug(
"Request failed (%s), retry attempt %s of %s",
"Request failed (%s), retry attempt %s of %s (%s)",
e,
attempt + 1,
REQUEST_MAX_RETRIES,
retry_methods.name,
)
await asyncio.sleep(3 * REQUEST_ERROR_RETRY_DELAY)
continue

# If we can't contact the device by forcing a specific route,
# there is not point in trying this more than once.
if tried_last_good_route and force_relays is not None:
# there is no point in trying this more than once.
if (
retry_methods & RetryMethod.LastGoodRoute
and force_relays is not None
):
force_relays = None

# If we fail to contact the device with its IEEE address, don't
# try again.
if (
tried_ieee_address
retry_methods & RetryMethod.IEEEAddress
and dst_addr.mode == t.AddrMode.IEEE
and device is not None
):
Expand All @@ -1111,7 +1116,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
status == t.Status.MAC_TRANSACTION_EXPIRED
and device is not None
and association is None
and not tried_assoc_remove
and not retry_methods & RetryMethod.AssocRemove
and self._znp.version >= 3.30
):
association = await self._znp.request(
Expand All @@ -1129,7 +1134,8 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
await self._znp.request(
c.UTIL.AssocRemove.Req(IEEE=device.ieee)
)
tried_assoc_remove = True
retry_methods |= RetryMethod.AssocRemove
last_retry_method = RetryMethod.AssocRemove

# Route discovery must be performed right after
await self._discover_route(device.nwk)
Expand All @@ -1138,39 +1144,46 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
"The UTIL.AssocRemove command is available only"
" in Z-Stack 3 releases built after 20201017"
)
elif not tried_last_good_route and device is not None:
elif (
not retry_methods & RetryMethod.LastGoodRoute
and device is not None
):
# `ZDO.SrcRtgInd` callbacks tell us the last path taken by
# messages from the device back to the coordinator. Sending
# packets backwards via this same route may work.
force_relays = (device.relays or [])[::-1]
tried_last_good_route = True
retry_methods |= RetryMethod.LastGoodRoute
last_retry_method = RetryMethod.LastGoodRoute
elif (
not tried_route_discovery
not retry_methods & RetryMethod.RouteDiscovery
and dst_addr.mode == t.AddrMode.NWK
):
# If that doesn't work, try re-discovering the route.
# While we can in theory poll and wait until it is fixed,
# letting the retry mechanism deal with it simpler.
await self._discover_route(dst_addr.address)
tried_route_discovery = True
retry_methods |= RetryMethod.RouteDiscovery
last_retry_method = RetryMethod.RouteDiscovery
elif (
not tried_ieee_address
not retry_methods & RetryMethod.IEEEAddress
and device is not None
and dst_addr.mode == t.AddrMode.NWK
):
# Try using the device's IEEE address instead of its NWK.
# If it works, the NWK will be updated when relays arrive.
tried_ieee_address = True
retry_methods |= RetryMethod.IEEEAddress
last_retry_method = RetryMethod.IEEEAddress
dst_addr = t.AddrModeAddress(
mode=t.AddrMode.IEEE,
address=device.ieee,
)

LOGGER.debug(
"Request failed (%s), retry attempt %s of %s",
"Request failed (%s), retry attempt %s of %s (%s)",
e,
attempt + 1,
REQUEST_MAX_RETRIES,
retry_methods.name,
)

# We've tried everything already so at this point just wait
Expand All @@ -1181,11 +1194,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
f" {status!r}",
status=status,
)

self.state.counters[f"Retry_{last_retry_method.name}"][
attempt
].increment()
finally:
# We *must* re-add the device association if we previously removed it but
# the request still failed. Otherwise, it may be a direct child and we will
# not be able to find it again.
if tried_assoc_remove and response is None:
if not succeeded and retry_methods & RetryMethod.AssocRemove:
await self._znp.request(
c.UTIL.AssocAdd.Req(
NWK=association.Device.shortAddr,
Expand Down

0 comments on commit e3a2ba5

Please sign in to comment.