From 44430d9402d30c0f83b614f8327a2b1c52552d78 Mon Sep 17 00:00:00 2001 From: "David F. Mulcahey" Date: Sun, 10 Mar 2024 10:45:26 -0400 Subject: [PATCH] Initial restructuring (#3) --- zha/api.py | 115 -- zha/application/__init__.py | 0 zha/{core => application}/const.py | 0 zha/{core => application}/decorators.py | 0 zha/{core => application}/discovery.py | 8 +- zha/{ => application}/entity.py | 0 zha/{core => application}/gateway.py | 8 +- zha/{core => application}/helpers.py | 0 zha/application/platforms/__init__.py | 0 .../platforms}/alarm_control_panel.py | 0 .../platforms}/binary_sensor.py | 0 zha/{ => application/platforms}/button.py | 0 zha/{ => application/platforms}/climate.py | 0 zha/{ => application/platforms}/cover.py | 0 .../platforms}/device_tracker.py | 0 zha/{ => application/platforms}/fan.py | 0 zha/{ => application/platforms}/light.py | 0 zha/{ => application/platforms}/lock.py | 0 zha/{ => application/platforms}/number.py | 0 zha/{ => application/platforms}/select.py | 0 zha/{ => application/platforms}/sensor.py | 0 zha/{ => application/platforms}/siren.py | 0 zha/{ => application/platforms}/switch.py | 0 zha/{ => application/platforms}/update.py | 0 zha/{core => application}/registries.py | 38 +- zha/backup.py | 21 - zha/config_flow.py | 783 -------- zha/device_action.py | 232 --- zha/device_trigger.py | 112 -- zha/diagnostics.py | 175 -- zha/logbook.py | 83 - zha/radio_manager.py | 464 ----- zha/silabs_multiprotocol.py | 81 - zha/websocket_api.py | 1577 ----------------- zha/{core => zigbee}/__init__.py | 0 .../cluster_handlers/__init__.py | 0 .../cluster_handlers/closures.py | 0 .../cluster_handlers/general.py | 0 .../cluster_handlers/helpers.py | 0 .../cluster_handlers/homeautomation.py | 0 zha/{core => zigbee}/cluster_handlers/hvac.py | 0 .../cluster_handlers/lighting.py | 0 .../cluster_handlers/lightlink.py | 0 .../cluster_handlers/manufacturerspecific.py | 0 .../cluster_handlers/measurement.py | 0 .../cluster_handlers/protocol.py | 0 .../cluster_handlers/security.py | 0 .../cluster_handlers/smartenergy.py | 0 zha/{core => zigbee}/device.py | 0 zha/{core => zigbee}/endpoint.py | 0 zha/{core => zigbee}/group.py | 0 51 files changed, 27 insertions(+), 3670 deletions(-) delete mode 100644 zha/api.py create mode 100644 zha/application/__init__.py rename zha/{core => application}/const.py (100%) rename zha/{core => application}/decorators.py (100%) rename zha/{core => application}/discovery.py (99%) rename zha/{ => application}/entity.py (100%) rename zha/{core => application}/gateway.py (99%) rename zha/{core => application}/helpers.py (100%) create mode 100644 zha/application/platforms/__init__.py rename zha/{ => application/platforms}/alarm_control_panel.py (100%) rename zha/{ => application/platforms}/binary_sensor.py (100%) rename zha/{ => application/platforms}/button.py (100%) rename zha/{ => application/platforms}/climate.py (100%) rename zha/{ => application/platforms}/cover.py (100%) rename zha/{ => application/platforms}/device_tracker.py (100%) rename zha/{ => application/platforms}/fan.py (100%) rename zha/{ => application/platforms}/light.py (100%) rename zha/{ => application/platforms}/lock.py (100%) rename zha/{ => application/platforms}/number.py (100%) rename zha/{ => application/platforms}/select.py (100%) rename zha/{ => application/platforms}/sensor.py (100%) rename zha/{ => application/platforms}/siren.py (100%) rename zha/{ => application/platforms}/switch.py (100%) rename zha/{ => application/platforms}/update.py (100%) rename zha/{core => application}/registries.py (95%) delete mode 100644 zha/backup.py delete mode 100644 zha/config_flow.py delete mode 100644 zha/device_action.py delete mode 100644 zha/device_trigger.py delete mode 100644 zha/diagnostics.py delete mode 100644 zha/logbook.py delete mode 100644 zha/radio_manager.py delete mode 100644 zha/silabs_multiprotocol.py delete mode 100644 zha/websocket_api.py rename zha/{core => zigbee}/__init__.py (100%) rename zha/{core => zigbee}/cluster_handlers/__init__.py (100%) rename zha/{core => zigbee}/cluster_handlers/closures.py (100%) rename zha/{core => zigbee}/cluster_handlers/general.py (100%) rename zha/{core => zigbee}/cluster_handlers/helpers.py (100%) rename zha/{core => zigbee}/cluster_handlers/homeautomation.py (100%) rename zha/{core => zigbee}/cluster_handlers/hvac.py (100%) rename zha/{core => zigbee}/cluster_handlers/lighting.py (100%) rename zha/{core => zigbee}/cluster_handlers/lightlink.py (100%) rename zha/{core => zigbee}/cluster_handlers/manufacturerspecific.py (100%) rename zha/{core => zigbee}/cluster_handlers/measurement.py (100%) rename zha/{core => zigbee}/cluster_handlers/protocol.py (100%) rename zha/{core => zigbee}/cluster_handlers/security.py (100%) rename zha/{core => zigbee}/cluster_handlers/smartenergy.py (100%) rename zha/{core => zigbee}/device.py (100%) rename zha/{core => zigbee}/endpoint.py (100%) rename zha/{core => zigbee}/group.py (100%) diff --git a/zha/api.py b/zha/api.py deleted file mode 100644 index db0658eb6..000000000 --- a/zha/api.py +++ /dev/null @@ -1,115 +0,0 @@ -"""API for Zigbee Home Automation.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Literal - -from zigpy.backups import NetworkBackup -from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH -from zigpy.types import Channels -from zigpy.util import pick_optimal_channel - -from .core.const import CONF_RADIO_TYPE, DOMAIN, RadioType -from .core.helpers import get_zha_gateway -from .radio_manager import ZhaRadioManager - -if TYPE_CHECKING: - from homeassistant.config_entries import ConfigEntry - from homeassistant.core import HomeAssistant - - -def _get_config_entry(hass: HomeAssistant) -> ConfigEntry: - """Find the singleton ZHA config entry, if one exists.""" - - # If ZHA is already running, use its config entry - try: - zha_gateway = get_zha_gateway(hass) - except ValueError: - pass - else: - return zha_gateway.config_entry - - # Otherwise, find one - entries = hass.config_entries.async_entries(DOMAIN) - - if len(entries) != 1: - raise ValueError(f"Invalid number of ZHA config entries: {entries!r}") - - return entries[0] - - -def async_get_active_network_settings(hass: HomeAssistant) -> NetworkBackup: - """Get the network settings for the currently active ZHA network.""" - app = get_zha_gateway(hass).application_controller - - return NetworkBackup( - node_info=app.state.node_info, - network_info=app.state.network_info, - ) - - -async def async_get_last_network_settings( - hass: HomeAssistant, config_entry: ConfigEntry | None = None -) -> NetworkBackup | None: - """Get the network settings for the last-active ZHA network.""" - if config_entry is None: - config_entry = _get_config_entry(hass) - - radio_mgr = ZhaRadioManager.from_config_entry(hass, config_entry) - - async with radio_mgr.connect_zigpy_app() as app: - try: - settings = max(app.backups, key=lambda b: b.backup_time) - except ValueError: - settings = None - - return settings - - -async def async_get_network_settings( - hass: HomeAssistant, config_entry: ConfigEntry | None = None -) -> NetworkBackup | None: - """Get ZHA network settings, preferring the active settings if ZHA is running.""" - - try: - return async_get_active_network_settings(hass) - except ValueError: - return await async_get_last_network_settings(hass, config_entry) - - -def async_get_radio_type( - hass: HomeAssistant, config_entry: ConfigEntry | None = None -) -> RadioType: - """Get ZHA radio type.""" - if config_entry is None: - config_entry = _get_config_entry(hass) - - return RadioType[config_entry.data[CONF_RADIO_TYPE]] - - -def async_get_radio_path( - hass: HomeAssistant, config_entry: ConfigEntry | None = None -) -> str: - """Get ZHA radio path.""" - if config_entry is None: - config_entry = _get_config_entry(hass) - - return config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH] - - -async def async_change_channel( - hass: HomeAssistant, new_channel: int | Literal["auto"] -) -> None: - """Migrate the ZHA network to a new channel.""" - - app = get_zha_gateway(hass).application_controller - - if new_channel == "auto": - channel_energy = await app.energy_scan( - channels=Channels.ALL_CHANNELS, - duration_exp=4, - count=1, - ) - new_channel = pick_optimal_channel(channel_energy) - - await app.move_network_to_channel(new_channel) diff --git a/zha/application/__init__.py b/zha/application/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zha/core/const.py b/zha/application/const.py similarity index 100% rename from zha/core/const.py rename to zha/application/const.py diff --git a/zha/core/decorators.py b/zha/application/decorators.py similarity index 100% rename from zha/core/decorators.py rename to zha/application/decorators.py diff --git a/zha/core/discovery.py b/zha/application/discovery.py similarity index 99% rename from zha/core/discovery.py rename to zha/application/discovery.py index 948e0c176..33b1cb149 100644 --- a/zha/core/discovery.py +++ b/zha/application/discovery.py @@ -70,7 +70,7 @@ from .helpers import get_zha_data, get_zha_gateway if TYPE_CHECKING: - from ..entity import ZhaEntity + from ..application.entity import ZhaEntity from .device import ZHADevice from .endpoint import Endpoint from .group import ZHAGroup @@ -292,9 +292,9 @@ def discover_quirks_v2_entities(self, device: ZHADevice) -> None: not in cluster_handler.ZCL_INIT_ATTRS ): init_attrs = cluster_handler.ZCL_INIT_ATTRS.copy() - init_attrs[ - quirk_metadata.entity_metadata.attribute_name - ] = quirk_metadata.attribute_initialized_from_cache + init_attrs[quirk_metadata.entity_metadata.attribute_name] = ( + quirk_metadata.attribute_initialized_from_cache + ) cluster_handler.__dict__[zha_const.ZCL_INIT_ATTRS] = init_attrs endpoint.async_new_entity( diff --git a/zha/entity.py b/zha/application/entity.py similarity index 100% rename from zha/entity.py rename to zha/application/entity.py diff --git a/zha/core/gateway.py b/zha/application/gateway.py similarity index 99% rename from zha/core/gateway.py rename to zha/application/gateway.py index 3636776d2..50d90dd8c 100644 --- a/zha/core/gateway.py +++ b/zha/application/gateway.py @@ -92,7 +92,7 @@ if TYPE_CHECKING: from logging import Filter, LogRecord - from ..entity import ZhaEntity + from ..application.entity import ZhaEntity from .cluster_handlers import ClusterHandler _LogFilterType = Filter | Callable[[LogRecord], bool] @@ -132,9 +132,9 @@ def __init__( self._groups: dict[int, ZHAGroup] = {} self.application_controller: ControllerApplication = None self.coordinator_zha_device: ZHADevice = None # type: ignore[assignment] - self._device_registry: collections.defaultdict[ - EUI64, list[EntityReference] - ] = collections.defaultdict(list) + self._device_registry: collections.defaultdict[EUI64, list[EntityReference]] = ( + collections.defaultdict(list) + ) self._log_levels: dict[str, dict[str, int]] = { DEBUG_LEVEL_ORIGINAL: async_capture_log_levels(), DEBUG_LEVEL_CURRENT: async_capture_log_levels(), diff --git a/zha/core/helpers.py b/zha/application/helpers.py similarity index 100% rename from zha/core/helpers.py rename to zha/application/helpers.py diff --git a/zha/application/platforms/__init__.py b/zha/application/platforms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zha/alarm_control_panel.py b/zha/application/platforms/alarm_control_panel.py similarity index 100% rename from zha/alarm_control_panel.py rename to zha/application/platforms/alarm_control_panel.py diff --git a/zha/binary_sensor.py b/zha/application/platforms/binary_sensor.py similarity index 100% rename from zha/binary_sensor.py rename to zha/application/platforms/binary_sensor.py diff --git a/zha/button.py b/zha/application/platforms/button.py similarity index 100% rename from zha/button.py rename to zha/application/platforms/button.py diff --git a/zha/climate.py b/zha/application/platforms/climate.py similarity index 100% rename from zha/climate.py rename to zha/application/platforms/climate.py diff --git a/zha/cover.py b/zha/application/platforms/cover.py similarity index 100% rename from zha/cover.py rename to zha/application/platforms/cover.py diff --git a/zha/device_tracker.py b/zha/application/platforms/device_tracker.py similarity index 100% rename from zha/device_tracker.py rename to zha/application/platforms/device_tracker.py diff --git a/zha/fan.py b/zha/application/platforms/fan.py similarity index 100% rename from zha/fan.py rename to zha/application/platforms/fan.py diff --git a/zha/light.py b/zha/application/platforms/light.py similarity index 100% rename from zha/light.py rename to zha/application/platforms/light.py diff --git a/zha/lock.py b/zha/application/platforms/lock.py similarity index 100% rename from zha/lock.py rename to zha/application/platforms/lock.py diff --git a/zha/number.py b/zha/application/platforms/number.py similarity index 100% rename from zha/number.py rename to zha/application/platforms/number.py diff --git a/zha/select.py b/zha/application/platforms/select.py similarity index 100% rename from zha/select.py rename to zha/application/platforms/select.py diff --git a/zha/sensor.py b/zha/application/platforms/sensor.py similarity index 100% rename from zha/sensor.py rename to zha/application/platforms/sensor.py diff --git a/zha/siren.py b/zha/application/platforms/siren.py similarity index 100% rename from zha/siren.py rename to zha/application/platforms/siren.py diff --git a/zha/switch.py b/zha/application/platforms/switch.py similarity index 100% rename from zha/switch.py rename to zha/application/platforms/switch.py diff --git a/zha/update.py b/zha/application/platforms/update.py similarity index 100% rename from zha/update.py rename to zha/application/platforms/update.py diff --git a/zha/core/registries.py b/zha/application/registries.py similarity index 95% rename from zha/core/registries.py rename to zha/application/registries.py index 40ef27a34..cc0324e88 100644 --- a/zha/core/registries.py +++ b/zha/application/registries.py @@ -18,7 +18,7 @@ from .decorators import DictRegistry, NestedDictRegistry, SetRegistry if TYPE_CHECKING: - from ..entity import ZhaEntity, ZhaGroupEntity + from ..application.entity import ZhaEntity, ZhaGroupEntity from .cluster_handlers import ClientClusterHandler, ClusterHandler @@ -107,12 +107,12 @@ DEVICE_CLASS = collections.defaultdict(dict, DEVICE_CLASS) CLUSTER_HANDLER_ONLY_CLUSTERS = SetRegistry() -CLIENT_CLUSTER_HANDLER_REGISTRY: DictRegistry[ - type[ClientClusterHandler] -] = DictRegistry() -ZIGBEE_CLUSTER_HANDLER_REGISTRY: NestedDictRegistry[ - type[ClusterHandler] -] = NestedDictRegistry() +CLIENT_CLUSTER_HANDLER_REGISTRY: DictRegistry[type[ClientClusterHandler]] = ( + DictRegistry() +) +ZIGBEE_CLUSTER_HANDLER_REGISTRY: NestedDictRegistry[type[ClusterHandler]] = ( + NestedDictRegistry() +) WEIGHT_ATTR = attrgetter("weight") @@ -277,9 +277,9 @@ class ZHAEntityRegistry: def __init__(self) -> None: """Initialize Registry instance.""" - self._strict_registry: dict[ - Platform, dict[MatchRule, type[ZhaEntity]] - ] = collections.defaultdict(dict) + self._strict_registry: dict[Platform, dict[MatchRule, type[ZhaEntity]]] = ( + collections.defaultdict(dict) + ) self._multi_entity_registry: dict[ Platform, dict[int | str | None, dict[MatchRule, list[type[ZhaEntity]]]] ] = collections.defaultdict( @@ -291,9 +291,9 @@ def __init__(self) -> None: lambda: collections.defaultdict(lambda: collections.defaultdict(list)) ) self._group_registry: dict[str, type[ZhaGroupEntity]] = {} - self.single_device_matches: dict[ - Platform, dict[EUI64, list[str]] - ] = collections.defaultdict(lambda: collections.defaultdict(list)) + self.single_device_matches: dict[Platform, dict[EUI64, list[str]]] = ( + collections.defaultdict(lambda: collections.defaultdict(list)) + ) def get_entity( self, @@ -323,9 +323,9 @@ def get_multi_entity( dict[Platform, list[EntityClassAndClusterHandlers]], list[ClusterHandler] ]: """Match ZHA cluster handlers to potentially multiple ZHA Entity classes.""" - result: dict[ - Platform, list[EntityClassAndClusterHandlers] - ] = collections.defaultdict(list) + result: dict[Platform, list[EntityClassAndClusterHandlers]] = ( + collections.defaultdict(list) + ) all_claimed: set[ClusterHandler] = set() for component, stop_match_groups in self._multi_entity_registry.items(): for stop_match_grp, matches in stop_match_groups.items(): @@ -356,9 +356,9 @@ def get_config_diagnostic_entity( dict[Platform, list[EntityClassAndClusterHandlers]], list[ClusterHandler] ]: """Match ZHA cluster handlers to potentially multiple ZHA Entity classes.""" - result: dict[ - Platform, list[EntityClassAndClusterHandlers] - ] = collections.defaultdict(list) + result: dict[Platform, list[EntityClassAndClusterHandlers]] = ( + collections.defaultdict(list) + ) all_claimed: set[ClusterHandler] = set() for ( component, diff --git a/zha/backup.py b/zha/backup.py deleted file mode 100644 index 25d5a83b6..000000000 --- a/zha/backup.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Backup platform for the ZHA integration.""" - -import logging - -from homeassistant.core import HomeAssistant - -from .core.helpers import get_zha_gateway - -_LOGGER = logging.getLogger(__name__) - - -async def async_pre_backup(hass: HomeAssistant) -> None: - """Perform operations before a backup starts.""" - _LOGGER.debug("Performing coordinator backup") - - zha_gateway = get_zha_gateway(hass) - await zha_gateway.application_controller.backups.create_backup(load_devices=True) - - -async def async_post_backup(hass: HomeAssistant) -> None: - """Perform operations after a backup finishes.""" diff --git a/zha/config_flow.py b/zha/config_flow.py deleted file mode 100644 index c2b644e3f..000000000 --- a/zha/config_flow.py +++ /dev/null @@ -1,783 +0,0 @@ -"""Config flow for ZHA.""" - -from __future__ import annotations - -import collections -from contextlib import suppress -import json -from typing import Any - -from homeassistant.components import onboarding, usb, zeroconf -from homeassistant.components.file_upload import process_uploaded_file -from homeassistant.components.hassio import AddonError, AddonState -from homeassistant.components.homeassistant_hardware import silabs_multiprotocol_addon -from homeassistant.components.homeassistant_yellow import hardware as yellow_hardware -from homeassistant.config_entries import ( - SOURCE_IGNORE, - SOURCE_ZEROCONF, - ConfigEntry, - ConfigEntryBaseFlow, - ConfigEntryState, - ConfigFlow, - ConfigFlowResult, - OperationNotAllowed, - OptionsFlow, -) -from homeassistant.const import CONF_NAME -from homeassistant.core import HomeAssistant, callback -from homeassistant.exceptions import HomeAssistantError -from homeassistant.helpers.selector import FileSelector, FileSelectorConfig -from homeassistant.util import dt as dt_util -import serial.tools.list_ports -from serial.tools.list_ports_common import ListPortInfo -import voluptuous as vol -import zigpy.backups -from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH - -from .core.const import ( - CONF_BAUDRATE, - CONF_FLOW_CONTROL, - CONF_RADIO_TYPE, - DOMAIN, - RadioType, -) -from .radio_manager import ( - DEVICE_SCHEMA, - HARDWARE_DISCOVERY_SCHEMA, - RECOMMENDED_RADIOS, - ProbeResult, - ZhaRadioManager, -) - -CONF_MANUAL_PATH = "Enter Manually" -SUPPORTED_PORT_SETTINGS = ( - CONF_BAUDRATE, - CONF_FLOW_CONTROL, -) -DECONZ_DOMAIN = "deconz" - -FORMATION_STRATEGY = "formation_strategy" -FORMATION_FORM_NEW_NETWORK = "form_new_network" -FORMATION_FORM_INITIAL_NETWORK = "form_initial_network" -FORMATION_REUSE_SETTINGS = "reuse_settings" -FORMATION_CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup" -FORMATION_UPLOAD_MANUAL_BACKUP = "upload_manual_backup" - -CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup" -OVERWRITE_COORDINATOR_IEEE = "overwrite_coordinator_ieee" - -OPTIONS_INTENT_MIGRATE = "intent_migrate" -OPTIONS_INTENT_RECONFIGURE = "intent_reconfigure" - -UPLOADED_BACKUP_FILE = "uploaded_backup_file" - -REPAIR_MY_URL = "https://my.home-assistant.io/redirect/repairs/" - -DEFAULT_ZHA_ZEROCONF_PORT = 6638 -ESPHOME_API_PORT = 6053 - - -def _format_backup_choice( - backup: zigpy.backups.NetworkBackup, *, pan_ids: bool = True -) -> str: - """Format network backup info into a short piece of text.""" - if not pan_ids: - return dt_util.as_local(backup.backup_time).strftime("%c") - - identifier = ( - # PAN ID - f"{str(backup.network_info.pan_id)[2:]}" - # EPID - f":{str(backup.network_info.extended_pan_id).replace(':', '')}" - ).lower() - - return f"{dt_util.as_local(backup.backup_time).strftime('%c')} ({identifier})" - - -async def list_serial_ports(hass: HomeAssistant) -> list[ListPortInfo]: - """List all serial ports, including the Yellow radio and the multi-PAN addon.""" - ports = await hass.async_add_executor_job(serial.tools.list_ports.comports) - - # Add useful info to the Yellow's serial port selection screen - try: - yellow_hardware.async_info(hass) - except HomeAssistantError: - pass - else: - yellow_radio = next(p for p in ports if p.device == "/dev/ttyAMA1") - yellow_radio.description = "Yellow Zigbee module" - yellow_radio.manufacturer = "Nabu Casa" - - # Present the multi-PAN addon as a setup option, if it's available - multipan_manager = await silabs_multiprotocol_addon.get_multiprotocol_addon_manager( - hass - ) - - try: - addon_info = await multipan_manager.async_get_addon_info() - except (AddonError, KeyError): - addon_info = None - - if addon_info is not None and addon_info.state != AddonState.NOT_INSTALLED: - addon_port = ListPortInfo( - device=silabs_multiprotocol_addon.get_zigbee_socket(), - skip_link_detection=True, - ) - - addon_port.description = "Multiprotocol add-on" - addon_port.manufacturer = "Nabu Casa" - ports.append(addon_port) - - return ports - - -class BaseZhaFlow(ConfigEntryBaseFlow): - """Mixin for common ZHA flow steps and forms.""" - - _hass: HomeAssistant - - def __init__(self) -> None: - """Initialize flow instance.""" - super().__init__() - - self._hass = None # type: ignore[assignment] - self._radio_mgr = ZhaRadioManager() - self._title: str | None = None - - @property - def hass(self): - """Return hass.""" - return self._hass - - @hass.setter - def hass(self, hass): - """Set hass.""" - self._hass = hass - self._radio_mgr.hass = hass - - async def _async_create_radio_entry(self) -> ConfigFlowResult: - """Create a config entry with the current flow state.""" - assert self._title is not None - assert self._radio_mgr.radio_type is not None - assert self._radio_mgr.device_path is not None - assert self._radio_mgr.device_settings is not None - - device_settings = self._radio_mgr.device_settings.copy() - device_settings[CONF_DEVICE_PATH] = await self.hass.async_add_executor_job( - usb.get_serial_by_id, self._radio_mgr.device_path - ) - - return self.async_create_entry( - title=self._title, - data={ - CONF_DEVICE: DEVICE_SCHEMA(device_settings), - CONF_RADIO_TYPE: self._radio_mgr.radio_type.name, - }, - ) - - async def async_step_choose_serial_port( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Choose a serial port.""" - ports = await list_serial_ports(self.hass) - list_of_ports = [ - f"{p}{', s/n: ' + p.serial_number if p.serial_number else ''}" - + (f" - {p.manufacturer}" if p.manufacturer else "") - for p in ports - ] - - if not list_of_ports: - return await self.async_step_manual_pick_radio_type() - - list_of_ports.append(CONF_MANUAL_PATH) - - if user_input is not None: - user_selection = user_input[CONF_DEVICE_PATH] - - if user_selection == CONF_MANUAL_PATH: - return await self.async_step_manual_pick_radio_type() - - port = ports[list_of_ports.index(user_selection)] - self._radio_mgr.device_path = port.device - - probe_result = await self._radio_mgr.detect_radio_type() - if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED: - return self.async_abort( - reason="wrong_firmware_installed", - description_placeholders={"repair_url": REPAIR_MY_URL}, - ) - if probe_result == ProbeResult.PROBING_FAILED: - # Did not autodetect anything, proceed to manual selection - return await self.async_step_manual_pick_radio_type() - - self._title = ( - f"{port.description}{', s/n: ' + port.serial_number if port.serial_number else ''}" - f" - {port.manufacturer}" - if port.manufacturer - else "" - ) - - return await self.async_step_verify_radio() - - # Pre-select the currently configured port - default_port = vol.UNDEFINED - - if self._radio_mgr.device_path is not None: - for description, port in zip(list_of_ports, ports): - if port.device == self._radio_mgr.device_path: - default_port = description - break - else: - default_port = CONF_MANUAL_PATH - - schema = vol.Schema( - { - vol.Required(CONF_DEVICE_PATH, default=default_port): vol.In( - list_of_ports - ) - } - ) - return self.async_show_form(step_id="choose_serial_port", data_schema=schema) - - async def async_step_manual_pick_radio_type( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Manually select the radio type.""" - if user_input is not None: - self._radio_mgr.radio_type = RadioType.get_by_description( - user_input[CONF_RADIO_TYPE] - ) - return await self.async_step_manual_port_config() - - # Pre-select the current radio type - default = vol.UNDEFINED - - if self._radio_mgr.radio_type is not None: - default = self._radio_mgr.radio_type.description - - schema = { - vol.Required(CONF_RADIO_TYPE, default=default): vol.In(RadioType.list()) - } - - return self.async_show_form( - step_id="manual_pick_radio_type", - data_schema=vol.Schema(schema), - ) - - async def async_step_manual_port_config( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Enter port settings specific for this type of radio.""" - assert self._radio_mgr.radio_type is not None - errors = {} - - if user_input is not None: - self._title = user_input[CONF_DEVICE_PATH] - self._radio_mgr.device_path = user_input[CONF_DEVICE_PATH] - self._radio_mgr.device_settings = user_input.copy() - - if await self._radio_mgr.radio_type.controller.probe(user_input): - return await self.async_step_verify_radio() - - errors["base"] = "cannot_connect" - - schema = { - vol.Required( - CONF_DEVICE_PATH, default=self._radio_mgr.device_path or vol.UNDEFINED - ): str - } - - source = self.context.get("source") - for ( - param, - value, - ) in DEVICE_SCHEMA.schema.items(): - if param not in SUPPORTED_PORT_SETTINGS: - continue - - if source == SOURCE_ZEROCONF and param == CONF_BAUDRATE: - value = 115200 - param = vol.Required(CONF_BAUDRATE, default=value) - elif ( - self._radio_mgr.device_settings is not None - and param in self._radio_mgr.device_settings - ): - param = vol.Required( - str(param), default=self._radio_mgr.device_settings[param] - ) - - schema[param] = value - - return self.async_show_form( - step_id="manual_port_config", - data_schema=vol.Schema(schema), - errors=errors, - ) - - async def async_step_verify_radio( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Add a warning step to dissuade the use of deprecated radios.""" - assert self._radio_mgr.radio_type is not None - - # Skip this step if we are using a recommended radio - if user_input is not None or self._radio_mgr.radio_type in RECOMMENDED_RADIOS: - return await self.async_step_choose_formation_strategy() - - return self.async_show_form( - step_id="verify_radio", - description_placeholders={ - CONF_NAME: self._radio_mgr.radio_type.description, - "docs_recommended_adapters_url": ( - "https://www.home-assistant.io/integrations/zha/#recommended-zigbee-radio-adapters-and-modules" - ), - }, - ) - - async def async_step_choose_formation_strategy( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Choose how to deal with the current radio's settings.""" - await self._radio_mgr.async_load_network_settings() - - strategies = [] - - # Check if we have any automatic backups *and* if the backups differ from - # the current radio settings, if they exist (since restoring would be redundant) - if self._radio_mgr.backups and ( - self._radio_mgr.current_settings is None - or any( - not backup.is_compatible_with(self._radio_mgr.current_settings) - for backup in self._radio_mgr.backups - ) - ): - strategies.append(CHOOSE_AUTOMATIC_BACKUP) - - if self._radio_mgr.current_settings is not None: - strategies.append(FORMATION_REUSE_SETTINGS) - - strategies.append(FORMATION_UPLOAD_MANUAL_BACKUP) - - # Do not show "erase network settings" if there are none to erase - if self._radio_mgr.current_settings is None: - strategies.append(FORMATION_FORM_INITIAL_NETWORK) - else: - strategies.append(FORMATION_FORM_NEW_NETWORK) - - # Automatically form a new network if we're onboarding with a brand new radio - if not onboarding.async_is_onboarded(self.hass) and set(strategies) == { - FORMATION_UPLOAD_MANUAL_BACKUP, - FORMATION_FORM_INITIAL_NETWORK, - }: - return await self.async_step_form_initial_network() - - # Otherwise, let the user choose - return self.async_show_menu( - step_id="choose_formation_strategy", - menu_options=strategies, - ) - - async def async_step_reuse_settings( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Reuse the existing network settings on the stick.""" - return await self._async_create_radio_entry() - - async def async_step_form_initial_network( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Form an initial network.""" - # This step exists only for translations, it does nothing new - return await self.async_step_form_new_network(user_input) - - async def async_step_form_new_network( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Form a brand-new network.""" - await self._radio_mgr.async_form_network() - return await self._async_create_radio_entry() - - def _parse_uploaded_backup( - self, uploaded_file_id: str - ) -> zigpy.backups.NetworkBackup: - """Read and parse an uploaded backup JSON file.""" - with process_uploaded_file(self.hass, uploaded_file_id) as file_path: - contents = file_path.read_text() - - return zigpy.backups.NetworkBackup.from_dict(json.loads(contents)) - - async def async_step_upload_manual_backup( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Upload and restore a coordinator backup JSON file.""" - errors = {} - - if user_input is not None: - try: - self._radio_mgr.chosen_backup = await self.hass.async_add_executor_job( - self._parse_uploaded_backup, user_input[UPLOADED_BACKUP_FILE] - ) - except ValueError: - errors["base"] = "invalid_backup_json" - else: - return await self.async_step_maybe_confirm_ezsp_restore() - - return self.async_show_form( - step_id="upload_manual_backup", - data_schema=vol.Schema( - { - vol.Required(UPLOADED_BACKUP_FILE): FileSelector( - FileSelectorConfig(accept=".json,application/json") - ) - } - ), - errors=errors, - ) - - async def async_step_choose_automatic_backup( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Choose an automatic backup.""" - if self.show_advanced_options: - # Always show the PAN IDs when in advanced mode - choices = [ - _format_backup_choice(backup, pan_ids=True) - for backup in self._radio_mgr.backups - ] - else: - # Only show the PAN IDs for multiple backups taken on the same day - num_backups_on_date = collections.Counter( - backup.backup_time.date() for backup in self._radio_mgr.backups - ) - choices = [ - _format_backup_choice( - backup, pan_ids=(num_backups_on_date[backup.backup_time.date()] > 1) - ) - for backup in self._radio_mgr.backups - ] - - if user_input is not None: - index = choices.index(user_input[CHOOSE_AUTOMATIC_BACKUP]) - self._radio_mgr.chosen_backup = self._radio_mgr.backups[index] - - return await self.async_step_maybe_confirm_ezsp_restore() - - return self.async_show_form( - step_id="choose_automatic_backup", - data_schema=vol.Schema( - { - vol.Required(CHOOSE_AUTOMATIC_BACKUP, default=choices[0]): vol.In( - choices - ), - } - ), - ) - - async def async_step_maybe_confirm_ezsp_restore( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Confirm restore for EZSP radios that require permanent IEEE writes.""" - call_step_2 = await self._radio_mgr.async_restore_backup_step_1() - if not call_step_2: - return await self._async_create_radio_entry() - - if user_input is not None: - await self._radio_mgr.async_restore_backup_step_2( - user_input[OVERWRITE_COORDINATOR_IEEE] - ) - return await self._async_create_radio_entry() - - return self.async_show_form( - step_id="maybe_confirm_ezsp_restore", - data_schema=vol.Schema( - {vol.Required(OVERWRITE_COORDINATOR_IEEE, default=True): bool} - ), - ) - - -class ZhaConfigFlowHandler(BaseZhaFlow, ConfigFlow, domain=DOMAIN): - """Handle a config flow.""" - - VERSION = 4 - - async def _set_unique_id_and_update_ignored_flow( - self, unique_id: str, device_path: str - ) -> None: - """Set the flow's unique ID and update the device path in an ignored flow.""" - current_entry = await self.async_set_unique_id(unique_id) - - if not current_entry: - return - - if current_entry.source != SOURCE_IGNORE: - self._abort_if_unique_id_configured() - else: - # Only update the current entry if it is an ignored discovery - self._abort_if_unique_id_configured( - updates={ - CONF_DEVICE: { - **current_entry.data.get(CONF_DEVICE, {}), - CONF_DEVICE_PATH: device_path, - }, - } - ) - - @staticmethod - @callback - def async_get_options_flow( - config_entry: ConfigEntry, - ) -> OptionsFlow: - """Create the options flow.""" - return ZhaOptionsFlowHandler(config_entry) - - async def async_step_user( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Handle a ZHA config flow start.""" - if self._async_current_entries(): - return self.async_abort(reason="single_instance_allowed") - - return await self.async_step_choose_serial_port(user_input) - - async def async_step_confirm( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Confirm a discovery.""" - self._set_confirm_only() - - # Don't permit discovery if ZHA is already set up - if self._async_current_entries(): - return self.async_abort(reason="single_instance_allowed") - - # Without confirmation, discovery can automatically progress into parts of the - # config flow logic that interacts with hardware. - if user_input is not None or not onboarding.async_is_onboarded(self.hass): - # Probe the radio type if we don't have one yet - if self._radio_mgr.radio_type is None: - probe_result = await self._radio_mgr.detect_radio_type() - else: - probe_result = ProbeResult.RADIO_TYPE_DETECTED - - if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED: - return self.async_abort( - reason="wrong_firmware_installed", - description_placeholders={"repair_url": REPAIR_MY_URL}, - ) - if probe_result == ProbeResult.PROBING_FAILED: - # This path probably will not happen now that we have - # more precise USB matching unless there is a problem - # with the device - return self.async_abort(reason="usb_probe_failed") - - if self._radio_mgr.device_settings is None: - return await self.async_step_manual_port_config() - - return await self.async_step_verify_radio() - - return self.async_show_form( - step_id="confirm", - description_placeholders={CONF_NAME: self._title}, - ) - - async def async_step_usb( - self, discovery_info: usb.UsbServiceInfo - ) -> ConfigFlowResult: - """Handle usb discovery.""" - vid = discovery_info.vid - pid = discovery_info.pid - serial_number = discovery_info.serial_number - manufacturer = discovery_info.manufacturer - description = discovery_info.description - dev_path = discovery_info.device - - await self._set_unique_id_and_update_ignored_flow( - unique_id=f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}", - device_path=dev_path, - ) - - # If they already have a discovery for deconz we ignore the usb discovery as - # they probably want to use it there instead - if self.hass.config_entries.flow.async_progress_by_handler(DECONZ_DOMAIN): - return self.async_abort(reason="not_zha_device") - for entry in self.hass.config_entries.async_entries(DECONZ_DOMAIN): - if entry.source != SOURCE_IGNORE: - return self.async_abort(reason="not_zha_device") - - self._radio_mgr.device_path = dev_path - self._title = description or usb.human_readable_device_name( - dev_path, - serial_number, - manufacturer, - description, - vid, - pid, - ) - self.context["title_placeholders"] = {CONF_NAME: self._title} - return await self.async_step_confirm() - - async def async_step_zeroconf( - self, discovery_info: zeroconf.ZeroconfServiceInfo - ) -> ConfigFlowResult: - """Handle zeroconf discovery.""" - - # Hostname is format: livingroom.local. - local_name = discovery_info.hostname[:-1] - port = discovery_info.port or DEFAULT_ZHA_ZEROCONF_PORT - - # Fix incorrect port for older TubesZB devices - if "tube" in local_name and port == ESPHOME_API_PORT: - port = DEFAULT_ZHA_ZEROCONF_PORT - - if "radio_type" in discovery_info.properties: - self._radio_mgr.radio_type = self._radio_mgr.parse_radio_type( - discovery_info.properties["radio_type"] - ) - elif "efr32" in local_name: - self._radio_mgr.radio_type = RadioType.ezsp - else: - self._radio_mgr.radio_type = RadioType.znp - - node_name = local_name.removesuffix(".local") - device_path = f"socket://{discovery_info.host}:{port}" - - await self._set_unique_id_and_update_ignored_flow( - unique_id=node_name, - device_path=device_path, - ) - - self.context["title_placeholders"] = {CONF_NAME: node_name} - self._title = device_path - self._radio_mgr.device_path = device_path - - return await self.async_step_confirm() - - async def async_step_hardware( - self, data: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Handle hardware flow.""" - try: - discovery_data = HARDWARE_DISCOVERY_SCHEMA(data) - except vol.Invalid: - return self.async_abort(reason="invalid_hardware_data") - - name = discovery_data["name"] - radio_type = self._radio_mgr.parse_radio_type(discovery_data["radio_type"]) - device_settings = discovery_data["port"] - device_path = device_settings[CONF_DEVICE_PATH] - - await self._set_unique_id_and_update_ignored_flow( - unique_id=f"{name}_{radio_type.name}_{device_path}", - device_path=device_path, - ) - - self._title = name - self._radio_mgr.radio_type = radio_type - self._radio_mgr.device_path = device_path - self._radio_mgr.device_settings = device_settings - self.context["title_placeholders"] = {CONF_NAME: name} - - return await self.async_step_confirm() - - -class ZhaOptionsFlowHandler(BaseZhaFlow, OptionsFlow): - """Handle an options flow.""" - - def __init__(self, config_entry: ConfigEntry) -> None: - """Initialize options flow.""" - super().__init__() - self.config_entry = config_entry - - self._radio_mgr.device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH] - self._radio_mgr.device_settings = config_entry.data[CONF_DEVICE] - self._radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]] - self._title = config_entry.title - - async def async_step_init( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Launch the options flow.""" - if user_input is not None: - # OperationNotAllowed: ZHA is not running - with suppress(OperationNotAllowed): - await self.hass.config_entries.async_unload(self.config_entry.entry_id) - - return await self.async_step_prompt_migrate_or_reconfigure() - - return self.async_show_form(step_id="init") - - async def async_step_prompt_migrate_or_reconfigure( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Confirm if we are migrating adapters or just re-configuring.""" - - return self.async_show_menu( - step_id="prompt_migrate_or_reconfigure", - menu_options=[ - OPTIONS_INTENT_RECONFIGURE, - OPTIONS_INTENT_MIGRATE, - ], - ) - - async def async_step_intent_reconfigure( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Virtual step for when the user is reconfiguring the integration.""" - return await self.async_step_choose_serial_port() - - async def async_step_intent_migrate( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Confirm the user wants to reset their current radio.""" - - if user_input is not None: - await self._radio_mgr.async_reset_adapter() - - return await self.async_step_instruct_unplug() - - return self.async_show_form(step_id="intent_migrate") - - async def async_step_instruct_unplug( - self, user_input: dict[str, Any] | None = None - ) -> ConfigFlowResult: - """Instruct the user to unplug the current radio, if possible.""" - - if user_input is not None: - # Now that the old radio is gone, we can scan for serial ports again - return await self.async_step_choose_serial_port() - - return self.async_show_form(step_id="instruct_unplug") - - async def _async_create_radio_entry(self): - """Re-implementation of the base flow's final step to update the config.""" - device_settings = self._radio_mgr.device_settings.copy() - device_settings[CONF_DEVICE_PATH] = await self.hass.async_add_executor_job( - usb.get_serial_by_id, self._radio_mgr.device_path - ) - - # Avoid creating both `.options` and `.data` by directly writing `data` here - self.hass.config_entries.async_update_entry( - entry=self.config_entry, - data={ - CONF_DEVICE: device_settings, - CONF_RADIO_TYPE: self._radio_mgr.radio_type.name, - }, - options=self.config_entry.options, - ) - - # Reload ZHA after we finish - await self.hass.config_entries.async_setup(self.config_entry.entry_id) - - # Intentionally do not set `data` to avoid creating `options`, we set it above - return self.async_create_entry(title=self._title, data={}) - - def async_remove(self): - """Maybe reload ZHA if the flow is aborted.""" - if self.config_entry.state not in ( - ConfigEntryState.SETUP_ERROR, - ConfigEntryState.NOT_LOADED, - ): - return - - self.hass.async_create_task( - self.hass.config_entries.async_setup(self.config_entry.entry_id) - ) diff --git a/zha/device_action.py b/zha/device_action.py deleted file mode 100644 index b52f5a472..000000000 --- a/zha/device_action.py +++ /dev/null @@ -1,232 +0,0 @@ -"""Provides device actions for ZHA devices.""" - -from __future__ import annotations - -from typing import Any - -from homeassistant.components.device_automation import InvalidDeviceAutomationConfig -from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_TYPE -from homeassistant.core import Context, HomeAssistant -from homeassistant.helpers import config_validation as cv -from homeassistant.helpers.typing import ConfigType, TemplateVarsType -import voluptuous as vol - -from . import DOMAIN -from .core.cluster_handlers.manufacturerspecific import ( - AllLEDEffectType, - SingleLEDEffectType, -) -from .core.const import CLUSTER_HANDLER_IAS_WD, CLUSTER_HANDLER_INOVELLI -from .core.helpers import async_get_zha_device -from .websocket_api import SERVICE_WARNING_DEVICE_SQUAWK, SERVICE_WARNING_DEVICE_WARN - -# mypy: disallow-any-generics - -ACTION_SQUAWK = "squawk" -ACTION_WARN = "warn" -ATTR_DATA = "data" -ATTR_IEEE = "ieee" -CONF_ZHA_ACTION_TYPE = "zha_action_type" -ZHA_ACTION_TYPE_SERVICE_CALL = "service_call" -ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND = "cluster_handler_command" -INOVELLI_ALL_LED_EFFECT = "issue_all_led_effect" -INOVELLI_INDIVIDUAL_LED_EFFECT = "issue_individual_led_effect" - -DEFAULT_ACTION_SCHEMA = cv.DEVICE_ACTION_BASE_SCHEMA.extend( - { - vol.Required(CONF_DOMAIN): DOMAIN, - vol.Required(CONF_TYPE): vol.In({ACTION_SQUAWK, ACTION_WARN}), - } -) - -INOVELLI_ALL_LED_EFFECT_SCHEMA = cv.DEVICE_ACTION_BASE_SCHEMA.extend( - { - vol.Required(CONF_TYPE): INOVELLI_ALL_LED_EFFECT, - vol.Required(CONF_DOMAIN): DOMAIN, - vol.Required("effect_type"): AllLEDEffectType.__getitem__, - vol.Required("color"): vol.All(vol.Coerce(int), vol.Range(0, 255)), - vol.Required("level"): vol.All(vol.Coerce(int), vol.Range(0, 100)), - vol.Required("duration"): vol.All(vol.Coerce(int), vol.Range(1, 255)), - } -) - -INOVELLI_INDIVIDUAL_LED_EFFECT_SCHEMA = INOVELLI_ALL_LED_EFFECT_SCHEMA.extend( - { - vol.Required(CONF_TYPE): INOVELLI_INDIVIDUAL_LED_EFFECT, - vol.Required("effect_type"): SingleLEDEffectType.__getitem__, - vol.Required("led_number"): vol.All(vol.Coerce(int), vol.Range(0, 6)), - } -) - -ACTION_SCHEMA_MAP = { - INOVELLI_ALL_LED_EFFECT: INOVELLI_ALL_LED_EFFECT_SCHEMA, - INOVELLI_INDIVIDUAL_LED_EFFECT: INOVELLI_INDIVIDUAL_LED_EFFECT_SCHEMA, -} - -ACTION_SCHEMA = vol.Any( - INOVELLI_ALL_LED_EFFECT_SCHEMA, - INOVELLI_INDIVIDUAL_LED_EFFECT_SCHEMA, - DEFAULT_ACTION_SCHEMA, -) - -DEVICE_ACTIONS = { - CLUSTER_HANDLER_IAS_WD: [ - {CONF_TYPE: ACTION_SQUAWK, CONF_DOMAIN: DOMAIN}, - {CONF_TYPE: ACTION_WARN, CONF_DOMAIN: DOMAIN}, - ], - CLUSTER_HANDLER_INOVELLI: [ - {CONF_TYPE: INOVELLI_ALL_LED_EFFECT, CONF_DOMAIN: DOMAIN}, - {CONF_TYPE: INOVELLI_INDIVIDUAL_LED_EFFECT, CONF_DOMAIN: DOMAIN}, - ], -} - -DEVICE_ACTION_TYPES = { - ACTION_SQUAWK: ZHA_ACTION_TYPE_SERVICE_CALL, - ACTION_WARN: ZHA_ACTION_TYPE_SERVICE_CALL, - INOVELLI_ALL_LED_EFFECT: ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND, - INOVELLI_INDIVIDUAL_LED_EFFECT: ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND, -} - -DEVICE_ACTION_SCHEMAS = { - INOVELLI_ALL_LED_EFFECT: vol.Schema( - { - vol.Required("effect_type"): vol.In(AllLEDEffectType.__members__.keys()), - vol.Required("color"): vol.All(vol.Coerce(int), vol.Range(0, 255)), - vol.Required("level"): vol.All(vol.Coerce(int), vol.Range(0, 100)), - vol.Required("duration"): vol.All(vol.Coerce(int), vol.Range(1, 255)), - } - ), - INOVELLI_INDIVIDUAL_LED_EFFECT: vol.Schema( - { - vol.Required("led_number"): vol.All(vol.Coerce(int), vol.Range(0, 6)), - vol.Required("effect_type"): vol.In(SingleLEDEffectType.__members__.keys()), - vol.Required("color"): vol.All(vol.Coerce(int), vol.Range(0, 255)), - vol.Required("level"): vol.All(vol.Coerce(int), vol.Range(0, 100)), - vol.Required("duration"): vol.All(vol.Coerce(int), vol.Range(1, 255)), - } - ), -} - -SERVICE_NAMES = { - ACTION_SQUAWK: SERVICE_WARNING_DEVICE_SQUAWK, - ACTION_WARN: SERVICE_WARNING_DEVICE_WARN, -} - -CLUSTER_HANDLER_MAPPINGS = { - INOVELLI_ALL_LED_EFFECT: CLUSTER_HANDLER_INOVELLI, - INOVELLI_INDIVIDUAL_LED_EFFECT: CLUSTER_HANDLER_INOVELLI, -} - - -async def async_call_action_from_config( - hass: HomeAssistant, - config: ConfigType, - variables: TemplateVarsType, - context: Context | None, -) -> None: - """Perform an action based on configuration.""" - await ZHA_ACTION_TYPES[DEVICE_ACTION_TYPES[config[CONF_TYPE]]]( - hass, config, variables, context - ) - - -async def async_validate_action_config( - hass: HomeAssistant, config: ConfigType -) -> ConfigType: - """Validate config.""" - schema = ACTION_SCHEMA_MAP.get(config[CONF_TYPE], DEFAULT_ACTION_SCHEMA) - config = schema(config) - return config - - -async def async_get_actions( - hass: HomeAssistant, device_id: str -) -> list[dict[str, str]]: - """List device actions.""" - try: - zha_device = async_get_zha_device(hass, device_id) - except (KeyError, AttributeError): - return [] - cluster_handlers = [ - ch.name - for endpoint in zha_device.endpoints.values() - for ch in endpoint.claimed_cluster_handlers.values() - ] - actions = [ - action - for cluster_handler, cluster_handler_actions in DEVICE_ACTIONS.items() - for action in cluster_handler_actions - if cluster_handler in cluster_handlers - ] - for action in actions: - action[CONF_DEVICE_ID] = device_id - return actions - - -async def async_get_action_capabilities( - hass: HomeAssistant, config: ConfigType -) -> dict[str, vol.Schema]: - """List action capabilities.""" - - return {"extra_fields": DEVICE_ACTION_SCHEMAS.get(config[CONF_TYPE], {})} - - -async def _execute_service_based_action( - hass: HomeAssistant, - config: dict[str, Any], - variables: TemplateVarsType, - context: Context | None, -) -> None: - action_type = config[CONF_TYPE] - service_name = SERVICE_NAMES[action_type] - try: - zha_device = async_get_zha_device(hass, config[CONF_DEVICE_ID]) - except (KeyError, AttributeError): - return - - service_data = {ATTR_IEEE: str(zha_device.ieee)} - - await hass.services.async_call( - DOMAIN, service_name, service_data, blocking=True, context=context - ) - - -async def _execute_cluster_handler_command_based_action( - hass: HomeAssistant, - config: dict[str, Any], - variables: TemplateVarsType, - context: Context | None, -) -> None: - action_type = config[CONF_TYPE] - cluster_handler_name = CLUSTER_HANDLER_MAPPINGS[action_type] - try: - zha_device = async_get_zha_device(hass, config[CONF_DEVICE_ID]) - except (KeyError, AttributeError): - return - - action_cluster_handler = None - for endpoint in zha_device.endpoints.values(): - for cluster_handler in endpoint.all_cluster_handlers.values(): - if cluster_handler.name == cluster_handler_name: - action_cluster_handler = cluster_handler - break - - if action_cluster_handler is None: - raise InvalidDeviceAutomationConfig( - f"Unable to execute cluster handler action - cluster handler: {cluster_handler_name} action:" - f" {action_type}" - ) - - if not hasattr(action_cluster_handler, action_type): - raise InvalidDeviceAutomationConfig( - f"Unable to execute cluster handler - cluster handler: {cluster_handler_name} action:" - f" {action_type}" - ) - - await getattr(action_cluster_handler, action_type)(**config) - - -ZHA_ACTION_TYPES = { - ZHA_ACTION_TYPE_SERVICE_CALL: _execute_service_based_action, - ZHA_ACTION_TYPE_CLUSTER_HANDLER_COMMAND: _execute_cluster_handler_command_based_action, -} diff --git a/zha/device_trigger.py b/zha/device_trigger.py deleted file mode 100644 index 355c72078..000000000 --- a/zha/device_trigger.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Provides device automations for ZHA devices that emit events.""" - -from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA -from homeassistant.components.device_automation.exceptions import ( - InvalidDeviceAutomationConfig, -) -from homeassistant.components.homeassistant.triggers import event as event_trigger -from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE -from homeassistant.core import CALLBACK_TYPE, HomeAssistant -from homeassistant.exceptions import HomeAssistantError -from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo -from homeassistant.helpers.typing import ConfigType -import voluptuous as vol - -from . import DOMAIN as ZHA_DOMAIN -from .core.const import ZHA_EVENT -from .core.helpers import async_get_zha_device, get_zha_data - -CONF_SUBTYPE = "subtype" -DEVICE = "device" -DEVICE_IEEE = "device_ieee" - -TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend( - {vol.Required(CONF_TYPE): str, vol.Required(CONF_SUBTYPE): str} -) - - -def _get_device_trigger_data(hass: HomeAssistant, device_id: str) -> tuple[str, dict]: - """Get device trigger data for a device, falling back to the cache if possible.""" - - # First, try checking to see if the device itself is accessible - try: - zha_device = async_get_zha_device(hass, device_id) - except ValueError: - pass - else: - return str(zha_device.ieee), zha_device.device_automation_triggers - - # If not, check the trigger cache but allow any `KeyError`s to propagate - return get_zha_data(hass).device_trigger_cache[device_id] - - -async def async_validate_trigger_config( - hass: HomeAssistant, config: ConfigType -) -> ConfigType: - """Validate config.""" - config = TRIGGER_SCHEMA(config) - - # Trigger validation will not occur if the config entry is not loaded - _, triggers = _get_device_trigger_data(hass, config[CONF_DEVICE_ID]) - - trigger = (config[CONF_TYPE], config[CONF_SUBTYPE]) - if trigger not in triggers: - raise InvalidDeviceAutomationConfig(f"device does not have trigger {trigger}") - - return config - - -async def async_attach_trigger( - hass: HomeAssistant, - config: ConfigType, - action: TriggerActionType, - trigger_info: TriggerInfo, -) -> CALLBACK_TYPE: - """Listen for state changes based on configuration.""" - - try: - ieee, triggers = _get_device_trigger_data(hass, config[CONF_DEVICE_ID]) - except KeyError as err: - raise HomeAssistantError( - f"Unable to get zha device {config[CONF_DEVICE_ID]}" - ) from err - - trigger_key: tuple[str, str] = (config[CONF_TYPE], config[CONF_SUBTYPE]) - - if trigger_key not in triggers: - raise HomeAssistantError(f"Unable to find trigger {trigger_key}") - - event_config = event_trigger.TRIGGER_SCHEMA( - { - event_trigger.CONF_PLATFORM: "event", - event_trigger.CONF_EVENT_TYPE: ZHA_EVENT, - event_trigger.CONF_EVENT_DATA: {DEVICE_IEEE: ieee, **triggers[trigger_key]}, - } - ) - return await event_trigger.async_attach_trigger( - hass, event_config, action, trigger_info, platform_type="device" - ) - - -async def async_get_triggers( - hass: HomeAssistant, device_id: str -) -> list[dict[str, str]]: - """List device triggers. - - Make sure the device supports device automations and return the trigger list. - """ - try: - _, triggers = _get_device_trigger_data(hass, device_id) - except KeyError as err: - raise InvalidDeviceAutomationConfig from err - - return [ - { - CONF_DEVICE_ID: device_id, - CONF_DOMAIN: ZHA_DOMAIN, - CONF_PLATFORM: DEVICE, - CONF_TYPE: trigger, - CONF_SUBTYPE: subtype, - } - for trigger, subtype in triggers - ] diff --git a/zha/diagnostics.py b/zha/diagnostics.py deleted file mode 100644 index 8a63ee52b..000000000 --- a/zha/diagnostics.py +++ /dev/null @@ -1,175 +0,0 @@ -"""Provides diagnostics for ZHA.""" - -from __future__ import annotations - -import dataclasses -from importlib.metadata import version -from typing import Any - -from homeassistant.components.diagnostics.util import async_redact_data -from homeassistant.config_entries import ConfigEntry -from homeassistant.const import CONF_ID, CONF_NAME, CONF_UNIQUE_ID -from homeassistant.core import HomeAssistant -from homeassistant.helpers import device_registry as dr -from zigpy.config import CONF_NWK_EXTENDED_PAN_ID -from zigpy.profiles import PROFILES -from zigpy.types import Channels -from zigpy.zcl import Cluster - -from .core.const import ( - ATTR_ATTRIBUTE_NAME, - ATTR_DEVICE_TYPE, - ATTR_IEEE, - ATTR_IN_CLUSTERS, - ATTR_OUT_CLUSTERS, - ATTR_PROFILE_ID, - ATTR_VALUE, - CONF_ALARM_MASTER_CODE, - UNKNOWN, -) -from .core.device import ZHADevice -from .core.gateway import ZHAGateway -from .core.helpers import async_get_zha_device, get_zha_data, get_zha_gateway - -KEYS_TO_REDACT = { - ATTR_IEEE, - CONF_UNIQUE_ID, - CONF_ALARM_MASTER_CODE, - "network_key", - CONF_NWK_EXTENDED_PAN_ID, - "partner_ieee", -} - -ATTRIBUTES = "attributes" -CLUSTER_DETAILS = "cluster_details" -UNSUPPORTED_ATTRIBUTES = "unsupported_attributes" - - -def shallow_asdict(obj: Any) -> dict: - """Return a shallow copy of a dataclass as a dict.""" - if hasattr(obj, "__dataclass_fields__"): - result = {} - - for field in dataclasses.fields(obj): - result[field.name] = shallow_asdict(getattr(obj, field.name)) - - return result - if hasattr(obj, "as_dict"): - return obj.as_dict() - return obj - - -async def async_get_config_entry_diagnostics( - hass: HomeAssistant, config_entry: ConfigEntry -) -> dict[str, Any]: - """Return diagnostics for a config entry.""" - zha_data = get_zha_data(hass) - gateway: ZHAGateway = get_zha_gateway(hass) - app = gateway.application_controller - - energy_scan = await app.energy_scan( - channels=Channels.ALL_CHANNELS, duration_exp=4, count=1 - ) - - return async_redact_data( - { - "config": zha_data.yaml_config, - "config_entry": config_entry.as_dict(), - "application_state": shallow_asdict(app.state), - "energy_scan": { - channel: 100 * energy / 255 for channel, energy in energy_scan.items() - }, - "versions": { - "bellows": version("bellows"), - "zigpy": version("zigpy"), - "zigpy_deconz": version("zigpy-deconz"), - "zigpy_xbee": version("zigpy-xbee"), - "zigpy_znp": version("zigpy_znp"), - "zigpy_zigate": version("zigpy-zigate"), - "zhaquirks": version("zha-quirks"), - }, - "devices": [ - { - "manufacturer": device.manufacturer, - "model": device.model, - "logical_type": device.device_type, - } - for device in gateway.devices.values() - ], - }, - KEYS_TO_REDACT, - ) - - -async def async_get_device_diagnostics( - hass: HomeAssistant, config_entry: ConfigEntry, device: dr.DeviceEntry -) -> dict[str, Any]: - """Return diagnostics for a device.""" - zha_device: ZHADevice = async_get_zha_device(hass, device.id) - device_info: dict[str, Any] = zha_device.zha_device_info - device_info[CLUSTER_DETAILS] = get_endpoint_cluster_attr_data(zha_device) - return async_redact_data(device_info, KEYS_TO_REDACT) - - -def get_endpoint_cluster_attr_data(zha_device: ZHADevice) -> dict: - """Return endpoint cluster attribute data.""" - cluster_details = {} - for ep_id, endpoint in zha_device.device.endpoints.items(): - if ep_id == 0: - continue - endpoint_key = ( - f"{PROFILES.get(endpoint.profile_id).DeviceType(endpoint.device_type).name}" - if PROFILES.get(endpoint.profile_id) is not None - and endpoint.device_type is not None - else UNKNOWN - ) - cluster_details[ep_id] = { - ATTR_DEVICE_TYPE: { - CONF_NAME: endpoint_key, - CONF_ID: endpoint.device_type, - }, - ATTR_PROFILE_ID: endpoint.profile_id, - ATTR_IN_CLUSTERS: { - f"0x{cluster_id:04x}": { - "endpoint_attribute": cluster.ep_attribute, - **get_cluster_attr_data(cluster), - } - for cluster_id, cluster in endpoint.in_clusters.items() - }, - ATTR_OUT_CLUSTERS: { - f"0x{cluster_id:04x}": { - "endpoint_attribute": cluster.ep_attribute, - **get_cluster_attr_data(cluster), - } - for cluster_id, cluster in endpoint.out_clusters.items() - }, - } - return cluster_details - - -def get_cluster_attr_data(cluster: Cluster) -> dict: - """Return cluster attribute data.""" - unsupported_attributes = {} - for u_attr in cluster.unsupported_attributes: - try: - u_attr_def = cluster.find_attribute(u_attr) - unsupported_attributes[f"0x{u_attr_def.id:04x}"] = { - ATTR_ATTRIBUTE_NAME: u_attr_def.name - } - except KeyError: - if isinstance(u_attr, int): - unsupported_attributes[f"0x{u_attr:04x}"] = {} - else: - unsupported_attributes[u_attr] = {} - - return { - ATTRIBUTES: { - f"0x{attr_id:04x}": { - ATTR_ATTRIBUTE_NAME: attr_def.name, - ATTR_VALUE: attr_value, - } - for attr_id, attr_def in cluster.attributes.items() - if (attr_value := cluster.get(attr_def.name)) is not None - }, - UNSUPPORTED_ATTRIBUTES: unsupported_attributes, - } diff --git a/zha/logbook.py b/zha/logbook.py deleted file mode 100644 index e63ef5658..000000000 --- a/zha/logbook.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Describe ZHA logbook events.""" - -from __future__ import annotations - -from collections.abc import Callable -from typing import TYPE_CHECKING - -from homeassistant.components.logbook import LOGBOOK_ENTRY_MESSAGE, LOGBOOK_ENTRY_NAME -from homeassistant.const import ATTR_COMMAND, ATTR_DEVICE_ID -from homeassistant.core import Event, HomeAssistant, callback -import homeassistant.helpers.device_registry as dr - -from .core.const import DOMAIN as ZHA_DOMAIN, ZHA_EVENT -from .core.helpers import async_get_zha_device - -if TYPE_CHECKING: - from .core.device import ZHADevice - - -@callback -def async_describe_events( - hass: HomeAssistant, - async_describe_event: Callable[[str, str, Callable[[Event], dict[str, str]]], None], -) -> None: - """Describe logbook events.""" - device_registry = dr.async_get(hass) - - @callback - def async_describe_zha_event(event: Event) -> dict[str, str]: - """Describe ZHA logbook event.""" - device: dr.DeviceEntry | None = None - device_name: str = "Unknown device" - zha_device: ZHADevice | None = None - event_data = event.data - event_type: str | None = None - event_subtype: str | None = None - - try: - device = device_registry.devices[event.data[ATTR_DEVICE_ID]] - if device: - device_name = device.name_by_user or device.name or "Unknown device" - zha_device = async_get_zha_device(hass, event.data[ATTR_DEVICE_ID]) - except (KeyError, AttributeError): - pass - - if ( - zha_device - and (command := event_data.get(ATTR_COMMAND)) - and (command_to_etype_subtype := zha_device.device_automation_commands) - and (etype_subtypes := command_to_etype_subtype.get(command)) - ): - all_triggers = zha_device.device_automation_triggers - for etype_subtype in etype_subtypes: - trigger = all_triggers[etype_subtype] - if not all( - event_data.get(key) == value for key, value in trigger.items() - ): - continue - event_type, event_subtype = etype_subtype - break - - if event_type is None: - event_type = event_data.get(ATTR_COMMAND, ZHA_EVENT) - - if event_subtype is not None and event_subtype != event_type: - event_type = f"{event_type} - {event_subtype}" - - if event_type is not None: - event_type = event_type.replace("_", " ").title() - if "event" in event_type.lower(): - message = f"{event_type} was fired" - else: - message = f"{event_type} event was fired" - - if params := event_data.get("params"): - message = f"{message} with parameters: {params}" - - return { - LOGBOOK_ENTRY_NAME: device_name, - LOGBOOK_ENTRY_MESSAGE: message, - } - - async_describe_event(ZHA_DOMAIN, ZHA_EVENT, async_describe_zha_event) diff --git a/zha/radio_manager.py b/zha/radio_manager.py deleted file mode 100644 index d64e8059b..000000000 --- a/zha/radio_manager.py +++ /dev/null @@ -1,464 +0,0 @@ -"""Config flow for ZHA.""" - -from __future__ import annotations - -import asyncio -import contextlib -from contextlib import suppress -import copy -import enum -import logging -import os -from typing import Any, Self - -from bellows.config import CONF_USE_THREAD -from homeassistant import config_entries -from homeassistant.components import usb -from homeassistant.core import HomeAssistant -import voluptuous as vol -from zigpy.application import ControllerApplication -import zigpy.backups -from zigpy.config import ( - CONF_DATABASE, - CONF_DEVICE, - CONF_DEVICE_PATH, - CONF_NWK_BACKUP_ENABLED, - SCHEMA_DEVICE, -) -from zigpy.exceptions import NetworkNotFormed - -from . import repairs -from .core.const import ( - CONF_RADIO_TYPE, - CONF_ZIGPY, - DEFAULT_DATABASE_NAME, - EZSP_OVERWRITE_EUI64, - RadioType, -) -from .core.helpers import get_zha_data - -# Only the common radio types will be autoprobed, ordered by new device popularity. -# XBee takes too long to probe since it scans through all possible bauds and likely has -# very few users to begin with. -AUTOPROBE_RADIOS = ( - RadioType.ezsp, - RadioType.znp, - RadioType.deconz, - RadioType.zigate, -) - -RECOMMENDED_RADIOS = ( - RadioType.ezsp, - RadioType.znp, - RadioType.deconz, -) - -CONNECT_DELAY_S = 1.0 -RETRY_DELAY_S = 1.0 - -BACKUP_RETRIES = 5 -MIGRATION_RETRIES = 100 - - -DEVICE_SCHEMA = vol.Schema( - { - vol.Required("path"): str, - vol.Optional("baudrate", default=115200): int, - vol.Optional("flow_control", default=None): vol.In( - ["hardware", "software", None] - ), - } -) - -HARDWARE_DISCOVERY_SCHEMA = vol.Schema( - { - vol.Required("name"): str, - vol.Required("port"): DEVICE_SCHEMA, - vol.Required("radio_type"): str, - } -) - -HARDWARE_MIGRATION_SCHEMA = vol.Schema( - { - vol.Required("new_discovery_info"): HARDWARE_DISCOVERY_SCHEMA, - vol.Required("old_discovery_info"): vol.Schema( - { - vol.Exclusive("hw", "discovery"): HARDWARE_DISCOVERY_SCHEMA, - vol.Exclusive("usb", "discovery"): usb.UsbServiceInfo, - } - ), - } -) - -_LOGGER = logging.getLogger(__name__) - - -class ProbeResult(enum.StrEnum): - """Radio firmware probing result.""" - - RADIO_TYPE_DETECTED = "radio_type_detected" - WRONG_FIRMWARE_INSTALLED = "wrong_firmware_installed" - PROBING_FAILED = "probing_failed" - - -def _allow_overwrite_ezsp_ieee( - backup: zigpy.backups.NetworkBackup, -) -> zigpy.backups.NetworkBackup: - """Return a new backup with the flag to allow overwriting the EZSP EUI64.""" - new_stack_specific = copy.deepcopy(backup.network_info.stack_specific) - new_stack_specific.setdefault("ezsp", {})[EZSP_OVERWRITE_EUI64] = True - - return backup.replace( - network_info=backup.network_info.replace(stack_specific=new_stack_specific) - ) - - -def _prevent_overwrite_ezsp_ieee( - backup: zigpy.backups.NetworkBackup, -) -> zigpy.backups.NetworkBackup: - """Return a new backup without the flag to allow overwriting the EZSP EUI64.""" - if "ezsp" not in backup.network_info.stack_specific: - return backup - - new_stack_specific = copy.deepcopy(backup.network_info.stack_specific) - new_stack_specific.setdefault("ezsp", {}).pop(EZSP_OVERWRITE_EUI64, None) - - return backup.replace( - network_info=backup.network_info.replace(stack_specific=new_stack_specific) - ) - - -class ZhaRadioManager: - """Helper class with radio related functionality.""" - - hass: HomeAssistant - - def __init__(self) -> None: - """Initialize ZhaRadioManager instance.""" - self.device_path: str | None = None - self.device_settings: dict[str, Any] | None = None - self.radio_type: RadioType | None = None - self.current_settings: zigpy.backups.NetworkBackup | None = None - self.backups: list[zigpy.backups.NetworkBackup] = [] - self.chosen_backup: zigpy.backups.NetworkBackup | None = None - - @classmethod - def from_config_entry( - cls, hass: HomeAssistant, config_entry: config_entries.ConfigEntry - ) -> Self: - """Create an instance from a config entry.""" - mgr = cls() - mgr.hass = hass - mgr.device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH] - mgr.device_settings = config_entry.data[CONF_DEVICE] - mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]] - - return mgr - - @contextlib.asynccontextmanager - async def connect_zigpy_app(self) -> ControllerApplication: - """Connect to the radio with the current config and then clean up.""" - assert self.radio_type is not None - - config = get_zha_data(self.hass).yaml_config - app_config = config.get(CONF_ZIGPY, {}).copy() - - database_path = config.get( - CONF_DATABASE, - self.hass.config.path(DEFAULT_DATABASE_NAME), - ) - - # Don't create `zigbee.db` if it doesn't already exist - if not await self.hass.async_add_executor_job(os.path.exists, database_path): - database_path = None - - app_config[CONF_DATABASE] = database_path - app_config[CONF_DEVICE] = self.device_settings - app_config[CONF_NWK_BACKUP_ENABLED] = False - app_config[CONF_USE_THREAD] = False - app_config = self.radio_type.controller.SCHEMA(app_config) - - app = await self.radio_type.controller.new( - app_config, auto_form=False, start_radio=False - ) - - try: - yield app - finally: - await app.shutdown() - await asyncio.sleep(CONNECT_DELAY_S) - - async def restore_backup( - self, backup: zigpy.backups.NetworkBackup, **kwargs: Any - ) -> None: - """Restore the provided network backup, passing through kwargs.""" - if self.current_settings is not None and self.current_settings.supersedes( - self.chosen_backup - ): - return - - async with self.connect_zigpy_app() as app: - await app.connect() - await app.backups.restore_backup(backup, **kwargs) - - @staticmethod - def parse_radio_type(radio_type: str) -> RadioType: - """Parse a radio type name, accounting for past aliases.""" - if radio_type == "efr32": - return RadioType.ezsp - - return RadioType[radio_type] - - async def detect_radio_type(self) -> ProbeResult: - """Probe all radio types on the current port.""" - assert self.device_path is not None - - for radio in AUTOPROBE_RADIOS: - _LOGGER.debug("Attempting to probe radio type %s", radio) - - dev_config = SCHEMA_DEVICE({CONF_DEVICE_PATH: self.device_path}) - probe_result = await radio.controller.probe(dev_config) - - if not probe_result: - continue - - # Radio library probing can succeed and return new device settings - if isinstance(probe_result, dict): - dev_config = probe_result - - self.radio_type = radio - self.device_settings = dev_config - - repairs.async_delete_blocking_issues(self.hass) - return ProbeResult.RADIO_TYPE_DETECTED - - with suppress(repairs.wrong_silabs_firmware.AlreadyRunningEZSP): - if await repairs.wrong_silabs_firmware.warn_on_wrong_silabs_firmware( - self.hass, self.device_path - ): - return ProbeResult.WRONG_FIRMWARE_INSTALLED - - return ProbeResult.PROBING_FAILED - - async def async_load_network_settings( - self, *, create_backup: bool = False - ) -> zigpy.backups.NetworkBackup | None: - """Connect to the radio and load its current network settings.""" - backup = None - - async with self.connect_zigpy_app() as app: - await app.connect() - - # Check if the stick has any settings and load them - try: - await app.load_network_info() - except NetworkNotFormed: - pass - else: - self.current_settings = zigpy.backups.NetworkBackup( - network_info=app.state.network_info, - node_info=app.state.node_info, - ) - - if create_backup: - backup = await app.backups.create_backup() - - # The list of backups will always exist - self.backups = app.backups.backups.copy() - self.backups.sort(reverse=True, key=lambda b: b.backup_time) - - return backup - - async def async_form_network(self) -> None: - """Form a brand-new network.""" - async with self.connect_zigpy_app() as app: - await app.connect() - await app.form_network() - - async def async_reset_adapter(self) -> None: - """Reset the current adapter.""" - async with self.connect_zigpy_app() as app: - await app.connect() - await app.reset_network_info() - - async def async_restore_backup_step_1(self) -> bool: - """Prepare restoring backup. - - Returns True if async_restore_backup_step_2 should be called. - """ - assert self.chosen_backup is not None - - if self.radio_type != RadioType.ezsp: - await self.restore_backup(self.chosen_backup) - return False - - # We have no way to partially load network settings if no network is formed - if self.current_settings is None: - # Since we are going to be restoring the backup anyways, write it to the - # radio without overwriting the IEEE but don't take a backup with these - # temporary settings - temp_backup = _prevent_overwrite_ezsp_ieee(self.chosen_backup) - await self.restore_backup(temp_backup, create_new=False) - await self.async_load_network_settings() - - assert self.current_settings is not None - - metadata = self.current_settings.network_info.metadata["ezsp"] - - if ( - self.current_settings.node_info.ieee == self.chosen_backup.node_info.ieee - or metadata["can_rewrite_custom_eui64"] - or not metadata["can_burn_userdata_custom_eui64"] - ): - # No point in prompting the user if the backup doesn't have a new IEEE - # address or if there is no way to overwrite the IEEE address a second time - await self.restore_backup(self.chosen_backup) - - return False - - return True - - async def async_restore_backup_step_2(self, overwrite_ieee: bool) -> None: - """Restore backup and optionally overwrite IEEE.""" - assert self.chosen_backup is not None - - backup = self.chosen_backup - - if overwrite_ieee: - backup = _allow_overwrite_ezsp_ieee(backup) - - # If the user declined to overwrite the IEEE *and* we wrote the backup to - # their empty radio above, restoring it again would be redundant. - await self.restore_backup(backup) - - -class ZhaMultiPANMigrationHelper: - """Helper class for automatic migration when upgrading the firmware of a radio. - - This class is currently only intended to be used when changing the firmware on the - radio used in the Home Assistant SkyConnect USB stick and the Home Assistant Yellow - from Zigbee only firmware to firmware supporting both Zigbee and Thread. - """ - - def __init__( - self, hass: HomeAssistant, config_entry: config_entries.ConfigEntry - ) -> None: - """Initialize MigrationHelper instance.""" - self._config_entry = config_entry - self._hass = hass - self._radio_mgr = ZhaRadioManager() - self._radio_mgr.hass = hass - - async def async_initiate_migration(self, data: dict[str, Any]) -> bool: - """Initiate ZHA migration. - - The passed data should contain: - - Discovery data identifying the device being firmware updated - - Discovery data for connecting to the device after the firmware update is - completed. - - Returns True if async_finish_migration should be called after the firmware - update is completed. - """ - migration_data = HARDWARE_MIGRATION_SCHEMA(data) - - name = migration_data["new_discovery_info"]["name"] - new_radio_type = ZhaRadioManager.parse_radio_type( - migration_data["new_discovery_info"]["radio_type"] - ) - - new_device_settings = SCHEMA_DEVICE( - migration_data["new_discovery_info"]["port"] - ) - - if "hw" in migration_data["old_discovery_info"]: - old_device_path = migration_data["old_discovery_info"]["hw"]["port"]["path"] - else: # usb - device = migration_data["old_discovery_info"]["usb"].device - old_device_path = await self._hass.async_add_executor_job( - usb.get_serial_by_id, device - ) - - if self._config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH] != old_device_path: - # ZHA is using another radio, do nothing - return False - - # OperationNotAllowed: ZHA is not running - with suppress(config_entries.OperationNotAllowed): - await self._hass.config_entries.async_unload(self._config_entry.entry_id) - - # Temporarily connect to the old radio to read its settings - config_entry_data = self._config_entry.data - old_radio_mgr = ZhaRadioManager() - old_radio_mgr.hass = self._hass - old_radio_mgr.device_path = config_entry_data[CONF_DEVICE][CONF_DEVICE_PATH] - old_radio_mgr.device_settings = config_entry_data[CONF_DEVICE] - old_radio_mgr.radio_type = RadioType[config_entry_data[CONF_RADIO_TYPE]] - - for retry in range(BACKUP_RETRIES): - try: - backup = await old_radio_mgr.async_load_network_settings( - create_backup=True - ) - break - except OSError as err: - if retry >= BACKUP_RETRIES - 1: - raise - - _LOGGER.debug( - "Failed to create backup %r, retrying in %s seconds", - err, - RETRY_DELAY_S, - ) - - await asyncio.sleep(RETRY_DELAY_S) - - # Then configure the radio manager for the new radio to use the new settings - self._radio_mgr.chosen_backup = backup - self._radio_mgr.radio_type = new_radio_type - self._radio_mgr.device_path = new_device_settings[CONF_DEVICE_PATH] - self._radio_mgr.device_settings = new_device_settings - device_settings = self._radio_mgr.device_settings.copy() # type: ignore[union-attr] - - # Update the config entry settings - self._hass.config_entries.async_update_entry( - entry=self._config_entry, - data={ - CONF_DEVICE: device_settings, - CONF_RADIO_TYPE: self._radio_mgr.radio_type.name, - }, - options=self._config_entry.options, - title=name, - ) - return True - - async def async_finish_migration(self) -> None: - """Finish ZHA migration. - - Throws an exception if the migration did not succeed. - """ - # Restore the backup, permanently overwriting the device IEEE address - for retry in range(MIGRATION_RETRIES): - try: - if await self._radio_mgr.async_restore_backup_step_1(): - await self._radio_mgr.async_restore_backup_step_2(True) - - break - except OSError as err: - if retry >= MIGRATION_RETRIES - 1: - raise - - _LOGGER.debug( - "Failed to restore backup %r, retrying in %s seconds", - err, - RETRY_DELAY_S, - ) - - await asyncio.sleep(RETRY_DELAY_S) - - _LOGGER.debug("Restored backup after %s retries", retry) - - # Launch ZHA again - # OperationNotAllowed: ZHA is not unloaded - with suppress(config_entries.OperationNotAllowed): - await self._hass.config_entries.async_setup(self._config_entry.entry_id) diff --git a/zha/silabs_multiprotocol.py b/zha/silabs_multiprotocol.py deleted file mode 100644 index aec52b4ac..000000000 --- a/zha/silabs_multiprotocol.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Silicon Labs Multiprotocol support.""" - -from __future__ import annotations - -import asyncio -import contextlib - -from homeassistant.components.homeassistant_hardware.silabs_multiprotocol_addon import ( - is_multiprotocol_url, -) -from homeassistant.core import HomeAssistant - -from . import api - -# The approximate time it takes ZHA to change channels on SiLabs coordinators -ZHA_CHANNEL_CHANGE_TIME_S = 10.27 - - -def _get_zha_url(hass: HomeAssistant) -> str | None: - """Return the ZHA radio path, or None if there's no ZHA config entry.""" - with contextlib.suppress(ValueError): - return api.async_get_radio_path(hass) - return None - - -async def _get_zha_channel(hass: HomeAssistant) -> int | None: - """Get ZHA channel, or None if there's no ZHA config entry.""" - zha_network_settings: api.NetworkBackup | None - with contextlib.suppress(ValueError): - zha_network_settings = await api.async_get_network_settings(hass) - if not zha_network_settings: - return None - channel: int = zha_network_settings.network_info.channel - # ZHA uses channel 0 when no channel is set - return channel or None - - -async def async_change_channel( - hass: HomeAssistant, channel: int, delay: float = 0 -) -> asyncio.Task | None: - """Set the channel to be used. - - Does nothing if not configured. - """ - zha_url = _get_zha_url(hass) - if not zha_url: - # ZHA is not configured - return None - - async def finish_migration() -> None: - """Finish the channel migration.""" - await asyncio.sleep(max(0, delay - ZHA_CHANNEL_CHANGE_TIME_S)) - return await api.async_change_channel(hass, channel) - - return hass.async_create_task(finish_migration()) - - -async def async_get_channel(hass: HomeAssistant) -> int | None: - """Return the channel. - - Returns None if not configured. - """ - zha_url = _get_zha_url(hass) - if not zha_url: - # ZHA is not configured - return None - - return await _get_zha_channel(hass) - - -async def async_using_multipan(hass: HomeAssistant) -> bool: - """Return if the multiprotocol device is used. - - Returns False if not configured. - """ - zha_url = _get_zha_url(hass) - if not zha_url: - # ZHA is not configured - return False - - return is_multiprotocol_url(zha_url) diff --git a/zha/websocket_api.py b/zha/websocket_api.py deleted file mode 100644 index 78b21c1f5..000000000 --- a/zha/websocket_api.py +++ /dev/null @@ -1,1577 +0,0 @@ -"""Web socket API for Zigbee Home Automation devices.""" - -from __future__ import annotations - -import asyncio -import logging -from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypeVar, cast - -from homeassistant.components import websocket_api -from homeassistant.const import ATTR_COMMAND, ATTR_ID, ATTR_NAME -from homeassistant.core import HomeAssistant, ServiceCall, callback -from homeassistant.helpers import entity_registry as er -import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.dispatcher import async_dispatcher_connect -from homeassistant.helpers.service import async_register_admin_service -import voluptuous as vol -import zigpy.backups -from zigpy.config import CONF_DEVICE -from zigpy.config.validators import cv_boolean -from zigpy.types.named import EUI64, KeyData -from zigpy.zcl.clusters.security import IasAce -import zigpy.zdo.types as zdo_types - -from .api import ( - async_change_channel, - async_get_active_network_settings, - async_get_radio_type, -) -from .core.const import ( - ATTR_ARGS, - ATTR_ATTRIBUTE, - ATTR_CLUSTER_ID, - ATTR_CLUSTER_TYPE, - ATTR_COMMAND_TYPE, - ATTR_ENDPOINT_ID, - ATTR_IEEE, - ATTR_LEVEL, - ATTR_MANUFACTURER, - ATTR_MEMBERS, - ATTR_PARAMS, - ATTR_TYPE, - ATTR_VALUE, - ATTR_WARNING_DEVICE_DURATION, - ATTR_WARNING_DEVICE_MODE, - ATTR_WARNING_DEVICE_STROBE, - ATTR_WARNING_DEVICE_STROBE_DUTY_CYCLE, - ATTR_WARNING_DEVICE_STROBE_INTENSITY, - BINDINGS, - CLUSTER_COMMAND_SERVER, - CLUSTER_COMMANDS_CLIENT, - CLUSTER_COMMANDS_SERVER, - CLUSTER_HANDLER_IAS_WD, - CLUSTER_TYPE_IN, - CLUSTER_TYPE_OUT, - CUSTOM_CONFIGURATION, - DOMAIN, - EZSP_OVERWRITE_EUI64, - GROUP_ID, - GROUP_IDS, - GROUP_NAME, - MFG_CLUSTER_ID_START, - WARNING_DEVICE_MODE_EMERGENCY, - WARNING_DEVICE_SOUND_HIGH, - WARNING_DEVICE_SQUAWK_MODE_ARMED, - WARNING_DEVICE_STROBE_HIGH, - WARNING_DEVICE_STROBE_YES, - ZHA_ALARM_OPTIONS, - ZHA_CLUSTER_HANDLER_MSG, - ZHA_CONFIG_SCHEMAS, -) -from .core.gateway import EntityReference -from .core.group import GroupMember -from .core.helpers import ( - async_cluster_exists, - async_is_bindable_target, - cluster_command_schema_to_vol_schema, - convert_install_code, - get_matched_clusters, - get_zha_gateway, - qr_to_install_code, -) - -if TYPE_CHECKING: - from homeassistant.components.websocket_api.connection import ActiveConnection - - from .core.device import ZHADevice - from .core.gateway import ZHAGateway - -_LOGGER = logging.getLogger(__name__) - -TYPE = "type" -CLIENT = "client" -ID = "id" -RESPONSE = "response" -DEVICE_INFO = "device_info" - -ATTR_DURATION = "duration" -ATTR_GROUP = "group" -ATTR_IEEE_ADDRESS = "ieee_address" -ATTR_INSTALL_CODE = "install_code" -ATTR_NEW_CHANNEL = "new_channel" -ATTR_SOURCE_IEEE = "source_ieee" -ATTR_TARGET_IEEE = "target_ieee" -ATTR_QR_CODE = "qr_code" - -SERVICE_PERMIT = "permit" -SERVICE_REMOVE = "remove" -SERVICE_SET_ZIGBEE_CLUSTER_ATTRIBUTE = "set_zigbee_cluster_attribute" -SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND = "issue_zigbee_cluster_command" -SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND = "issue_zigbee_group_command" -SERVICE_DIRECT_ZIGBEE_BIND = "issue_direct_zigbee_bind" -SERVICE_DIRECT_ZIGBEE_UNBIND = "issue_direct_zigbee_unbind" -SERVICE_WARNING_DEVICE_SQUAWK = "warning_device_squawk" -SERVICE_WARNING_DEVICE_WARN = "warning_device_warn" -SERVICE_ZIGBEE_BIND = "service_zigbee_bind" -IEEE_SERVICE = "ieee_based_service" - -IEEE_SCHEMA = vol.All(cv.string, EUI64.convert) - -# typing typevar -_T = TypeVar("_T") - - -def _ensure_list_if_present(value: _T | None) -> list[_T] | list[Any] | None: - """Wrap value in list if it is provided and not one.""" - if value is None: - return None - return cast("list[_T]", value) if isinstance(value, list) else [value] - - -SERVICE_PERMIT_PARAMS = { - vol.Optional(ATTR_IEEE): IEEE_SCHEMA, - vol.Optional(ATTR_DURATION, default=60): vol.All( - vol.Coerce(int), vol.Range(0, 254) - ), - vol.Inclusive(ATTR_SOURCE_IEEE, "install_code"): IEEE_SCHEMA, - vol.Inclusive(ATTR_INSTALL_CODE, "install_code"): vol.All( - cv.string, convert_install_code - ), - vol.Exclusive(ATTR_QR_CODE, "install_code"): vol.All(cv.string, qr_to_install_code), -} - -SERVICE_SCHEMAS = { - SERVICE_PERMIT: vol.Schema( - vol.All( - cv.deprecated(ATTR_IEEE_ADDRESS, replacement_key=ATTR_IEEE), - SERVICE_PERMIT_PARAMS, - ) - ), - IEEE_SERVICE: vol.Schema( - vol.All( - cv.deprecated(ATTR_IEEE_ADDRESS, replacement_key=ATTR_IEEE), - {vol.Required(ATTR_IEEE): IEEE_SCHEMA}, - ) - ), - SERVICE_SET_ZIGBEE_CLUSTER_ATTRIBUTE: vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): cv.positive_int, - vol.Required(ATTR_CLUSTER_ID): cv.positive_int, - vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string, - vol.Required(ATTR_ATTRIBUTE): vol.Any(cv.positive_int, str), - vol.Required(ATTR_VALUE): vol.Any(int, cv.boolean, cv.string), - vol.Optional(ATTR_MANUFACTURER): vol.All( - vol.Coerce(int), vol.Range(min=-1) - ), - } - ), - SERVICE_WARNING_DEVICE_SQUAWK: vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Optional( - ATTR_WARNING_DEVICE_MODE, default=WARNING_DEVICE_SQUAWK_MODE_ARMED - ): cv.positive_int, - vol.Optional( - ATTR_WARNING_DEVICE_STROBE, default=WARNING_DEVICE_STROBE_YES - ): cv.positive_int, - vol.Optional( - ATTR_LEVEL, default=WARNING_DEVICE_SOUND_HIGH - ): cv.positive_int, - } - ), - SERVICE_WARNING_DEVICE_WARN: vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Optional( - ATTR_WARNING_DEVICE_MODE, default=WARNING_DEVICE_MODE_EMERGENCY - ): cv.positive_int, - vol.Optional( - ATTR_WARNING_DEVICE_STROBE, default=WARNING_DEVICE_STROBE_YES - ): cv.positive_int, - vol.Optional( - ATTR_LEVEL, default=WARNING_DEVICE_SOUND_HIGH - ): cv.positive_int, - vol.Optional(ATTR_WARNING_DEVICE_DURATION, default=5): cv.positive_int, - vol.Optional( - ATTR_WARNING_DEVICE_STROBE_DUTY_CYCLE, default=0x00 - ): cv.positive_int, - vol.Optional( - ATTR_WARNING_DEVICE_STROBE_INTENSITY, default=WARNING_DEVICE_STROBE_HIGH - ): cv.positive_int, - } - ), - SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND: vol.All( - vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): cv.positive_int, - vol.Required(ATTR_CLUSTER_ID): cv.positive_int, - vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string, - vol.Required(ATTR_COMMAND): cv.positive_int, - vol.Required(ATTR_COMMAND_TYPE): cv.string, - vol.Exclusive(ATTR_ARGS, "attrs_params"): _ensure_list_if_present, - vol.Exclusive(ATTR_PARAMS, "attrs_params"): dict, - vol.Optional(ATTR_MANUFACTURER): vol.All( - vol.Coerce(int), vol.Range(min=-1) - ), - } - ), - cv.deprecated(ATTR_ARGS), - cv.has_at_least_one_key(ATTR_ARGS, ATTR_PARAMS), - ), - SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND: vol.Schema( - { - vol.Required(ATTR_GROUP): cv.positive_int, - vol.Required(ATTR_CLUSTER_ID): cv.positive_int, - vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string, - vol.Required(ATTR_COMMAND): cv.positive_int, - vol.Optional(ATTR_ARGS, default=[]): cv.ensure_list, - vol.Optional(ATTR_MANUFACTURER): vol.All( - vol.Coerce(int), vol.Range(min=-1) - ), - } - ), -} - - -class ClusterBinding(NamedTuple): - """Describes a cluster binding.""" - - name: str - type: str - id: int - endpoint_id: int - - -def _cv_group_member(value: dict[str, Any]) -> GroupMember: - """Transform a group member.""" - return GroupMember( - ieee=value[ATTR_IEEE], - endpoint_id=value[ATTR_ENDPOINT_ID], - ) - - -def _cv_cluster_binding(value: dict[str, Any]) -> ClusterBinding: - """Transform a cluster binding.""" - return ClusterBinding( - name=value[ATTR_NAME], - type=value[ATTR_TYPE], - id=value[ATTR_ID], - endpoint_id=value[ATTR_ENDPOINT_ID], - ) - - -def _cv_zigpy_network_backup(value: dict[str, Any]) -> zigpy.backups.NetworkBackup: - """Transform a zigpy network backup.""" - - try: - return zigpy.backups.NetworkBackup.from_dict(value) - except ValueError as err: - raise vol.Invalid(str(err)) from err - - -GROUP_MEMBER_SCHEMA = vol.All( - vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): vol.Coerce(int), - } - ), - _cv_group_member, -) - - -CLUSTER_BINDING_SCHEMA = vol.All( - vol.Schema( - { - vol.Required(ATTR_NAME): cv.string, - vol.Required(ATTR_TYPE): cv.string, - vol.Required(ATTR_ID): vol.Coerce(int), - vol.Required(ATTR_ENDPOINT_ID): vol.Coerce(int), - } - ), - _cv_cluster_binding, -) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required("type"): "zha/devices/permit", - **SERVICE_PERMIT_PARAMS, - } -) -@websocket_api.async_response -async def websocket_permit_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Permit ZHA zigbee devices.""" - zha_gateway = get_zha_gateway(hass) - duration: int = msg[ATTR_DURATION] - ieee: EUI64 | None = msg.get(ATTR_IEEE) - - async def forward_messages(data): - """Forward events to websocket.""" - connection.send_message(websocket_api.event_message(msg["id"], data)) - - remove_dispatcher_function = async_dispatcher_connect( - hass, "zha_gateway_message", forward_messages - ) - - @callback - def async_cleanup() -> None: - """Remove signal listener and turn off debug mode.""" - zha_gateway.async_disable_debug_mode() - remove_dispatcher_function() - - connection.subscriptions[msg["id"]] = async_cleanup - zha_gateway.async_enable_debug_mode() - src_ieee: EUI64 - link_key: KeyData - if ATTR_SOURCE_IEEE in msg: - src_ieee = msg[ATTR_SOURCE_IEEE] - link_key = msg[ATTR_INSTALL_CODE] - _LOGGER.debug("Allowing join for %s device with link key", src_ieee) - await zha_gateway.application_controller.permit_with_link_key( - time_s=duration, node=src_ieee, link_key=link_key - ) - elif ATTR_QR_CODE in msg: - src_ieee, link_key = msg[ATTR_QR_CODE] - _LOGGER.debug("Allowing join for %s device with link key", src_ieee) - await zha_gateway.application_controller.permit_with_link_key( - time_s=duration, node=src_ieee, link_key=link_key - ) - else: - await zha_gateway.application_controller.permit(time_s=duration, node=ieee) - connection.send_result(msg[ID]) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/devices"}) -@websocket_api.async_response -async def websocket_get_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA devices.""" - zha_gateway = get_zha_gateway(hass) - devices = [device.zha_device_info for device in zha_gateway.devices.values()] - connection.send_result(msg[ID], devices) - - -@callback -def _get_entity_name( - zha_gateway: ZHAGateway, entity_ref: EntityReference -) -> str | None: - entity_registry = er.async_get(zha_gateway.hass) - entry = entity_registry.async_get(entity_ref.reference_id) - return entry.name if entry else None - - -@callback -def _get_entity_original_name( - zha_gateway: ZHAGateway, entity_ref: EntityReference -) -> str | None: - entity_registry = er.async_get(zha_gateway.hass) - entry = entity_registry.async_get(entity_ref.reference_id) - return entry.original_name if entry else None - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/devices/groupable"}) -@websocket_api.async_response -async def websocket_get_groupable_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA devices that can be grouped.""" - zha_gateway = get_zha_gateway(hass) - - devices = [device for device in zha_gateway.devices.values() if device.is_groupable] - groupable_devices = [] - - for device in devices: - entity_refs = zha_gateway.device_registry[device.ieee] - for ep_id in device.async_get_groupable_endpoints(): - groupable_devices.append( - { - "endpoint_id": ep_id, - "entities": [ - { - "name": _get_entity_name(zha_gateway, entity_ref), - "original_name": _get_entity_original_name( - zha_gateway, entity_ref - ), - } - for entity_ref in entity_refs - if list(entity_ref.cluster_handlers.values())[ - 0 - ].cluster.endpoint.endpoint_id - == ep_id - ], - "device": device.zha_device_info, - } - ) - - connection.send_result(msg[ID], groupable_devices) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/groups"}) -@websocket_api.async_response -async def websocket_get_groups( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA groups.""" - zha_gateway = get_zha_gateway(hass) - groups = [group.group_info for group in zha_gateway.groups.values()] - connection.send_result(msg[ID], groups) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/device", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_get_device( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA devices.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - - if not (zha_device := zha_gateway.devices.get(ieee)): - connection.send_message( - websocket_api.error_message( - msg[ID], websocket_api.const.ERR_NOT_FOUND, "ZHA Device not found" - ) - ) - return - - device_info = zha_device.zha_device_info - connection.send_result(msg[ID], device_info) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/group", - vol.Required(GROUP_ID): cv.positive_int, - } -) -@websocket_api.async_response -async def websocket_get_group( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA group.""" - zha_gateway = get_zha_gateway(hass) - group_id: int = msg[GROUP_ID] - - if not (zha_group := zha_gateway.groups.get(group_id)): - connection.send_message( - websocket_api.error_message( - msg[ID], websocket_api.const.ERR_NOT_FOUND, "ZHA Group not found" - ) - ) - return - - group_info = zha_group.group_info - connection.send_result(msg[ID], group_info) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/group/add", - vol.Required(GROUP_NAME): cv.string, - vol.Optional(GROUP_ID): cv.positive_int, - vol.Optional(ATTR_MEMBERS): vol.All(cv.ensure_list, [GROUP_MEMBER_SCHEMA]), - } -) -@websocket_api.async_response -async def websocket_add_group( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Add a new ZHA group.""" - zha_gateway = get_zha_gateway(hass) - group_name: str = msg[GROUP_NAME] - group_id: int | None = msg.get(GROUP_ID) - members: list[GroupMember] | None = msg.get(ATTR_MEMBERS) - group = await zha_gateway.async_create_zigpy_group(group_name, members, group_id) - assert group - connection.send_result(msg[ID], group.group_info) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/group/remove", - vol.Required(GROUP_IDS): vol.All(cv.ensure_list, [cv.positive_int]), - } -) -@websocket_api.async_response -async def websocket_remove_groups( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Remove the specified ZHA groups.""" - zha_gateway = get_zha_gateway(hass) - group_ids: list[int] = msg[GROUP_IDS] - - if len(group_ids) > 1: - tasks = [] - for group_id in group_ids: - tasks.append(zha_gateway.async_remove_zigpy_group(group_id)) - await asyncio.gather(*tasks) - else: - await zha_gateway.async_remove_zigpy_group(group_ids[0]) - ret_groups = [group.group_info for group in zha_gateway.groups.values()] - connection.send_result(msg[ID], ret_groups) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/group/members/add", - vol.Required(GROUP_ID): cv.positive_int, - vol.Required(ATTR_MEMBERS): vol.All(cv.ensure_list, [GROUP_MEMBER_SCHEMA]), - } -) -@websocket_api.async_response -async def websocket_add_group_members( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Add members to a ZHA group.""" - zha_gateway = get_zha_gateway(hass) - group_id: int = msg[GROUP_ID] - members: list[GroupMember] = msg[ATTR_MEMBERS] - - if not (zha_group := zha_gateway.groups.get(group_id)): - connection.send_message( - websocket_api.error_message( - msg[ID], websocket_api.const.ERR_NOT_FOUND, "ZHA Group not found" - ) - ) - return - - await zha_group.async_add_members(members) - ret_group = zha_group.group_info - connection.send_result(msg[ID], ret_group) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/group/members/remove", - vol.Required(GROUP_ID): cv.positive_int, - vol.Required(ATTR_MEMBERS): vol.All(cv.ensure_list, [GROUP_MEMBER_SCHEMA]), - } -) -@websocket_api.async_response -async def websocket_remove_group_members( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Remove members from a ZHA group.""" - zha_gateway = get_zha_gateway(hass) - group_id: int = msg[GROUP_ID] - members: list[GroupMember] = msg[ATTR_MEMBERS] - - if not (zha_group := zha_gateway.groups.get(group_id)): - connection.send_message( - websocket_api.error_message( - msg[ID], websocket_api.const.ERR_NOT_FOUND, "ZHA Group not found" - ) - ) - return - - await zha_group.async_remove_members(members) - ret_group = zha_group.group_info - connection.send_result(msg[ID], ret_group) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/reconfigure", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_reconfigure_node( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Reconfigure a ZHA nodes entities by its ieee address.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - device: ZHADevice | None = zha_gateway.get_device(ieee) - - async def forward_messages(data): - """Forward events to websocket.""" - connection.send_message(websocket_api.event_message(msg["id"], data)) - - remove_dispatcher_function = async_dispatcher_connect( - hass, ZHA_CLUSTER_HANDLER_MSG, forward_messages - ) - - @callback - def async_cleanup() -> None: - """Remove signal listener.""" - remove_dispatcher_function() - - connection.subscriptions[msg["id"]] = async_cleanup - - _LOGGER.debug("Reconfiguring node with ieee_address: %s", ieee) - assert device - hass.async_create_task(device.async_configure()) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/topology/update", - } -) -@websocket_api.async_response -async def websocket_update_topology( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Update the ZHA network topology.""" - zha_gateway = get_zha_gateway(hass) - hass.async_create_task(zha_gateway.application_controller.topology.scan()) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/clusters", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_device_clusters( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Return a list of device clusters.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - zha_device = zha_gateway.get_device(ieee) - response_clusters = [] - if zha_device is not None: - clusters_by_endpoint = zha_device.async_get_clusters() - for ep_id, clusters in clusters_by_endpoint.items(): - for c_id, cluster in clusters[CLUSTER_TYPE_IN].items(): - response_clusters.append( - { - TYPE: CLUSTER_TYPE_IN, - ID: c_id, - ATTR_NAME: cluster.__class__.__name__, - "endpoint_id": ep_id, - } - ) - for c_id, cluster in clusters[CLUSTER_TYPE_OUT].items(): - response_clusters.append( - { - TYPE: CLUSTER_TYPE_OUT, - ID: c_id, - ATTR_NAME: cluster.__class__.__name__, - "endpoint_id": ep_id, - } - ) - - connection.send_result(msg[ID], response_clusters) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/clusters/attributes", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): int, - vol.Required(ATTR_CLUSTER_ID): int, - vol.Required(ATTR_CLUSTER_TYPE): str, - } -) -@websocket_api.async_response -async def websocket_device_cluster_attributes( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Return a list of cluster attributes.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - endpoint_id: int = msg[ATTR_ENDPOINT_ID] - cluster_id: int = msg[ATTR_CLUSTER_ID] - cluster_type: str = msg[ATTR_CLUSTER_TYPE] - cluster_attributes: list[dict[str, Any]] = [] - zha_device = zha_gateway.get_device(ieee) - attributes = None - if zha_device is not None: - attributes = zha_device.async_get_cluster_attributes( - endpoint_id, cluster_id, cluster_type - ) - if attributes is not None: - for attr_id, attr in attributes.items(): - cluster_attributes.append({ID: attr_id, ATTR_NAME: attr.name}) - _LOGGER.debug( - "Requested attributes for: %s: %s, %s: '%s', %s: %s, %s: %s", - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - RESPONSE, - cluster_attributes, - ) - - connection.send_result(msg[ID], cluster_attributes) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/clusters/commands", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): int, - vol.Required(ATTR_CLUSTER_ID): int, - vol.Required(ATTR_CLUSTER_TYPE): str, - } -) -@websocket_api.async_response -async def websocket_device_cluster_commands( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Return a list of cluster commands.""" - import voluptuous_serialize # pylint: disable=import-outside-toplevel - - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - endpoint_id: int = msg[ATTR_ENDPOINT_ID] - cluster_id: int = msg[ATTR_CLUSTER_ID] - cluster_type: str = msg[ATTR_CLUSTER_TYPE] - zha_device = zha_gateway.get_device(ieee) - cluster_commands: list[dict[str, Any]] = [] - commands = None - if zha_device is not None: - commands = zha_device.async_get_cluster_commands( - endpoint_id, cluster_id, cluster_type - ) - - if commands is not None: - for cmd_id, cmd in commands[CLUSTER_COMMANDS_CLIENT].items(): - cluster_commands.append( - { - TYPE: CLIENT, - ID: cmd_id, - ATTR_NAME: cmd.name, - "schema": voluptuous_serialize.convert( - cluster_command_schema_to_vol_schema(cmd.schema), - custom_serializer=cv.custom_serializer, - ), - } - ) - for cmd_id, cmd in commands[CLUSTER_COMMANDS_SERVER].items(): - cluster_commands.append( - { - TYPE: CLUSTER_COMMAND_SERVER, - ID: cmd_id, - ATTR_NAME: cmd.name, - "schema": voluptuous_serialize.convert( - cluster_command_schema_to_vol_schema(cmd.schema), - custom_serializer=cv.custom_serializer, - ), - } - ) - _LOGGER.debug( - "Requested commands for: %s: %s, %s: '%s', %s: %s, %s: %s", - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - RESPONSE, - cluster_commands, - ) - - connection.send_result(msg[ID], cluster_commands) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/clusters/attributes/value", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): int, - vol.Required(ATTR_CLUSTER_ID): int, - vol.Required(ATTR_CLUSTER_TYPE): str, - vol.Required(ATTR_ATTRIBUTE): int, - vol.Optional(ATTR_MANUFACTURER): cv.positive_int, - } -) -@websocket_api.async_response -async def websocket_read_zigbee_cluster_attributes( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Read zigbee attribute for cluster on ZHA entity.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = msg[ATTR_IEEE] - endpoint_id: int = msg[ATTR_ENDPOINT_ID] - cluster_id: int = msg[ATTR_CLUSTER_ID] - cluster_type: str = msg[ATTR_CLUSTER_TYPE] - attribute: int = msg[ATTR_ATTRIBUTE] - manufacturer: int | None = msg.get(ATTR_MANUFACTURER) - zha_device = zha_gateway.get_device(ieee) - success = {} - failure = {} - if zha_device is not None: - cluster = zha_device.async_get_cluster( - endpoint_id, cluster_id, cluster_type=cluster_type - ) - success, failure = await cluster.read_attributes( - [attribute], allow_cache=False, only_cache=False, manufacturer=manufacturer - ) - _LOGGER.debug( - ( - "Read attribute for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s]" - " %s: [%s]," - ), - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - ATTR_ATTRIBUTE, - attribute, - ATTR_MANUFACTURER, - manufacturer, - RESPONSE, - str(success.get(attribute)), - "failure", - failure, - ) - connection.send_result(msg[ID], str(success.get(attribute))) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/bindable", - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_get_bindable_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Directly bind devices.""" - zha_gateway = get_zha_gateway(hass) - source_ieee: EUI64 = msg[ATTR_IEEE] - source_device = zha_gateway.get_device(source_ieee) - - devices = [ - device.zha_device_info - for device in zha_gateway.devices.values() - if async_is_bindable_target(source_device, device) - ] - - _LOGGER.debug( - "Get bindable devices: %s: [%s], %s: [%s]", - ATTR_SOURCE_IEEE, - source_ieee, - "bindable devices", - devices, - ) - - connection.send_message(websocket_api.result_message(msg[ID], devices)) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/bind", - vol.Required(ATTR_SOURCE_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_TARGET_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_bind_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Directly bind devices.""" - zha_gateway = get_zha_gateway(hass) - source_ieee: EUI64 = msg[ATTR_SOURCE_IEEE] - target_ieee: EUI64 = msg[ATTR_TARGET_IEEE] - await async_binding_operation( - zha_gateway, source_ieee, target_ieee, zdo_types.ZDOCmd.Bind_req - ) - _LOGGER.info( - "Devices bound: %s: [%s] %s: [%s]", - ATTR_SOURCE_IEEE, - source_ieee, - ATTR_TARGET_IEEE, - target_ieee, - ) - connection.send_result(msg[ID]) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/devices/unbind", - vol.Required(ATTR_SOURCE_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_TARGET_IEEE): IEEE_SCHEMA, - } -) -@websocket_api.async_response -async def websocket_unbind_devices( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Remove a direct binding between devices.""" - zha_gateway = get_zha_gateway(hass) - source_ieee: EUI64 = msg[ATTR_SOURCE_IEEE] - target_ieee: EUI64 = msg[ATTR_TARGET_IEEE] - await async_binding_operation( - zha_gateway, source_ieee, target_ieee, zdo_types.ZDOCmd.Unbind_req - ) - _LOGGER.info( - "Devices un-bound: %s: [%s] %s: [%s]", - ATTR_SOURCE_IEEE, - source_ieee, - ATTR_TARGET_IEEE, - target_ieee, - ) - connection.send_result(msg[ID]) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/groups/bind", - vol.Required(ATTR_SOURCE_IEEE): IEEE_SCHEMA, - vol.Required(GROUP_ID): cv.positive_int, - vol.Required(BINDINGS): vol.All(cv.ensure_list, [CLUSTER_BINDING_SCHEMA]), - } -) -@websocket_api.async_response -async def websocket_bind_group( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Directly bind a device to a group.""" - zha_gateway = get_zha_gateway(hass) - source_ieee: EUI64 = msg[ATTR_SOURCE_IEEE] - group_id: int = msg[GROUP_ID] - bindings: list[ClusterBinding] = msg[BINDINGS] - source_device = zha_gateway.get_device(source_ieee) - assert source_device - await source_device.async_bind_to_group(group_id, bindings) - connection.send_result(msg[ID]) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/groups/unbind", - vol.Required(ATTR_SOURCE_IEEE): IEEE_SCHEMA, - vol.Required(GROUP_ID): cv.positive_int, - vol.Required(BINDINGS): vol.All(cv.ensure_list, [CLUSTER_BINDING_SCHEMA]), - } -) -@websocket_api.async_response -async def websocket_unbind_group( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Unbind a device from a group.""" - zha_gateway = get_zha_gateway(hass) - source_ieee: EUI64 = msg[ATTR_SOURCE_IEEE] - group_id: int = msg[GROUP_ID] - bindings: list[ClusterBinding] = msg[BINDINGS] - source_device = zha_gateway.get_device(source_ieee) - assert source_device - await source_device.async_unbind_from_group(group_id, bindings) - connection.send_result(msg[ID]) - - -async def async_binding_operation( - zha_gateway: ZHAGateway, - source_ieee: EUI64, - target_ieee: EUI64, - operation: zdo_types.ZDOCmd, -) -> None: - """Create or remove a direct zigbee binding between 2 devices.""" - - source_device = zha_gateway.get_device(source_ieee) - target_device = zha_gateway.get_device(target_ieee) - - assert source_device - assert target_device - clusters_to_bind = await get_matched_clusters(source_device, target_device) - - zdo = source_device.device.zdo - bind_tasks = [] - for binding_pair in clusters_to_bind: - op_msg = "cluster: %s %s --> [%s]" - op_params = ( - binding_pair.source_cluster.cluster_id, - operation.name, - target_ieee, - ) - zdo.debug(f"processing {op_msg}", *op_params) - - bind_tasks.append( - ( - zdo.request( - operation, - source_device.ieee, - binding_pair.source_cluster.endpoint.endpoint_id, - binding_pair.source_cluster.cluster_id, - binding_pair.destination_address, - ), - op_msg, - op_params, - ) - ) - res = await asyncio.gather(*(t[0] for t in bind_tasks), return_exceptions=True) - for outcome, log_msg in zip(res, bind_tasks): - if isinstance(outcome, Exception): - fmt = f"{log_msg[1]} failed: %s" - else: - fmt = f"{log_msg[1]} completed: %s" - zdo.debug(fmt, *(log_msg[2] + (outcome,))) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/configuration"}) -@websocket_api.async_response -async def websocket_get_configuration( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA configuration.""" - zha_gateway = get_zha_gateway(hass) - import voluptuous_serialize # pylint: disable=import-outside-toplevel - - def custom_serializer(schema: Any) -> Any: - """Serialize additional types for voluptuous_serialize.""" - if schema is cv_boolean: - return {"type": "bool"} - if schema is vol.Schema: - return voluptuous_serialize.convert( - schema, custom_serializer=custom_serializer - ) - - return cv.custom_serializer(schema) - - data: dict[str, dict[str, Any]] = {"schemas": {}, "data": {}} - for section, schema in ZHA_CONFIG_SCHEMAS.items(): - if section == ZHA_ALARM_OPTIONS and not async_cluster_exists( - hass, IasAce.cluster_id - ): - continue - data["schemas"][section] = voluptuous_serialize.convert( - schema, custom_serializer=custom_serializer - ) - data["data"][section] = zha_gateway.config_entry.options.get( - CUSTOM_CONFIGURATION, {} - ).get(section, {}) - - # send default values for unconfigured options - for entry in data["schemas"][section]: - if data["data"][section].get(entry["name"]) is None: - data["data"][section][entry["name"]] = entry["default"] - - connection.send_result(msg[ID], data) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/configuration/update", - vol.Required("data"): ZHA_CONFIG_SCHEMAS, - } -) -@websocket_api.async_response -async def websocket_update_zha_configuration( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Update the ZHA configuration.""" - zha_gateway = get_zha_gateway(hass) - options = zha_gateway.config_entry.options - data_to_save = {**options, **{CUSTOM_CONFIGURATION: msg["data"]}} - - for section, schema in ZHA_CONFIG_SCHEMAS.items(): - for entry in schema.schema: - # remove options that match defaults - if ( - data_to_save[CUSTOM_CONFIGURATION].get(section, {}).get(entry) - == entry.default() - ): - data_to_save[CUSTOM_CONFIGURATION][section].pop(entry) - # remove entire section block if empty - if ( - not data_to_save[CUSTOM_CONFIGURATION].get(section) - and section in data_to_save[CUSTOM_CONFIGURATION] - ): - data_to_save[CUSTOM_CONFIGURATION].pop(section) - - # remove entire custom_configuration block if empty - if ( - not data_to_save.get(CUSTOM_CONFIGURATION) - and CUSTOM_CONFIGURATION in data_to_save - ): - data_to_save.pop(CUSTOM_CONFIGURATION) - - _LOGGER.info( - "Updating ZHA custom configuration options from %s to %s", - options, - data_to_save, - ) - - hass.config_entries.async_update_entry( - zha_gateway.config_entry, options=data_to_save - ) - status = await hass.config_entries.async_reload(zha_gateway.config_entry.entry_id) - connection.send_result(msg[ID], status) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/network/settings"}) -@websocket_api.async_response -async def websocket_get_network_settings( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA network settings.""" - backup = async_get_active_network_settings(hass) - zha_gateway = get_zha_gateway(hass) - connection.send_result( - msg[ID], - { - "radio_type": async_get_radio_type(hass, zha_gateway.config_entry).name, - "device": zha_gateway.application_controller.config[CONF_DEVICE], - "settings": backup.as_dict(), - }, - ) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/network/backups/list"}) -@websocket_api.async_response -async def websocket_list_network_backups( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Get ZHA network settings.""" - zha_gateway = get_zha_gateway(hass) - application_controller = zha_gateway.application_controller - - # Serialize known backups - connection.send_result( - msg[ID], [backup.as_dict() for backup in application_controller.backups] - ) - - -@websocket_api.require_admin -@websocket_api.websocket_command({vol.Required(TYPE): "zha/network/backups/create"}) -@websocket_api.async_response -async def websocket_create_network_backup( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Create a ZHA network backup.""" - zha_gateway = get_zha_gateway(hass) - application_controller = zha_gateway.application_controller - - # This can take 5-30s - backup = await application_controller.backups.create_backup(load_devices=True) - connection.send_result( - msg[ID], - { - "backup": backup.as_dict(), - "is_complete": backup.is_complete(), - }, - ) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/network/backups/restore", - vol.Required("backup"): _cv_zigpy_network_backup, - vol.Optional("ezsp_force_write_eui64", default=False): cv.boolean, - } -) -@websocket_api.async_response -async def websocket_restore_network_backup( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Restore a ZHA network backup.""" - zha_gateway = get_zha_gateway(hass) - application_controller = zha_gateway.application_controller - backup = msg["backup"] - - if msg["ezsp_force_write_eui64"]: - backup.network_info.stack_specific.setdefault("ezsp", {})[ - EZSP_OVERWRITE_EUI64 - ] = True - - # This can take 30-40s - try: - await application_controller.backups.restore_backup(backup) - except ValueError as err: - connection.send_error(msg[ID], websocket_api.const.ERR_INVALID_FORMAT, str(err)) - else: - connection.send_result(msg[ID]) - - -@websocket_api.require_admin -@websocket_api.websocket_command( - { - vol.Required(TYPE): "zha/network/change_channel", - vol.Required(ATTR_NEW_CHANNEL): vol.Any("auto", vol.Range(11, 26)), - } -) -@websocket_api.async_response -async def websocket_change_channel( - hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] -) -> None: - """Migrate the Zigbee network to a new channel.""" - new_channel = cast(Literal["auto"] | int, msg[ATTR_NEW_CHANNEL]) - await async_change_channel(hass, new_channel=new_channel) - connection.send_result(msg[ID]) - - -@callback -def async_load_api(hass: HomeAssistant) -> None: - """Set up the web socket API.""" - zha_gateway = get_zha_gateway(hass) - application_controller = zha_gateway.application_controller - - async def permit(service: ServiceCall) -> None: - """Allow devices to join this network.""" - duration: int = service.data[ATTR_DURATION] - ieee: EUI64 | None = service.data.get(ATTR_IEEE) - src_ieee: EUI64 - link_key: KeyData - if ATTR_SOURCE_IEEE in service.data: - src_ieee = service.data[ATTR_SOURCE_IEEE] - link_key = service.data[ATTR_INSTALL_CODE] - _LOGGER.info("Allowing join for %s device with link key", src_ieee) - await application_controller.permit_with_link_key( - time_s=duration, node=src_ieee, link_key=link_key - ) - return - - if ATTR_QR_CODE in service.data: - src_ieee, link_key = service.data[ATTR_QR_CODE] - _LOGGER.info("Allowing join for %s device with link key", src_ieee) - await application_controller.permit_with_link_key( - time_s=duration, node=src_ieee, link_key=link_key - ) - return - - if ieee: - _LOGGER.info("Permitting joins for %ss on %s device", duration, ieee) - else: - _LOGGER.info("Permitting joins for %ss", duration) - await application_controller.permit(time_s=duration, node=ieee) - - async_register_admin_service( - hass, DOMAIN, SERVICE_PERMIT, permit, schema=SERVICE_SCHEMAS[SERVICE_PERMIT] - ) - - async def remove(service: ServiceCall) -> None: - """Remove a node from the network.""" - zha_gateway = get_zha_gateway(hass) - ieee: EUI64 = service.data[ATTR_IEEE] - zha_device: ZHADevice | None = zha_gateway.get_device(ieee) - if zha_device is not None and zha_device.is_active_coordinator: - _LOGGER.info("Removing the coordinator (%s) is not allowed", ieee) - return - _LOGGER.info("Removing node %s", ieee) - await application_controller.remove(ieee) - - async_register_admin_service( - hass, DOMAIN, SERVICE_REMOVE, remove, schema=SERVICE_SCHEMAS[IEEE_SERVICE] - ) - - async def set_zigbee_cluster_attributes(service: ServiceCall) -> None: - """Set zigbee attribute for cluster on zha entity.""" - ieee: EUI64 = service.data[ATTR_IEEE] - endpoint_id: int = service.data[ATTR_ENDPOINT_ID] - cluster_id: int = service.data[ATTR_CLUSTER_ID] - cluster_type: str = service.data[ATTR_CLUSTER_TYPE] - attribute: int | str = service.data[ATTR_ATTRIBUTE] - value: int | bool | str = service.data[ATTR_VALUE] - manufacturer: int | None = service.data.get(ATTR_MANUFACTURER) - zha_device = zha_gateway.get_device(ieee) - response = None - if zha_device is not None: - response = await zha_device.write_zigbee_attribute( - endpoint_id, - cluster_id, - attribute, - value, - cluster_type=cluster_type, - manufacturer=manufacturer, - ) - else: - raise ValueError(f"Device with IEEE {str(ieee)} not found") - - _LOGGER.debug( - ( - "Set attribute for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s:" - " [%s] %s: [%s]" - ), - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - ATTR_ATTRIBUTE, - attribute, - ATTR_VALUE, - value, - ATTR_MANUFACTURER, - manufacturer, - RESPONSE, - response, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_SET_ZIGBEE_CLUSTER_ATTRIBUTE, - set_zigbee_cluster_attributes, - schema=SERVICE_SCHEMAS[SERVICE_SET_ZIGBEE_CLUSTER_ATTRIBUTE], - ) - - async def issue_zigbee_cluster_command(service: ServiceCall) -> None: - """Issue command on zigbee cluster on ZHA entity.""" - ieee: EUI64 = service.data[ATTR_IEEE] - endpoint_id: int = service.data[ATTR_ENDPOINT_ID] - cluster_id: int = service.data[ATTR_CLUSTER_ID] - cluster_type: str = service.data[ATTR_CLUSTER_TYPE] - command: int = service.data[ATTR_COMMAND] - command_type: str = service.data[ATTR_COMMAND_TYPE] - args: list | None = service.data.get(ATTR_ARGS) - params: dict | None = service.data.get(ATTR_PARAMS) - manufacturer: int | None = service.data.get(ATTR_MANUFACTURER) - zha_device = zha_gateway.get_device(ieee) - if zha_device is not None: - if cluster_id >= MFG_CLUSTER_ID_START and manufacturer is None: - manufacturer = zha_device.manufacturer_code - - await zha_device.issue_cluster_command( - endpoint_id, - cluster_id, - command, - command_type, - args, - params, - cluster_type=cluster_type, - manufacturer=manufacturer, - ) - _LOGGER.debug( - ( - "Issued command for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s]" - " %s: [%s] %s: [%s] %s: [%s]" - ), - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - ATTR_COMMAND, - command, - ATTR_COMMAND_TYPE, - command_type, - ATTR_ARGS, - args, - ATTR_PARAMS, - params, - ATTR_MANUFACTURER, - manufacturer, - ) - else: - raise ValueError(f"Device with IEEE {str(ieee)} not found") - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND, - issue_zigbee_cluster_command, - schema=SERVICE_SCHEMAS[SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND], - ) - - async def issue_zigbee_group_command(service: ServiceCall) -> None: - """Issue command on zigbee cluster on a zigbee group.""" - group_id: int = service.data[ATTR_GROUP] - cluster_id: int = service.data[ATTR_CLUSTER_ID] - command: int = service.data[ATTR_COMMAND] - args: list = service.data[ATTR_ARGS] - manufacturer: int | None = service.data.get(ATTR_MANUFACTURER) - group = zha_gateway.get_group(group_id) - if cluster_id >= MFG_CLUSTER_ID_START and manufacturer is None: - _LOGGER.error("Missing manufacturer attribute for cluster: %d", cluster_id) - response = None - if group is not None: - cluster = group.endpoint[cluster_id] - response = await cluster.command( - command, *args, manufacturer=manufacturer, expect_reply=True - ) - _LOGGER.debug( - "Issued group command for: %s: [%s] %s: [%s] %s: %s %s: [%s] %s: %s", - ATTR_CLUSTER_ID, - cluster_id, - ATTR_COMMAND, - command, - ATTR_ARGS, - args, - ATTR_MANUFACTURER, - manufacturer, - RESPONSE, - response, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND, - issue_zigbee_group_command, - schema=SERVICE_SCHEMAS[SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND], - ) - - def _get_ias_wd_cluster_handler(zha_device): - """Get the IASWD cluster handler for a device.""" - cluster_handlers = { - ch.name: ch - for endpoint in zha_device.endpoints.values() - for ch in endpoint.claimed_cluster_handlers.values() - } - return cluster_handlers.get(CLUSTER_HANDLER_IAS_WD) - - async def warning_device_squawk(service: ServiceCall) -> None: - """Issue the squawk command for an IAS warning device.""" - ieee: EUI64 = service.data[ATTR_IEEE] - mode: int = service.data[ATTR_WARNING_DEVICE_MODE] - strobe: int = service.data[ATTR_WARNING_DEVICE_STROBE] - level: int = service.data[ATTR_LEVEL] - - if (zha_device := zha_gateway.get_device(ieee)) is not None: - if cluster_handler := _get_ias_wd_cluster_handler(zha_device): - await cluster_handler.issue_squawk(mode, strobe, level) - else: - _LOGGER.error( - "Squawking IASWD: %s: [%s] is missing the required IASWD cluster handler!", - ATTR_IEEE, - str(ieee), - ) - else: - _LOGGER.error( - "Squawking IASWD: %s: [%s] could not be found!", ATTR_IEEE, str(ieee) - ) - _LOGGER.debug( - "Squawking IASWD: %s: [%s] %s: [%s] %s: [%s] %s: [%s]", - ATTR_IEEE, - str(ieee), - ATTR_WARNING_DEVICE_MODE, - mode, - ATTR_WARNING_DEVICE_STROBE, - strobe, - ATTR_LEVEL, - level, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_WARNING_DEVICE_SQUAWK, - warning_device_squawk, - schema=SERVICE_SCHEMAS[SERVICE_WARNING_DEVICE_SQUAWK], - ) - - async def warning_device_warn(service: ServiceCall) -> None: - """Issue the warning command for an IAS warning device.""" - ieee: EUI64 = service.data[ATTR_IEEE] - mode: int = service.data[ATTR_WARNING_DEVICE_MODE] - strobe: int = service.data[ATTR_WARNING_DEVICE_STROBE] - level: int = service.data[ATTR_LEVEL] - duration: int = service.data[ATTR_WARNING_DEVICE_DURATION] - duty_mode: int = service.data[ATTR_WARNING_DEVICE_STROBE_DUTY_CYCLE] - intensity: int = service.data[ATTR_WARNING_DEVICE_STROBE_INTENSITY] - - if (zha_device := zha_gateway.get_device(ieee)) is not None: - if cluster_handler := _get_ias_wd_cluster_handler(zha_device): - await cluster_handler.issue_start_warning( - mode, strobe, level, duration, duty_mode, intensity - ) - else: - _LOGGER.error( - "Warning IASWD: %s: [%s] is missing the required IASWD cluster handler!", - ATTR_IEEE, - str(ieee), - ) - else: - _LOGGER.error( - "Warning IASWD: %s: [%s] could not be found!", ATTR_IEEE, str(ieee) - ) - _LOGGER.debug( - "Warning IASWD: %s: [%s] %s: [%s] %s: [%s] %s: [%s]", - ATTR_IEEE, - str(ieee), - ATTR_WARNING_DEVICE_MODE, - mode, - ATTR_WARNING_DEVICE_STROBE, - strobe, - ATTR_LEVEL, - level, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_WARNING_DEVICE_WARN, - warning_device_warn, - schema=SERVICE_SCHEMAS[SERVICE_WARNING_DEVICE_WARN], - ) - - websocket_api.async_register_command(hass, websocket_permit_devices) - websocket_api.async_register_command(hass, websocket_get_devices) - websocket_api.async_register_command(hass, websocket_get_groupable_devices) - websocket_api.async_register_command(hass, websocket_get_groups) - websocket_api.async_register_command(hass, websocket_get_device) - websocket_api.async_register_command(hass, websocket_get_group) - websocket_api.async_register_command(hass, websocket_add_group) - websocket_api.async_register_command(hass, websocket_remove_groups) - websocket_api.async_register_command(hass, websocket_add_group_members) - websocket_api.async_register_command(hass, websocket_remove_group_members) - websocket_api.async_register_command(hass, websocket_bind_group) - websocket_api.async_register_command(hass, websocket_unbind_group) - websocket_api.async_register_command(hass, websocket_reconfigure_node) - websocket_api.async_register_command(hass, websocket_device_clusters) - websocket_api.async_register_command(hass, websocket_device_cluster_attributes) - websocket_api.async_register_command(hass, websocket_device_cluster_commands) - websocket_api.async_register_command(hass, websocket_read_zigbee_cluster_attributes) - websocket_api.async_register_command(hass, websocket_get_bindable_devices) - websocket_api.async_register_command(hass, websocket_bind_devices) - websocket_api.async_register_command(hass, websocket_unbind_devices) - websocket_api.async_register_command(hass, websocket_update_topology) - websocket_api.async_register_command(hass, websocket_get_configuration) - websocket_api.async_register_command(hass, websocket_update_zha_configuration) - websocket_api.async_register_command(hass, websocket_get_network_settings) - websocket_api.async_register_command(hass, websocket_list_network_backups) - websocket_api.async_register_command(hass, websocket_create_network_backup) - websocket_api.async_register_command(hass, websocket_restore_network_backup) - websocket_api.async_register_command(hass, websocket_change_channel) - - -@callback -def async_unload_api(hass: HomeAssistant) -> None: - """Unload the ZHA API.""" - hass.services.async_remove(DOMAIN, SERVICE_PERMIT) - hass.services.async_remove(DOMAIN, SERVICE_REMOVE) - hass.services.async_remove(DOMAIN, SERVICE_SET_ZIGBEE_CLUSTER_ATTRIBUTE) - hass.services.async_remove(DOMAIN, SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND) - hass.services.async_remove(DOMAIN, SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND) - hass.services.async_remove(DOMAIN, SERVICE_WARNING_DEVICE_SQUAWK) - hass.services.async_remove(DOMAIN, SERVICE_WARNING_DEVICE_WARN) diff --git a/zha/core/__init__.py b/zha/zigbee/__init__.py similarity index 100% rename from zha/core/__init__.py rename to zha/zigbee/__init__.py diff --git a/zha/core/cluster_handlers/__init__.py b/zha/zigbee/cluster_handlers/__init__.py similarity index 100% rename from zha/core/cluster_handlers/__init__.py rename to zha/zigbee/cluster_handlers/__init__.py diff --git a/zha/core/cluster_handlers/closures.py b/zha/zigbee/cluster_handlers/closures.py similarity index 100% rename from zha/core/cluster_handlers/closures.py rename to zha/zigbee/cluster_handlers/closures.py diff --git a/zha/core/cluster_handlers/general.py b/zha/zigbee/cluster_handlers/general.py similarity index 100% rename from zha/core/cluster_handlers/general.py rename to zha/zigbee/cluster_handlers/general.py diff --git a/zha/core/cluster_handlers/helpers.py b/zha/zigbee/cluster_handlers/helpers.py similarity index 100% rename from zha/core/cluster_handlers/helpers.py rename to zha/zigbee/cluster_handlers/helpers.py diff --git a/zha/core/cluster_handlers/homeautomation.py b/zha/zigbee/cluster_handlers/homeautomation.py similarity index 100% rename from zha/core/cluster_handlers/homeautomation.py rename to zha/zigbee/cluster_handlers/homeautomation.py diff --git a/zha/core/cluster_handlers/hvac.py b/zha/zigbee/cluster_handlers/hvac.py similarity index 100% rename from zha/core/cluster_handlers/hvac.py rename to zha/zigbee/cluster_handlers/hvac.py diff --git a/zha/core/cluster_handlers/lighting.py b/zha/zigbee/cluster_handlers/lighting.py similarity index 100% rename from zha/core/cluster_handlers/lighting.py rename to zha/zigbee/cluster_handlers/lighting.py diff --git a/zha/core/cluster_handlers/lightlink.py b/zha/zigbee/cluster_handlers/lightlink.py similarity index 100% rename from zha/core/cluster_handlers/lightlink.py rename to zha/zigbee/cluster_handlers/lightlink.py diff --git a/zha/core/cluster_handlers/manufacturerspecific.py b/zha/zigbee/cluster_handlers/manufacturerspecific.py similarity index 100% rename from zha/core/cluster_handlers/manufacturerspecific.py rename to zha/zigbee/cluster_handlers/manufacturerspecific.py diff --git a/zha/core/cluster_handlers/measurement.py b/zha/zigbee/cluster_handlers/measurement.py similarity index 100% rename from zha/core/cluster_handlers/measurement.py rename to zha/zigbee/cluster_handlers/measurement.py diff --git a/zha/core/cluster_handlers/protocol.py b/zha/zigbee/cluster_handlers/protocol.py similarity index 100% rename from zha/core/cluster_handlers/protocol.py rename to zha/zigbee/cluster_handlers/protocol.py diff --git a/zha/core/cluster_handlers/security.py b/zha/zigbee/cluster_handlers/security.py similarity index 100% rename from zha/core/cluster_handlers/security.py rename to zha/zigbee/cluster_handlers/security.py diff --git a/zha/core/cluster_handlers/smartenergy.py b/zha/zigbee/cluster_handlers/smartenergy.py similarity index 100% rename from zha/core/cluster_handlers/smartenergy.py rename to zha/zigbee/cluster_handlers/smartenergy.py diff --git a/zha/core/device.py b/zha/zigbee/device.py similarity index 100% rename from zha/core/device.py rename to zha/zigbee/device.py diff --git a/zha/core/endpoint.py b/zha/zigbee/endpoint.py similarity index 100% rename from zha/core/endpoint.py rename to zha/zigbee/endpoint.py diff --git a/zha/core/group.py b/zha/zigbee/group.py similarity index 100% rename from zha/core/group.py rename to zha/zigbee/group.py