Skip to content

Commit

Permalink
Lots of extra debug statements, working example of BlobEvaluator
Browse files (browse the repository at this point in the history)
  • Loading branch information
justincdavis committed Aug 1, 2024
1 parent 468e44a commit ab3409d
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 10 deletions.
2 changes: 1 addition & 1 deletion examples/blobs/testing/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from oakutils.blobs.testing import BlobEvaluater


set_log_level("DEBUG")
set_log_level("ERROR")

blob_eval = BlobEvaluater([gauss1, gauss2, gauss3, gauss4, gauss5, gauss6])

Expand Down
15 changes: 10 additions & 5 deletions src/oakutils/blobs/testing/_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self: Self, blob_paths: list[Path | str]) -> None:
err_msg += f" Found {len(input_shapes)} unique input shapes."
err_msg += f" Input shapes: {input_shapes}"
raise ValueError(err_msg)
self._input_shape: tuple[tuple[int, ...], ...] = input_shapes.pop()
self._input_shape: tuple[int, ...] = input_shapes.pop()[0]
_log.debug(f"Input shape: {self._input_shape}")

# setup the allocations (groups with shaves <= 12)
Expand Down Expand Up @@ -134,15 +134,20 @@ def run(
"""
results = []
rng = np.random.default_rng()
eval_input = data or [
rng.random(shape).astype(np.float32) for shape in self._input_shape
]
for idx, group in enumerate(self._allocations):
if data is not None:
eval_input = data.copy()
else:
eval_input = [
rng.random(self._input_shape).astype(np.float32) for _ in range(len(self._blobs))
]
group_blobs = [blob for _, blob, _, _ in group]
_log.debug(f"BlobEvaluator: Running group {idx + 1} / {len(self._allocations)}")
with VPU() as vpu:
vpu.reconfigure_multi(group_blobs)
_log.debug(f"BlobEvaluator: VPU reconfigured with {len(group_blobs)} blobs.")
batch_result = vpu.run(eval_input, safe=True)
_log.debug(f"Batch {idx + 1} / {len(self._allocations)} completed.")
_log.debug(f"BlobEvaluator: Batch {idx + 1} / {len(self._allocations)} completed.")
results.append(batch_result)
self._results = results
return results
10 changes: 9 additions & 1 deletion src/oakutils/nodes/buffer/_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# MIT License
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import depthai as dai
Expand All @@ -11,6 +12,8 @@
if TYPE_CHECKING:
from typing_extensions import Self

_log = logging.getLogger(__name__)


class Buffer:
"""Buffer for sending and receiving data from OAK-D."""
Expand Down Expand Up @@ -203,6 +206,7 @@ def send(self: Self, data: list[np.ndarray]) -> None:
for idx, d in enumerate(data):
self._buffers[idx].setData(d)
self._input_queues[idx].send(self._buffers[idx])
_log.debug(f"Sent data through buffer: {self._input_streams[idx]}")

def receive(self: Self) -> list[dai.ADatatype]:
"""
Expand All @@ -214,4 +218,8 @@ def receive(self: Self) -> list[dai.ADatatype]:
The data received from the buffer.
"""
return [queue.get() for queue in self._output_queues]
outputs = []
for queue in self._output_queues:
outputs.append(queue.get())
_log.debug(f"Received data from buffer: {queue.getName()}")
return outputs
5 changes: 3 additions & 2 deletions src/oakutils/nodes/neural_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ def create_neural_network(
The input link to connect to the image manip node or,
if there are multiple input links, an iterable of input links.
Example: cam_rgb.preview or (cam_rgb.preview, stereo.depth)
blob_path : str
blob_path : Path
The path to the blob file to use for the neural network.
Will be converted to a pathlib.Path.
input_names : Optional[Union[str, list[str]]], optional
The names of the input links, by default None
Must be the same length as input_link if a list
Expand Down Expand Up @@ -145,6 +144,7 @@ def create_neural_network(
# connect the input link to the neural network node
if not isinstance(input_link, list):
# handle a single input to the network
_log.debug(f"Linking {input_link.name} to input of neural network")
input_link.link(nn.input)
else:
if input_names is None or reuse_messages is None:
Expand Down Expand Up @@ -177,6 +177,7 @@ def create_neural_network(
with contextlib.suppress(IndexError):
nn.inputs[name].setQueueSize(input_sizes[idx])

_log.debug(f"Neural network created with blob: {blob_path}")
return nn


Expand Down
5 changes: 5 additions & 0 deletions src/oakutils/nodes/xin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
import depthai as dai

_log = logging.getLogger(__name__)


def create_xin(
pipeline: dai.Pipeline,
Expand Down Expand Up @@ -45,4 +48,6 @@ def create_xin(
xin.setStreamName(stream_name)
xin.setMaxDataSize(max_data_size)

_log.debug(f"Created XLinkIn node with stream name: {stream_name}")

return xin
5 changes: 5 additions & 0 deletions src/oakutils/nodes/xout.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
import depthai as dai

_log = logging.getLogger(__name__)


def create_xout(
pipeline: dai.Pipeline,
Expand Down Expand Up @@ -68,4 +71,6 @@ def create_xout(
if input_wait_for_message is not None:
xout.input.setWaitForMessage(input_wait_for_message)

_log.debug(f"Created XLinkOut node with stream name: {stream_name}")

return xout
10 changes: 9 additions & 1 deletion src/oakutils/vpu/_vpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ def _run_thread(
with device_object as device:
# pre-fetch queues and allocate buffers
buffer: MultiBuffer = MultiBuffer(device, self._xin_names, self._xout_names)
_log.debug(f"VPU-Thread: Created MultiBuffer with {len(self._xin_names)} inputs, {len(self._xout_names)} outputs.")
_log.debug(f"VPU-Thread: Input names: {self._xin_names}")
_log.debug(f"VPU-Thread: Output names: {self._xout_names}")

# notify the main thread that VPU is ready
# this will allow the reconfigure call to return
Expand All @@ -404,10 +407,13 @@ def _run_thread(
continue

# push data to networks
_log.debug(f"VPU-Thread: Recevied {len(all_data)} data elements.")
buffer.send(all_data)

# get the results
_log.debug("VPU-Thread: Waiting on results from device.")
self._result_queue.put(buffer.receive())
_log.debug("VPU-Thread: Results forwarded.")

def run(
self: Self,
Expand Down Expand Up @@ -458,7 +464,7 @@ def run(
if not self._thread.is_alive():
err_msg = "VPU thread is not alive."
raise RuntimeError(err_msg)
_log.debug("VPU run called.")
_log.debug(f"VPU run called with data type: {type(data)}")
if not self._multimode:
result = self._run_single(data, safe=safe) # type: ignore[arg-type, assignment]
else:
Expand All @@ -471,6 +477,7 @@ def _run_single(
*,
safe: bool | None = None,
) -> dai.ADatatype | list[dai.ADatatype]:
_log.debug(f"Running single network with data type: {type(data)}")
if safe is None:
safe = True
if safe:
Expand All @@ -497,6 +504,7 @@ def _run_multi(
*,
safe: bool | None = None,
) -> list[dai.ADatatype | list[dai.ADatatype]]:
_log.debug(f"Running multiple networks with data type: {type(data)}")
if safe is None:
safe = True
if safe:
Expand Down

0 comments on commit ab3409d

Please sign in to comment.