Skip to content

Commit

Permalink
Detect full queues, make mp.queue.put non-blocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
t-sasatani committed Nov 1, 2024
1 parent f953342 commit 3ba9255
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions miniscope_io/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ def _uart_recv(
# read UART data until preamble and put into queue
uart_bites = serial_port.read_until(pre_bytes)
log_uart_buffer = [x for x in uart_bites]
serial_buffer_queue.put(log_uart_buffer)
try:
serial_buffer_queue.put(log_uart_buffer, block=False)
except multiprocessing.queues.Full:
self.logger.warning("Serial buffer queue full, skipping buffer.")
finally:
time.sleep(1) # time for ending other process
serial_port.close()
Expand Down Expand Up @@ -348,13 +351,19 @@ def _fpga_recv(
buf_start + len(self.preamble),
buf_stop + len(self.preamble),
)
serial_buffer_queue.put(cur_buffer[buf_start:buf_stop].tobytes())
try:
serial_buffer_queue.put(cur_buffer[buf_start:buf_stop].tobytes(), block=False)
except multiprocessing.queues.Full:
locallogs.warning("Serial buffer queue full, skipping buffer.")
if pre_pos:
cur_buffer = cur_buffer[pre_pos[-1] :]

finally:
locallogs.debug("Quitting, putting sentinel in queue")
serial_buffer_queue.put(None)
try:
serial_buffer_queue.put(None, block=False)
except multiprocessing.queues.Full:
locallogs.error("Serial buffer queue full, Could not put sentinel.")

def _buffer_to_frame(
self,
Expand Down Expand Up @@ -411,7 +420,10 @@ def _buffer_to_frame(
f"Discarding buffer."
)
if header_list:
frame_buffer_queue.put((None, header_list))
try:
frame_buffer_queue.put((None, header_list), block=False)
except multiprocessing.queues.Full:
locallogs.warning("Frame buffer queue full, skipping frame.")
continue

# if first buffer of a frame
Expand All @@ -421,7 +433,10 @@ def _buffer_to_frame(
continue

# push previous frame_buffer into frame_buffer queue
frame_buffer_queue.put((frame_buffer, header_list))
try:
frame_buffer_queue.put((frame_buffer, header_list), block=False)
except multiprocessing.queues.Full:
locallogs.warning("Frame buffer queue full, skipping frame.")

# init new frame_buffer
frame_buffer = frame_buffer_prealloc.copy()
Expand All @@ -447,9 +462,16 @@ def _buffer_to_frame(
if continuous is False:
break
finally:
frame_buffer_queue.put((None, header_list)) # for getting remaining buffers.
locallogs.debug("Quitting, putting sentinel in queue")
frame_buffer_queue.put(None)
try:
frame_buffer_queue.put((None, header_list), block=False) # get remaining buffers.
except multiprocessing.queues.Full:
locallogs.warning("Frame buffer queue full, skipping frame.")

try:
frame_buffer_queue.put(None, block=False)
locallogs.debug("Quitting, putting sentinel in queue")
except multiprocessing.queues.Full:
locallogs.error("Frame buffer queue full, Could not put sentinel.")

def _format_frame(
self,
Expand Down Expand Up @@ -483,11 +505,11 @@ def _format_frame(
while not self.terminate.is_set():
for frame_data, header_list in exact_iter(frame_buffer_queue.get, None):

if not frame_data:
imagearray.put((None, header_list))
continue
if len(frame_data) == 0:
imagearray.put((None, header_list))
if not frame_data or len(frame_data) == 0:
try:
imagearray.put((None, header_list), block=False)
except multiprocessing.queues.Full:
locallogs.warning("Image array queue full, skipping frame.")
continue
frame_data = np.concatenate(frame_data, axis=0)

Expand All @@ -509,12 +531,18 @@ def _format_frame(
frame = np.zeros(
(self.config.frame_width, self.config.frame_height), dtype=np.uint8
)
imagearray.put((frame, header_list))
try:
imagearray.put((frame, header_list), block=False)
except multiprocessing.queues.Full:
locallogs.warning("Image array queue full, skipping frame.")
if continuous is False:
break
finally:
locallogs.debug("Quitting, putting sentinel in queue")
imagearray.put(None)
try:
imagearray.put(None, block=False)
except multiprocessing.queues.Full:
locallogs.error("Image array queue full, Could not put sentinel.")

def init_video(
self, path: Union[Path, str], fourcc: str = "Y800", **kwargs: dict
Expand Down

0 comments on commit 3ba9255

Please sign in to comment.