Skip to content

Commit

Permalink
Merge branch 'feature_ir_update' into integrate-feature-continuousrun…
Browse files Browse the repository at this point in the history
…-remoteupdate
  • Loading branch information
t-sasatani committed Nov 1, 2024
2 parents 728549a + a9ead50 commit 508abdd
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 108 deletions.
2 changes: 2 additions & 0 deletions miniscope_io/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click

from miniscope_io.cli.stream import stream
from miniscope_io.cli.update import update


@click.group()
Expand All @@ -18,3 +19,4 @@ def cli(ctx: click.Context) -> None:


cli.add_command(stream)
cli.add_command(update)
37 changes: 37 additions & 0 deletions miniscope_io/cli/update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
CLI for updating device over IR or UART.
"""

import click

from miniscope_io.device_update import DevUpdate


@click.command()
@click.option(
"-p",
"--port",
required=False,
help="Serial port to connect to. Needed if multiple FTDI devices are connected.",
)
@click.option(
"-i",
"--device_id",
required=False,
default=0,
type=int,
help="ID of the device to update. 0 will update all devices.",
)
@click.option(
"-t",
"--target",
required=True,
type=click.Choice(["LED", "GAIN", "ROI_X", "ROI_Y"]),
help="Target to update",
)
@click.option("-v", "--value", required=True, type=int, help="Value to set")
def update(port: str, target: str, value: int, device_id: int) -> None:
"""
Update device configuration.
"""
DevUpdate(port=port, target=target, value=value, device_id=device_id)
269 changes: 162 additions & 107 deletions miniscope_io/device_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,138 +7,193 @@
"""

import argparse
import sys
import time
from enum import Enum
from typing import Optional

import numpy as np
import serial
import serial.tools.list_ports
from pydantic import BaseModel, ConfigDict, field_validator, model_validator

from miniscope_io import init_logger
from miniscope_io.logging import init_logger

# Parsers for update LED
updateDeviceParser = argparse.ArgumentParser("updateDevice")
updateDeviceParser.add_argument("port", help="serial port")
updateDeviceParser.add_argument("baudrate", help="baudrate")
updateDeviceParser.add_argument("module", help="module to update")
updateDeviceParser.add_argument("value", help="LED value")
logger = init_logger(name="device_update", level="DEBUG")


def updateDevice() -> None:
"""
Script to update hardware settings over a generic UART-USB converter.
This script currently supports updating the excitation LED brightness and
electrical wetting lens driver gain.
.. note::
class UpdateTarget(Enum):
"""Targets to update."""

Not tested after separating from stream_daq.py.
LED = 0
GAIN = 1
ROI_X = 2
ROI_Y = 3
ROI_WIDTH = 4 # not implemented
ROI_HEIGHT = 5 # not implemented
EWL = 6 # not implemented

Examples
--------
>>> updateDevice [COM port] [baudrate] [module] [value]

..todo::
Test to see if changing package structure broke anything.
class DevUpdateCommand(BaseModel):
"""
Command to update device configuration.
"""
logger = init_logger("streamDaq")

args = updateDeviceParser.parse_args()
moduleList = ["LED", "EWL"]

ledMAX = 100
ledMIN = 0

ewlMAX = 255
ewlMIN = 0
device_id: int
port: str
target: UpdateTarget
value: int

model_config = ConfigDict(arbitrary_types_allowed=True)

@model_validator(mode="after")
def validate_values(cls, values: dict) -> dict:
"""
Validate values based on target.
"""
target = values.target
value = values.value

if target == UpdateTarget.LED:
assert 0 <= value <= 100, "For LED, value must be between 0 and 100"
elif target == UpdateTarget.GAIN:
assert value in [1, 2, 4], "For GAIN, value must be 1, 2, or 4"
return values

@field_validator("port")
def validate_port(cls, value: str) -> str:
"""
Validate port.
Args:
value: Port to validate.
Returns:
Validated port.
Raises:
ValueError: If no serial ports found or port not found.
"""
portlist = list(serial.tools.list_ports.comports())

if len(portlist) == 0:
raise ValueError("No serial ports found")
if value not in [port.device for port in portlist]:
raise ValueError(f"Port {value} not found")
return value

@field_validator("target", mode="before")
def validate_target(cls, value: str) -> UpdateTarget:
"""
Validate and convert target string to UpdateTarget Enum type.
Args:
value (str): Target to validate.
Returns:
UpdateTarget: Validated target as UpdateTarget.
Raises:
ValueError: If target not found.
"""
try:
return UpdateTarget[value]
except KeyError as e:
raise ValueError(f"Target {value} not found.") from e


def DevUpdate(
target: str,
value: int,
device_id: int,
port: Optional[str] = None,
) -> None:
"""
IR-based update of device configuration.
ledDeviceTag = 0 # 2-bits each for now
ewlDeviceTag = 1 # 2-bits each for now
.. note::
deviceTagPos = 4
preamblePos = 6
Not tested after separating from stream_daq.py.
Preamble = [2, 1] # 2-bits each for now
Args:
device_id: ID of the device. 0 will update all devices.
port: Serial port to which the device is connected.
target: What to update on the device (e.g., LED or GAIN).
value: Value to which the target should be updated.
uartPayload = 4
uartRepeat = 5
uartTimeGap = 0.01
Returns:
None
"""

try:
assert len(vars(args)) == 4
except AssertionError as msg:
logger.exception("Usage: updateDevice [COM port] [baudrate] [module] [value]")
raise msg
if port:
logger.info(f"Using port {port}")
else:
ftdi_port_list = find_ftdi_device()
if len(ftdi_port_list) == 0:
raise ValueError("No FTDI devices found.")
if len(ftdi_port_list) > 1:
raise ValueError("Multiple FTDI devices found. Please specify the port.")
if len(ftdi_port_list) == 1:
port = ftdi_port_list[0]
logger.info(f"Using port {port}")

command = DevUpdateCommand(device_id=device_id, port=port, target=target, value=value)
logger.info(f"Updating {target} to {value} on port {port}")

# Header to indicate target/value.
# This should be a bit pattern that is unlikely to be the value.
id_header = 0b00000000
target_header = 0b11000000
LSB_header = 0b01000000
MSB_header = 0b10000000
LSB_value_mask = 0b000000111111 # value below 12-bit
MSB_value_mask = 0b111111000000 # value below 12-bit
reset_byte = 0b11111111

try:
comport = str(args.port)
except (ValueError, IndexError) as e:
serial_port = serial.Serial(port=command.port, baudrate=2400, timeout=5, stopbits=2)
except Exception as e:
logger.exception(e)
raise e
logger.info("Open serial port")

try:
baudrate = int(args.baudrate)
except (ValueError, IndexError) as e:
logger.exception(e)
raise e
id_command = (command.device_id + id_header) & 0xFF
serial_port.write(id_command.to_bytes(1, "big"))
logger.debug(f"Command: {format(id_command, '08b')}; Device ID: {command.device_id}")
time.sleep(0.1)

try:
module = str(args.module)
assert module in moduleList
except AssertionError as msg:
err_str = "Available modules:\n"
for module in moduleList:
err_str += "\t" + module + "\n"
logger.exception(err_str)
raise msg
target_command = (command.target.value + target_header) & 0xFF
serial_port.write(target_command.to_bytes(1, "big"))
logger.debug(f"Command: {format(target_command, '08b')}; Target: {command.target.name}")
time.sleep(0.1)

try:
value = int(args.value)
except Exception as e:
logger.exception("Value needs to be an integer")
raise e
value_LSB_command = ((command.value & LSB_value_mask) + LSB_header) & 0xFF
serial_port.write(value_LSB_command.to_bytes(1, "big"))
logger.debug(f"Command: {format(value_LSB_command, '08b')}; Value: {command.value} (LSB)")
time.sleep(0.1)

try:
if module == "LED":
assert value <= ledMAX and value >= ledMIN
if module == "EWL":
assert value <= ewlMAX and value >= ewlMIN
except AssertionError as msg:
if module == "LED":
logger.exception("LED value need to be a integer within 0-100")
if module == "EWL":
logger.exception("EWL value need to be an integer within 0-255")
raise msg

if module == "LED":
deviceTag = ledDeviceTag << deviceTagPos
elif module == "EWL":
deviceTag = ewlDeviceTag << deviceTagPos

command = [0, 0]

command[0] = int(
Preamble[0] * 2**preamblePos + deviceTag + np.floor(value / (2**uartPayload))
).to_bytes(1, "big")
command[1] = int(
Preamble[1] * 2**preamblePos + deviceTag + value % (2**uartPayload)
).to_bytes(1, "big")

# set up serial port
try:
serial_port = serial.Serial(port=comport, baudrate=baudrate, timeout=5, stopbits=1)
except Exception as e:
logger.exception(e)
raise e
logger.info("Open serial port")
value_MSB_command = (((command.value & MSB_value_mask) >> 6) + MSB_header) & 0xFF
serial_port.write(value_MSB_command.to_bytes(1, "big"))
logger.debug(f"Command: {format(value_MSB_command, '08b')}; Value: {command.value} (MSB)")
time.sleep(0.1)

serial_port.write(reset_byte.to_bytes(1, "big"))

finally:
serial_port.close()
logger.info("Closed serial port")


def find_ftdi_device() -> list:
"""
Find FTDI devices connected to the computer.
"""
FTDI_VENDOR_ID = 0x0403
FTDI_PRODUCT_ID = 0x6001
ports = serial.tools.list_ports.comports()
ftdi_ports = []

for uartCommand in command:
for _ in range(uartRepeat):
# read UART data until preamble and put into queue
serial_port.write(uartCommand)
time.sleep(uartTimeGap)
for port in ports:
if port.vid == FTDI_VENDOR_ID and port.pid == FTDI_PRODUCT_ID:
ftdi_ports.append(port.device)

serial_port.close()
logger.info("\t" + module + ": " + str(value))
logger.info("Close serial port")
sys.exit(1)
return ftdi_ports
2 changes: 1 addition & 1 deletion miniscope_io/plots/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def _init_plot(
plt.ion()
fig: plt.Figure
axes: np.ndarray[Any, np.dtype[plt.Axes]]
fig, axes = plt.subplots(len(self.header_keys), 1, figsize=(6, len(self.header_keys) * 3))
fig, axes = plt.subplots(len(self.header_keys), 1, figsize=(6, len(self.header_keys) * 2))
axes = np.array(axes).reshape(-1) # Ensure axes is an array

# Initialize line objects
Expand Down

0 comments on commit 508abdd

Please sign in to comment.