Skip to content

Commit

Permalink
First draft model test rewrite, still missing equivalence tests
Browse files Browse the repository at this point in the history
  • Loading branch information
justincdavis committed Aug 5, 2024
1 parent 345efb8 commit 2b8bde1
Show file tree
Hide file tree
Showing 19 changed files with 330 additions and 510 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ ci = [
]
test = [
"pytest>=6.2.0",
"stdlib_list>=0.10.0",
]
docs = [
"sphinx==6.2.1",
Expand Down
51 changes: 27 additions & 24 deletions src/oakutils/blobs/testing/_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,36 +277,39 @@ def allclose(
image_dims = 4
image_output = len(self._output_shape) == image_dims

# if data is None, use results and auto-convert
if data is None:
if self._results is None:
err_msg = "No data provided and no results available."
raise ValueError(err_msg)
data = self._results

# output shape is (W, H, C, B) for images
# for other data types, will be different
if image_output:
channels = self._output_shape[2]
frame_size: tuple[int, int] = self._output_shape[0:2] # type: ignore[assignment]
convert_func = functools.partial(
get_nn_frame,
channels=channels,
frame_size=frame_size,
normalization=255.0,
)
# output shape is (W, H, C, B) for images
# for other data types, will be different
if image_output:
channels = self._output_shape[2]
frame_size: tuple[int, int] = self._output_shape[0:2] # type: ignore[assignment]
convert_func = functools.partial(
get_nn_frame,
channels=channels,
frame_size=frame_size,
)
else:
convert_func = functools.partial(get_nn_data, use_first_layer=u8_input)
try:
# allow a try-except here since the conversion may fail
# specifically if the models have multiple inputs/outputs
converted_data = [convert_func(d) for d in data] # type: ignore[arg-type]
except (AttributeError, ValueError) as err:
err_msg = f"Automatic data conversion failed for data: {data}"
err_msg += (
" The issue may be caused by models with multiple inputs/outputs."
)
err_msg += " Please report this issue and attempt manual conversion."
raise RuntimeError(err_msg) from err
# if data has been provided, simply use that
else:
convert_func = functools.partial(get_nn_data, use_first_layer=u8_input)
try:
# allow a try-except here since the conversion may fail
# specifically if the models have multiple inputs/outputs
converted_data = [convert_func(d) for d in data] # type: ignore[arg-type]
except (AttributeError, ValueError) as err:
err_msg = f"Automatic data conversion failed for data: {data}"
err_msg += (
" The issue may be caused by models with multiple inputs/outputs."
)
err_msg += " Please report this issue and attempt manual conversion."
raise RuntimeError(err_msg) from err
converted_data = data

compare_data = []
for idx1, idx2 in itertools.combinations(range(len(converted_data)), 2):
compare_data.append(
Expand Down
5 changes: 0 additions & 5 deletions tests/blobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,3 @@
#
# MIT License
from __future__ import annotations

from . import models
from .test_install import test_model_paths_valid, test_model_shave_dirs_equal, test_model_shave_dirs_equivalent

__all__ = ["models", "test_model_paths_valid", "test_model_shave_dirs_equal", "test_model_shave_dirs_equivalent"]
41 changes: 0 additions & 41 deletions tests/blobs/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,3 @@
#
# MIT License
from __future__ import annotations

from .test_gaussian import test_gaussian_3x3_1_shave, test_gaussian_3x3_1_shave_gray, test_gaussian_15x15_6_shave, test_gaussian_15x15_6_shave_gray
from .test_gftt import test_gftt_3x3_1_shave, test_gftt_3x3_1_shave_blur, test_gftt_3x3_1_shave_blur_gray, test_gftt_3x3_1_shave_gray
from .test_harris import test_harris_3x3_1_shave, test_harris_3x3_1_shave_blur, test_harris_3x3_1_shave_blur_gray, test_harris_3x3_1_shave_gray
from .test_hessian import test_hessian_3x3_1_shave, test_hessian_3x3_1_shave_blur, test_hessian_3x3_1_shave_blur_gray, test_hessian_3x3_1_shave_gray
from .test_laplacian import test_laplacian_3x3_1_shave, test_laplacian_3x3_1_shave_blur, test_laplacian_3x3_1_shave_blur_gray, test_laplacian_3x3_1_shave_gray
from .test_pointcloud import test_pointcloud_1_shave, test_pointcloud_2_shave, test_pointcloud_3_shave, test_pointcloud_4_shave, test_pointcloud_5_shave, test_pointcloud_6_shave
from .test_sobel import test_sobel_1_shave, test_sobel_1_shave_3x3_blur, test_sobel_1_shave_3x3_blur_gray, test_sobel_1_shave_gray

__all__ = [
"test_gaussian_3x3_1_shave",
"test_gaussian_3x3_1_shave_gray",
"test_gaussian_15x15_6_shave",
"test_gaussian_15x15_6_shave_gray",
"test_gftt_3x3_1_shave",
"test_gftt_3x3_1_shave_blur",
"test_gftt_3x3_1_shave_blur_gray",
"test_gftt_3x3_1_shave_gray",
"test_harris_3x3_1_shave",
"test_harris_3x3_1_shave_blur",
"test_harris_3x3_1_shave_blur_gray",
"test_harris_3x3_1_shave_gray",
"test_hessian_3x3_1_shave",
"test_hessian_3x3_1_shave_blur",
"test_hessian_3x3_1_shave_blur_gray",
"test_hessian_3x3_1_shave_gray",
"test_laplacian_3x3_1_shave",
"test_laplacian_3x3_1_shave_blur",
"test_laplacian_3x3_1_shave_blur_gray",
"test_laplacian_3x3_1_shave_gray",
"test_pointcloud_1_shave",
"test_pointcloud_2_shave",
"test_pointcloud_3_shave",
"test_pointcloud_4_shave",
"test_pointcloud_5_shave",
"test_pointcloud_6_shave",
"test_sobel_1_shave",
"test_sobel_1_shave_3x3_blur",
"test_sobel_1_shave_3x3_blur_gray",
"test_sobel_1_shave_gray",
]
48 changes: 48 additions & 0 deletions tests/blobs/models/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
from __future__ import annotations

from collections.abc import Callable
from functools import partial

from oakutils.nodes import get_nn_frame

from .load import create_model, run_model


def create_model_ghhs(createmodelfunc: Callable):
for use_blur in [True, False]:
for ks in [3, 5, 7, 9, 11, 13, 15]:
for shave in [1, 2, 3, 4, 5, 6]:
for use_gs in [True, False]:
modelfunc = partial(
createmodelfunc,
blur_kernel_size=ks,
shaves=shave,
use_blur=use_blur,
grayscale_out=use_gs,
)
assert create_model(modelfunc) == 0, f"Failed for {ks}, {shave}, {use_blur}, {use_gs}"
return 0


def run_model_ghhs(createmodelfunc: Callable):
for use_blur in [True, False]:
for ks in [3, 5, 7, 9, 11, 13, 15]:
for shave in [1, 2, 3, 4, 5, 6]:
for use_gs in [True, False]:
modelfunc = partial(
createmodelfunc,
blur_kernel_size=ks,
shaves=shave,
use_blur=use_blur,
grayscale_out=use_gs,
)
channels = 1 if use_gs else 3
decodefunc = partial(
get_nn_frame,
channels=channels,
)
assert run_model(modelfunc, decodefunc) == 0, f"Failed for {ks}, {shave}, {use_blur}, {use_gs}"
return 0
59 changes: 59 additions & 0 deletions tests/blobs/models/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
from __future__ import annotations

from collections.abc import Callable

import depthai as dai
from oakutils.nodes import create_color_camera, create_xout

from ...device import get_device_count


def create_model(modelfunc: Callable) -> int:
pipeline = dai.Pipeline()
cam = create_color_camera(pipeline)
model = modelfunc(pipeline, cam.preview)
xout_model = create_xout(pipeline, model.out, "model_out")

all_nodes = [
cam,
model,
xout_model,
]
assert len(all_nodes) == 3
for node in all_nodes:
assert node is not None

return 0


def run_model(modelfunc: Callable, decodefunc: Callable) -> int:
pipeline = dai.Pipeline()
cam = create_color_camera(pipeline)
model = modelfunc(pipeline, cam.preview)
xout_model = create_xout(pipeline, model.out, "model_out")

all_nodes = [
cam,
model,
xout_model,
]
assert len(all_nodes) == 3
for node in all_nodes:
assert node is not None

if get_device_count() == 0:
return 0

with dai.Device(pipeline) as device:
queue: dai.DataOutputQueue = device.getOutputQueue("model_out")

while True:
data = queue.get()
frame = decodefunc(data)
assert frame is not None
break

return 0
78 changes: 31 additions & 47 deletions tests/blobs/models/test_gaussian.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,42 @@
# MIT License
from __future__ import annotations

import time
from functools import partial

import depthai as dai

from oakutils.nodes import create_color_camera, create_xout, get_nn_bgr_frame, get_nn_gray_frame
from oakutils.nodes import get_nn_frame
from oakutils.nodes.models import create_gaussian

from .utils import eval_model
from ...helpers import check_device, TIME_TO_RUN


def check_gaussian(kernel_size: int, shaves: int, grayscale_out: bool):
"""Test the gaussian node"""
pipeline = dai.Pipeline()

cam = create_color_camera(pipeline, preview_size=(640, 480))
lp = create_gaussian(
pipeline,
cam.preview,
kernel_size=kernel_size,
shaves=shaves,
grayscale_out=grayscale_out,
)
_ = create_xout(pipeline, lp.out, "gaussian")
from .load import create_model, run_model

if len(dai.Device.getAllAvailableDevices()) == 0:
return 0 # no device found
with dai.Device(pipeline) as device:
l_queue: dai.DataOutputQueue = device.getOutputQueue("gaussian")

t0 = time.perf_counter()
while True:
l_data = l_queue.get()
if not grayscale_out:
l_frame = get_nn_bgr_frame(l_data, frame_size=(640, 480), normalization=255.0)
else:
l_frame = get_nn_gray_frame(l_data, frame_size=(640, 480), normalization=255.0)
if time.perf_counter() - t0 > TIME_TO_RUN:
break
def test_create():
for ks in [3, 5, 7, 9, 11, 13, 15]:
for shave in [1, 2, 3, 4, 5, 6]:
for use_gs in [True, False]:
modelfunc = partial(
create_gaussian,
kernel_size=ks,
shaves=shave,
grayscale_out=use_gs,
)
assert create_model(modelfunc) == 0, f"Failed for {ks}, {shave}, {use_gs}"
return 0

def test_gaussian_3x3_1_shave():
check_device(lambda: check_gaussian(3, 1, False), TIME_TO_RUN)

def test_gaussian_3x3_1_shave_gray():
check_device(lambda: check_gaussian(3, 1, True), TIME_TO_RUN)

def test_gaussian_15x15_6_shave():
check_device(lambda: check_gaussian(15, 6, False), TIME_TO_RUN)

def test_gaussian_15x15_6_shave_gray():
check_device(lambda: check_gaussian(15, 6, True), TIME_TO_RUN)

def test_results():
eval_model("gaussian", (640, 480, 3))
def test_run():
for ks in [3, 5, 7, 9, 11, 13, 15]:
for shave in [1, 2, 3, 4, 5, 6]:
for use_gs in [True, False]:
modelfunc = partial(
create_gaussian,
kernel_size=ks,
shaves=shave,
grayscale_out=use_gs,
)
channels = 1 if use_gs else 3
decodefunc = partial(
get_nn_frame,
channels=channels,
)
assert run_model(modelfunc, decodefunc) == 0, f"Failed for {ks}, {shave}, {use_gs}"
return 0
55 changes: 5 additions & 50 deletions tests/blobs/models/test_gftt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,14 @@
# MIT License
from __future__ import annotations

import time

import depthai as dai

from oakutils.nodes import create_color_camera, create_xout, get_nn_bgr_frame, get_nn_gray_frame
from oakutils.nodes.models import create_gftt

from .utils import eval_model
from ...helpers import check_device, TIME_TO_RUN


def check_gftt(kernel_size: int, shaves: int, use_blur: bool, grayscale_out: bool):
"""Test the gftt node"""
pipeline = dai.Pipeline()

cam = create_color_camera(pipeline, fps=15, preview_size=(640, 480))
lp = create_gftt(
pipeline,
cam.preview,
blur_kernel_size=kernel_size,
shaves=shaves,
use_blur=use_blur,
grayscale_out=grayscale_out,
)
_ = create_xout(pipeline, lp.out, "gftt")

if len(dai.Device.getAllAvailableDevices()) == 0:
return 0 # no device found
with dai.Device(pipeline) as device:
l_queue: dai.DataOutputQueue = device.getOutputQueue("gftt")

t0 = time.perf_counter()
while True:
l_data = l_queue.get()
if grayscale_out:
l_frame = get_nn_gray_frame(l_data, frame_size=(640, 480), normalization=255.0)
else:
l_frame = get_nn_bgr_frame(l_data, frame_size=(640, 480), normalization=255.0)
if time.perf_counter() - t0 > TIME_TO_RUN:
break
return 0

def test_gftt_3x3_1_shave():
check_device(lambda: check_gftt(3, 1, False, False), TIME_TO_RUN)
from .basic import create_model_ghhs, run_model_ghhs

def test_gftt_3x3_1_shave_gray():
check_device(lambda: check_gftt(3, 1, False, True), TIME_TO_RUN)

def test_gftt_3x3_1_shave_blur():
check_device(lambda: check_gftt(3, 1, True, False), TIME_TO_RUN)
def test_create() -> int:
return create_model_ghhs(create_gftt)

def test_gftt_3x3_1_shave_blur_gray():
check_device(lambda: check_gftt(3, 1, True, True), TIME_TO_RUN)

def test_results():
eval_model("gftt", (640, 480, 3))
def test_run():
return run_model_ghhs(create_gftt)
Loading

0 comments on commit 2b8bde1

Please sign in to comment.