diff --git a/.github/workflows/ruff.yaml b/.github/workflows/ruff.yaml index 68bfa4a..ddd8939 100644 --- a/.github/workflows/ruff.yaml +++ b/.github/workflows/ruff.yaml @@ -1,11 +1,11 @@ -name: Ruff +name: Python on: push: branches: [ main ] pull_request: branches: [ main ] jobs: - ruff: + lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/unittest.yaml b/.github/workflows/unittest.yaml new file mode 100644 index 0000000..2403a0c --- /dev/null +++ b/.github/workflows/unittest.yaml @@ -0,0 +1,22 @@ +name: Python +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Setup Python + uses: actions/setup-python@v3 + with: + python-version: "3.12" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install homeassistant + - name: Run tests + run: | + python -m unittest discover -s tests/ -v diff --git a/README.md b/README.md index 080458c..7904f68 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,9 @@ # ha-vinx + Custom integration for controlling Lightware VINX encoders and decoders + +## Tests + +```bash +python3 -m unittest discover -s tests/ -v +``` diff --git a/custom_components/vinx/__init__.py b/custom_components/vinx/__init__.py index eb88c31..d0cbcc8 100644 --- a/custom_components/vinx/__init__.py +++ b/custom_components/vinx/__init__.py @@ -9,7 +9,7 @@ from custom_components.vinx.const import DOMAIN from custom_components.vinx.lw3 import LW3 -PLATFORMS: list[Platform] = [Platform.MEDIA_PLAYER] +PLATFORMS: list[Platform] = [Platform.MEDIA_PLAYER, Platform.BUTTON] @dataclass diff --git a/custom_components/vinx/button.py b/custom_components/vinx/button.py new file mode 100644 index 0000000..ca04e37 --- /dev/null +++ b/custom_components/vinx/button.py @@ -0,0 +1,52 @@ +import logging + +from homeassistant.components.button import ButtonDeviceClass, ButtonEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.helpers.device_registry import DeviceInfo + +from custom_components.vinx import LW3, DeviceInformation, VinxRuntimeData + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry(hass, entry: ConfigEntry, async_add_entities): + # Extract stored runtime data + runtime_data: VinxRuntimeData = entry.runtime_data + _LOGGER.info(f"Runtime data: {runtime_data}") + + # Add entity to Home Assistant + async_add_entities([VinxRebootButtonEntity(runtime_data.lw3, runtime_data.device_information)]) + + +class VinxRebootButtonEntity(ButtonEntity): + def __init__(self, lw3: LW3, device_information: DeviceInformation) -> None: + self._lw3 = lw3 + self._device_information = device_information + + _attr_device_class = ButtonDeviceClass.RESTART + + @property + def unique_id(self) -> str | None: + return f"vinx_{self._device_information.mac_address}_reboot_button" + + @property + def device_info(self) -> DeviceInfo: + return self._device_information.device_info + + @property + def name(self): + # Use increasingly less descriptive names depending on what information is available + device_label = self._device_information.device_label + serial_number = self._device_information.device_info.get("serial_number") + + if device_label: + return f"{self._device_information.device_label} reboot button" + elif serial_number: + return f"VINX {serial_number} reboot button" + else: + return "VINX reboot button" + + async def async_press(self) -> None: + async with self._lw3.connection(): + _LOGGER.info("Issuing device reset") + await self._lw3.call("/SYS", "reset(1)") diff --git a/custom_components/vinx/lw3.py b/custom_components/vinx/lw3.py index 9418746..07b2bb3 100644 --- a/custom_components/vinx/lw3.py +++ b/custom_components/vinx/lw3.py @@ -3,16 +3,17 @@ from asyncio import StreamReader, StreamWriter from dataclasses import dataclass +from enum import Enum @dataclass -class Response: +class SingleLineResponse: prefix: str path: str @dataclass -class PropertyResponse(Response): +class PropertyResponse(SingleLineResponse): value: str def __str__(self): @@ -20,7 +21,7 @@ def __str__(self): @dataclass -class ErrorResponse(Response): +class ErrorResponse(SingleLineResponse): code: int message: str @@ -28,6 +29,81 @@ def __str__(self): return self.message +@dataclass +class NodeResponse(SingleLineResponse): + pass + + +@dataclass +class MethodResponse(SingleLineResponse): + name: str + + def __str__(self): + return self.name + + +type MultiLineResponse = list[SingleLineResponse] +type Response = SingleLineResponse | MultiLineResponse + + +class ResponseType(Enum): + Node = 1 + Property = 2 + Error = 3 + Method = 4 + + +def get_response_type(response: str) -> ResponseType: + if response[1] == "E": + return ResponseType.Error + elif response[0] == "p": + return ResponseType.Property + elif response[0] == "n": + return ResponseType.Node + elif response[0] == "m": + return ResponseType.Method + + raise ValueError("Unknown response type") + + +def parse_single_line_response(response: str) -> SingleLineResponse: + match get_response_type(response): + case ResponseType.Error: + matches = re.search(r"^(.E) (.*) %(E[0-9]+):(.*)$", response) + return ErrorResponse(matches.group(1), matches.group(2), matches.group(3), matches.group(4)) + case ResponseType.Property: + matches = re.fullmatch(r"^p(.*) (.*)=(.*)$", response) + return PropertyResponse(f"p{matches.group(1)}", matches.group(2), matches.group(3)) + case ResponseType.Node: + matches = re.fullmatch(r"^n(.*) (.*)$", response) + return NodeResponse(f"n{matches.group(1)}", matches.group(2)) + case ResponseType.Method: + matches = re.fullmatch(r"^m(.*) (.*):(.*)$", response) + return MethodResponse(f"m{matches.group(1)}", matches.group(2), matches.group(3)) + + +def parse_multiline_response(lines: list[str]) -> MultiLineResponse: + return [parse_single_line_response(response) for response in lines] + + +def parse_response(response: str) -> Response: + lines = response.split("\r\n") + + # Determine if we're dealing with a single line response or multiple + if len(lines) == 3: + return parse_single_line_response(lines[1]) + else: + return parse_multiline_response(lines[1:-1]) + + +def is_encoder_discovery_node(node: Response) -> bool: + return isinstance(node, NodeResponse) and "TX" in node.path + + +def is_decoder_discovery_node(node: Response) -> bool: + return isinstance(node, NodeResponse) and "RX" in node.path + + class LW3: def __init__(self, hostname: str, port: int, timeout: int = 5): self._hostname = hostname @@ -57,51 +133,74 @@ async def _disconnect(self): self._writer.close() await self._writer.wait_closed() - @staticmethod - def _is_error_response(response: str) -> bool: - return response[1] == "E" - - @staticmethod - def parse_response(response: str) -> PropertyResponse | ErrorResponse: - if LW3._is_error_response(response): - matches = re.search(r"^(.E) (.*) %(E[0-9]+):(.*)$", response) - return ErrorResponse(matches.group(1), matches.group(2), matches.group(3), matches.group(4)) - - matches = re.fullmatch(r"^(.*) (.*)=(.*)$", response) - return PropertyResponse(matches.group(1), matches.group(2), matches.group(3)) - - async def _read_and_parse_response(self) -> PropertyResponse: - response = await self._read_until("\r\n") + async def _read_and_parse_response(self) -> Response: + # All commands are wrapped with a signature, so read until the end delimiter + response = await self._read_until("}") if response is None: raise EOFError("Reached EOF while reading, connection probably lost") - result = self.parse_response(response.strip()) + result = parse_response(response.strip()) if isinstance(result, ErrorResponse): raise ValueError(result) return result - async def _run_get_property(self, path: str) -> PropertyResponse: + async def _run_get(self, path: str) -> Response: async with self._semaphore: - self._writer.write(f"GET {path}\r\n".encode()) + self._writer.write(f"0000#GET {path}\r\n".encode()) await self._writer.drain() return await self._read_and_parse_response() - async def _run_set_property(self, path: str, value: str) -> PropertyResponse: + async def _run_set(self, path: str, value: str) -> Response: async with self._semaphore: - self._writer.write(f"SET {path}={value}\r\n".encode()) + self._writer.write(f"0000#SET {path}={value}\r\n".encode()) + await self._writer.drain() + + return await self._read_and_parse_response() + + async def _run_get_all(self, path: str) -> Response: + async with self._semaphore: + self._writer.write(f"0000#GETALL {path}\r\n".encode()) + await self._writer.drain() + + return await self._read_and_parse_response() + + async def _run_call(self, path: str, method: str) -> Response: + async with self._semaphore: + self._writer.write(f"0000#CALL {path}:{method}\r\n".encode()) await self._writer.drain() return await self._read_and_parse_response() async def get_property(self, path: str) -> PropertyResponse: - return await asyncio.wait_for(self._run_get_property(path), self._timeout) + response = await asyncio.wait_for(self._run_get(path), self._timeout) + + if not isinstance(response, PropertyResponse): + raise ValueError(f"Requested path {path} does not return a property") + + return response async def set_property(self, path: str, value: str) -> PropertyResponse: - return await asyncio.wait_for(self._run_set_property(path, value), self._timeout) + response = await asyncio.wait_for(self._run_set(path, value), self._timeout) + + if not isinstance(response, PropertyResponse): + raise ValueError(f"Requested path {path} does not return a property") + + return response + + async def get_all(self, path: str) -> Response: + return await asyncio.wait_for(self._run_get_all(path), self._timeout) + + async def call(self, path: str, method: str) -> MethodResponse: + response = await asyncio.wait_for(self._run_call(path, method), self._timeout) + + if not isinstance(response, MethodResponse): + raise ValueError(f"Called method {path}:{method} does not return a method response") + + return response class LW3ConnectionContext: diff --git a/custom_components/vinx/manifest.json b/custom_components/vinx/manifest.json index 8fccf7a..e6b1fba 100644 --- a/custom_components/vinx/manifest.json +++ b/custom_components/vinx/manifest.json @@ -11,7 +11,7 @@ "documentation": "https://www.home-assistant.io/integrations/vinx", "homekit": {}, "iot_class": "local_polling", - "requirements": [], + "requirements": ["bidict"], "ssdp": [], "zeroconf": ["_lwr3._tcp.local."] } diff --git a/custom_components/vinx/media_player.py b/custom_components/vinx/media_player.py index 54e92e5..b7d53e3 100644 --- a/custom_components/vinx/media_player.py +++ b/custom_components/vinx/media_player.py @@ -1,9 +1,11 @@ import logging +from bidict import bidict from homeassistant.components.media_player import MediaPlayerEntity, MediaPlayerEntityFeature, MediaPlayerState from homeassistant.helpers.device_registry import DeviceInfo from custom_components.vinx import LW3, DeviceInformation, VinxRuntimeData +from custom_components.vinx.lw3 import NodeResponse, is_encoder_discovery_node _LOGGER = logging.getLogger(__name__) @@ -66,11 +68,62 @@ def name(self): class VinxEncoder(AbstractVinxMediaPlayerEntity): - pass + async def async_update(self): + async with self._lw3.connection(): + # Query signal status + signal_present = await self._lw3.get_property("/MEDIA/VIDEO/I1.SignalPresent") + self._state = MediaPlayerState.PLAYING if str(signal_present) == "1" else MediaPlayerState.IDLE class VinxDecoder(AbstractVinxMediaPlayerEntity): def __init__(self, lw3: LW3, device_information: DeviceInformation) -> None: super().__init__(lw3, device_information) + self._source = None + self._source_list = None + self._source_bidict = bidict() - _attr_supported_features = MediaPlayerEntityFeature.SELECT_SOURCE + _attr_supported_features = MediaPlayerEntityFeature.SELECT_SOURCE + + async def async_update(self): + # Populate the source list only once. Sort it alphabetically, since the order of discovered devices + # may differ from device to device. + if self._source_list is None: + await self.populate_source_bidict() + self._source_list = sorted(list(self._source_bidict.values())) + _LOGGER.info(f"{self.name} source list populated with {len(self._source_list)} sources") + + async with self._lw3.connection(): + # Query current source + video_channel_id = await self._lw3.get_property("/SYS/MB/PHY.VideoChannelId") + self._source = str(self._source_bidict.get(str(video_channel_id))) + + # Query signal status + signal_present = await self._lw3.get_property("/MEDIA/VIDEO/I1.SignalPresent") + self._state = MediaPlayerState.PLAYING if str(signal_present) == "1" else MediaPlayerState.IDLE + + @property + def source(self) -> str | None: + return self._source + + @property + def source_list(self) -> list[str] | None: + return self._source_list + + async def async_select_source(self, source: str) -> None: + self._source = source + video_channel_id = self._source_bidict.inverse.get(source) + + async with self._lw3.connection(): + await self._lw3.set_property("/SYS/MB/PHY.VideoChannelId", video_channel_id) + + async def populate_source_bidict(self): + """Queries the device for discovered devices, filters out everything that isn't a VINX encoder, + then builds a bidict mapping between the device label and video channel ID.""" + async with self._lw3.connection(): + discovery_nodes = await self._lw3.get_all("/DISCOVERY") + encoder_nodes: list[NodeResponse] = list(filter(is_encoder_discovery_node, discovery_nodes)) + + for encoder_node in encoder_nodes: + device_name = await self._lw3.get_property(f"{encoder_node.path}.DeviceName") + video_channel_id = await self._lw3.get_property(f"{encoder_node.path}.VideoChannelId") + self._source_bidict.put(str(video_channel_id), str(device_name)) diff --git a/tests/test_lw3.py b/tests/test_lw3.py new file mode 100644 index 0000000..ed8a69e --- /dev/null +++ b/tests/test_lw3.py @@ -0,0 +1,99 @@ +from unittest import TestCase + +from custom_components.vinx.lw3 import ( + ErrorResponse, + MethodResponse, + NodeResponse, + PropertyResponse, + ResponseType, + get_response_type, + is_decoder_discovery_node, + is_encoder_discovery_node, + parse_response, + parse_single_line_response, +) + + +class TestResponseParsing(TestCase): + def test_get_response_type(self): + self.assertEqual(ResponseType.Error, get_response_type("nE FOO %E0001:Syntax error")) + self.assertEqual(ResponseType.Property, get_response_type("pr /.ProductName=VINX")) + self.assertEqual(ResponseType.Node, get_response_type("n- /SYS")) + self.assertEqual(ResponseType.Method, get_response_type("m- reset()")) + + def test_parse_single_line_response(self): + raw_response = "n- /LOGIN" + response = parse_single_line_response(raw_response) + self.assertIsInstance(response, NodeResponse) + self.assertDictEqual({"prefix": "n-", "path": "/LOGIN"}, response.__dict__) + + raw_response = "-E HURR %E001:Syntax error" + response = parse_single_line_response(raw_response) + self.assertIsInstance(response, ErrorResponse) + self.assertDictEqual( + {"prefix": "-E", "path": "HURR", "code": "E001", "message": "Syntax error"}, response.__dict__ + ) + + raw_response = "pr /.ProductName=VINX-110-HDMI-DEC" + response = parse_single_line_response(raw_response) + self.assertIsInstance(response, PropertyResponse) + self.assertDictEqual( + { + "prefix": "pr", + "path": "/.ProductName", + "value": "VINX-110-HDMI-DEC", + }, + response.__dict__, + ) + + raw_response = "m- /SYS:factoryDefaults" + response = parse_single_line_response(raw_response) + self.assertIsInstance(response, MethodResponse) + self.assertDictEqual( + { + "prefix": "m-", + "path": "/SYS", + "name": "factoryDefaults", + }, + response.__dict__, + ) + + def test_parse_response(self): + raw_response = """{0000\r +n- /LOGIN\r +n- /MEDIA\r +n- /SYS\r +n- /MANAGEMENT\r +n- /DISCOVERY\r +n- /EDID\r +pr /.ProductName=VINX-110-HDMI-DEC\r +pr /.ProductPartNumber=91810003\r +pr /.SerialNumber=E8013A\r +pr /.MacAddress=00:11:AA:E8:01:3A\r +pr /.PackageVersion=v3.2.2b3 r1\r +pr /.FirmwareVersion=7.4.1\r +pr /.CoreVersion=v3.2.2b1 r1\r +} +""" + response = parse_response(raw_response) + self.assertEqual(13, len(response)) + + first = response[0] + self.assertIsInstance(first, NodeResponse) + self.assertEqual("n-", first.prefix) + self.assertEqual("/LOGIN", first.path) + last = response[12] + self.assertIsInstance(last, PropertyResponse) + self.assertEqual("pr", last.prefix) + self.assertEqual("/.CoreVersion", last.path) + self.assertEqual("v3.2.2b1 r1", last.value) + + +class TestDiscoveryNodes(TestCase): + def test_is_discovery_node(self): + encoder_node = NodeResponse("n-", "/DISCOVERY/TXE00143") + decoder_node = NodeResponse("n-", "/DISCOVERY/RXE8011D") + self.assertTrue(is_encoder_discovery_node(encoder_node)) + self.assertFalse(is_encoder_discovery_node(decoder_node)) + self.assertTrue(is_decoder_discovery_node(decoder_node)) + self.assertFalse(is_decoder_discovery_node(encoder_node))