Skip to content

Commit

Permalink
feat: auto-reconnect on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
GetPsyched committed Jan 19, 2024
1 parent fd0269d commit cd52f5b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
65 changes: 62 additions & 3 deletions charachorder/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import time
from typing import TYPE_CHECKING, Callable, Generator, Literal, NamedTuple

from serial import Serial
from serial import Serial, serialutil
from serial.tools import list_ports

from .errors import (
InvalidParameter,
InvalidParameterInput,
ReconnectTimeout,
SerialConnectionNotFound,
TooManyDevices,
UnknownCommand,
UnknownProduct,
UnknownVendor,
Expand Down Expand Up @@ -238,8 +240,65 @@ def set_keymap(
self.execute("VAR", "B4", code.value, index, action_id)[0] == "0", commit
)

def restart(self):
self.execute("RST")
def restart(self, *, timeout: float = 10.0):
if self.connection.is_open is False:
raise SerialConnectionNotFound
self.connection.write(f"RST\r\n".encode("utf-8"))

# Attempt to reconnect

def is_same_device(product_id: int, vendor_id: int) -> bool:
return product_id == self.product_id and vendor_id == self.vendor_id

start_time = time.time()
while time.time() - start_time < timeout:
try:
ports = list_ports.comports()
except TypeError:
# Weirdly, the list_ports.comports() can fail during the device restart
#
# Traceback (most recent call last):
# ... snip ...
# File ".../serial/tools/list_ports_linux.py", line 102, in comports
# for info in [SysFS(d) for d in devices]
# File ".../serial/tools/list_ports_linux.py", line 102, in <listcomp>
# for info in [SysFS(d) for d in devices]
# File ".../serial/tools/list_ports_linux.py", line 52, in __init__
# self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16)
# TypeError: int() can't convert non-string with explicit base
continue

comports = [port for port in ports if is_same_device(port.pid, port.vid)]

# Device is restarting
if len(comports) == 0:
continue

# Device found
elif len(comports) == 1:
port = comports[0].device

# This is true initially when the device hasn't fully shutdown yet
is_same_session = port == self.port
if is_same_session:
continue

try:
self.connection.close()
self.connection.port = port
self.connection.open()
except serialutil.SerialException:
# There is a brief period after the device
# restarts where you cannot open a connection to it
continue

break

# More than one device of the same model was found
elif len(comports) > 1:
raise TooManyDevices
else:
raise ReconnectTimeout

def factory_reset(self):
self.execute("RST", "FACTORY")
Expand Down
22 changes: 22 additions & 0 deletions charachorder/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ class SerialException(CharaChorderException):
"""An exception raised when something went wrong during Serial I/O"""


class AutoReconnectFailure(SerialException):
"""An exception raised when auto-reconnect after a restart failed"""


class ReconnectTimeout(AutoReconnectFailure):
"""An exception raised when auto-reconnect failed due to a timeout"""

def __init__(self) -> None:
super().__init__(
"Auto reconnect after the restart has failed. You will need to re-create this object to do further Serial I/O"
)


class TooManyDevices(AutoReconnectFailure):
"""An exception raised when auto-reconnect failed due to ambiguity in devices"""

def __init__(self) -> None:
super().__init__(
"It looks like you have two or more devices of the same model connected simultaneously. This causes the restart to not reconnect to the correct device. You will need to re-create this object to do further Serial I/O"
)


class SerialConnectionNotFound(SerialException):
def __init__(self) -> None:
super().__init__(
Expand Down

0 comments on commit cd52f5b

Please sign in to comment.