Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A couple of serial port improvements #2531

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mu/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ class DeviceList(QtCore.QAbstractListModel):
def __init__(self, modes, parent=None):
super().__init__(parent)
self.modes = modes
self.force_check = False
self._devices = list()

def __iter__(self):
Expand Down Expand Up @@ -713,6 +714,7 @@ def add_device(self, new_device):
# Insert
self.beginInsertRows(parent, position, position)
self._devices.insert(position, new_device)
self.force_check = True
self.endInsertRows()

def remove_device(self, device):
Expand All @@ -723,6 +725,7 @@ def remove_device(self, device):
position = self._devices.index(device)
self.beginRemoveRows(parent, position, position)
self._devices.remove(device)
self.force_check = True
self.endRemoveRows()

def check_usb(self):
Expand All @@ -735,13 +738,21 @@ def check_usb(self):
devices = []
device_types = set()
# Detect connected devices.
changed = False
for mode_name, mode in self.modes.items():
if hasattr(mode, "check_devices"):
if not mode.check_devices():
continue
if hasattr(mode, "find_devices"):
# The mode can detect attached devices.
detected = mode.find_devices(with_logging=False)
changed = True
if detected:
device_types.add(mode_name)
devices.extend(detected)
if not changed and not self.force_check:
return

# Remove no-longer connected devices.
for device in self:
if device not in devices:
Expand Down Expand Up @@ -776,6 +787,7 @@ def check_usb(self):
device.manufacturer,
)
)
self.force_check = False


class Editor(QObject):
Expand Down
57 changes: 55 additions & 2 deletions mu/modes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@
MODULE_NAMES.add("builtins")


class SerialPorts:
def __init__(self):
self.current_ports = []
self.current_sequence = 0
self.current_time = -1

def ports(self, force=False):
new_time = time.monotonic()
if force or new_time - self.current_time > 0.5:
self.current_time = new_time
available_ports = QSerialPortInfo.availablePorts()
if len(self.current_ports) != len(available_ports):
self.current_ports = available_ports
self.current_sequence += 1
else:
for i in range(len(available_ports)):
old = self.current_ports[i]
new = available_ports[i]
if (
old.portName() != new.portName()
or old.description() != new.description()
or old.serialNumber() != new.serialNumber()
):
self.current_ports = available_ports
self.current_sequence += 1
break
return (self.current_ports, self.current_sequence)


serial_ports = SerialPorts()


class REPLConnection(QObject):
serial = None
data_received = pyqtSignal(bytes)
Expand Down Expand Up @@ -74,6 +106,13 @@ def baudrate(self):
else:
return None

@baudrate.setter
def baudrate(self, value):
if value != self._baudrate:
self._baudrate = value
if self.serial:
self.serial.setBaudRate(value)

def open(self):
"""
Open the serial link
Expand Down Expand Up @@ -379,6 +418,8 @@ class MicroPythonMode(BaseMode):
connection = None
baudrate = 115200
builtins = ["const"]
current_devices = []
current_sequence = None

def compatible_board(self, port):
"""
Expand Down Expand Up @@ -410,13 +451,24 @@ def compatible_board(self, port):
)
return None

def find_devices(self, with_logging=True):
def check_devices(self):
"""
Quickly check to see if the set of devices has
changed, return True if changed else False
"""
(_, new_sequence) = serial_ports.ports()
return new_sequence != self.current_sequence

def find_devices(self, with_logging=True, force=False):
"""
Returns the port and serial number, and name for the first
MicroPython-ish device found connected to the host computer.
If no device is found, returns the tuple (None, None, None).
"""
available_ports = QSerialPortInfo.availablePorts()
(available_ports, new_sequence) = serial_ports.ports(force)
if new_sequence == self.current_sequence:
return self.current_devices
self.current_sequence = new_sequence
devices = []
for port in available_ports:
device = self.compatible_board(port)
Expand Down Expand Up @@ -448,6 +500,7 @@ def find_devices(self, with_logging=True):
for p in available_ports
]
)
self.current_devices = devices
return devices

def port_path(self, port_name):
Expand Down
13 changes: 6 additions & 7 deletions mu/modes/snek.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,18 @@ def set_ready(self):
return
self.ready = True
if self.flowcontrol:
for self._baudrate in snek_bauds:
logger.info("Try baudrate %d" % self._baudrate)
self.serial.setBaudRate(self._baudrate)
self.serial.write(b"\x14\n")
for baudrate in snek_bauds:
logger.info("Try baudrate %d" % baudrate)
self.baudrate = baudrate
self.serial.write(b"\x14")
self.serial.waitForReadyRead(250)
if self.got_dc4:
logger.info("Autobaud response detected")
break
logger.info("No autobaud response")
else:
self._baudrate = snek_bauds[0]
logger.info("Using default baudrate %d" % self._baudrate)
self.serial.setBaudRate(self._baudrate)
logger.info("Using default baudrate %d" % snek_bauds[0])
self.baudrate = snek_bauds[0]
self.write(self.pending)
self.pending = b""

Expand Down
8 changes: 4 additions & 4 deletions tests/modes/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def test_micropython_mode_find_device():
), mock.patch("mu.modes.base.os", mock_os), mock.patch(
"mu.modes.base.sys", mock_sys
):
assert mm.find_devices() == [device]
assert mm.find_devices(force=True) == [device]


def test_micropython_mode_find_device_no_ports():
Expand All @@ -300,7 +300,7 @@ def test_micropython_mode_find_device_no_ports():
with mock.patch(
"mu.modes.base.QSerialPortInfo.availablePorts", return_value=[]
):
assert mm.find_devices() == []
assert mm.find_devices(force=True) == []


def test_micropython_mode_find_device_but_no_device():
Expand All @@ -318,7 +318,7 @@ def test_micropython_mode_find_device_but_no_device():
"mu.modes.base.QSerialPortInfo.availablePorts",
return_value=[mock_port],
):
assert mm.find_devices() == []
assert mm.find_devices(force=True) == []


def test_micropython_mode_find_device_darwin_remove_extraneous_devices():
Expand Down Expand Up @@ -356,7 +356,7 @@ def test_micropython_mode_find_device_darwin_remove_extraneous_devices():
"mu.modes.base.QSerialPortInfo.availablePorts",
return_value=[mock_port, mock_port2],
):
assert mm.find_devices() == [device]
assert mm.find_devices(force=True) == [device]


def test_micropython_mode_port_path_posix():
Expand Down
Loading