Skip to content

Commit

Permalink
abcs for pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sneakers-the-rat committed Nov 5, 2024
1 parent d4e4597 commit 0037966
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 1 deletion.
14 changes: 13 additions & 1 deletion miniscope_io/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@
Data models :)
"""

from miniscope_io.models.models import Container, MiniscopeConfig, MiniscopeIOModel
from miniscope_io.models.models import (
Container,
MiniscopeConfig,
MiniscopeIOModel,
PipelineModel,
)
from miniscope_io.models.pipeline import Node, Pipeline, ProcessingNode, Sink, Source

__all__ = [
"Container",
"MiniscopeConfig",
"MiniscopeIOModel",
"Node",
"Pipeline",
"PipelineModel",
"ProcessingNode",
"Sink",
"Source",
]
11 changes: 11 additions & 0 deletions miniscope_io/models/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""
Base and meta model classes.
All the models here should be empty - these are abstract parent classes
to group child classes in a semantic inheritance hierarchy.
More specific models should either be defined in their own
module within the `miniscope_io.models` package or in their own
package.
"""

from pydantic import BaseModel
Expand Down Expand Up @@ -32,3 +39,7 @@ class Container(MiniscopeIOModel):
See also: :class:`.MiniscopeConfig`
"""


class PipelineModel(MiniscopeIOModel):
"""A model used in a processing pipeline"""
108 changes: 108 additions & 0 deletions miniscope_io/models/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Base ABCs for pipeline classes
"""

from abc import abstractmethod
from typing import ClassVar, Generic, TypeVar, Union

from pydantic import Field

from miniscope_io.models.models import PipelineModel

T = TypeVar("T")
"""
Input Type typevar
"""
U = TypeVar("U")
"""
Output Type typevar
"""


class Node(PipelineModel):
"""A node within a processing pipeline"""


class Source(Node, Generic[U]):
"""A source of data in a processing pipeline"""

output_type: ClassVar[type[U]]
outputs: list[Union["Sink", "ProcessingNode"]] = Field(default_factory=list)

@abstractmethod
def process(self) -> U:
"""
Process some data, returning an output.
.. note::
The `process` method should not directly call or pass
data to subscribed output nodes, but instead return the output
and allow a containing pipeline class to handle dispatching data.
"""


class Sink(Node, Generic[T]):
"""A sink of data in a processing pipeline"""

input_type: ClassVar[type[T]]
inputs: list[Union["Source", "ProcessingNode"]] = Field(default_factory=list)

@abstractmethod
def process(self, data: T) -> None:
"""
Process some incoming data, returning None
.. note::
The `process` method should not directly be called or passed data,
but instead should be called by a containing pipeline class.
"""


class ProcessingNode(Node, Generic[T, U]):
"""
An intermediate processing node that transforms some input to output
"""

input_type: ClassVar[type[T]]
inputs: list[Union["Source", "ProcessingNode"]] = Field(default_factory=list)
output_type: ClassVar[type[U]]
outputs: list[Union["Sink", "ProcessingNode"]] = Field(default_factory=list)

@abstractmethod
def process(self, data: T) -> U:
"""
Process some incoming data, yielding a transformed output
.. note::
The `process` method should not directly call or be called by
output or input nodes, but instead return the output
and allow a containing pipeline class to handle dispatching data.
"""


class Pipeline(PipelineModel):
"""
A graph of nodes transforming some input source(s) to some output sink(s)
"""

sources: list["Source"] = Field(default_factory=list)
processing_nodes: list["ProcessingNode"] = Field(default_factory=list)
sinks: list["Sink"] = Field(default_factory=list)

@abstractmethod
def process(self) -> None:
"""
Process one step of data from each of the sources,
passing intermediate data to any subscribed nodes in a chain.
The `process` method should not return anything except a to-be-implemented
result/status object, as any data intended to be received/processed by
downstream objects should be accessed via a :class:`.Sink` .
"""

0 comments on commit 0037966

Please sign in to comment.