From 1b25a51a51d79159bc90d7f6eecacf349f2c43a9 Mon Sep 17 00:00:00 2001
From: jingpengw
Date: Wed, 18 Jan 2023 16:20:09 -0500
Subject: [PATCH 1/2] initial implementation: a single initiator works, but
 only the last one works if they are chained together.

---
 chunkflow/flow/flow.py                | 239 +++++++++++++++++---
 chunkflow/lib/cartesian_coordinate.py |  85 ++-------
 chunkflow/lib/flow.py                 |  46 +++--
 3 files changed, 200 insertions(+), 170 deletions(-)

diff --git a/chunkflow/flow/flow.py b/chunkflow/flow/flow.py
index dda43564..3e428d1a 100755
--- a/chunkflow/flow/flow.py
+++ b/chunkflow/flow/flow.py
@@ -6,7 +6,7 @@
 from typing import Generator, List
 from copy import deepcopy
-
+import h5py
 import numpy as np
 import click
 import json
 
@@ -15,6 +15,7 @@
 import zarr
 import tinybrain
 
+#from chunkflow.lib.flow import main, operator, generator, initiator
 from chunkflow.lib.flow import *
 from cloudvolume import CloudVolume
 from cloudvolume.lib import Vec
@@ -59,89 +60,157 @@
 @click.option('--size', '-z',
               type=click.INT, default=None, nargs=3, callback=default_none,
               help='volume size or dimension.')
-@generator
-def create_bbox(start: tuple, stop: tuple, size: tuple):
+@click.option('--name', '-n', type=str, default='bbox',
+              help='the variable name of this bounding box. Normally "bbox" or "volume_bbox".')
+@initiator
+def create_bbox(start: tuple, stop: tuple, size: tuple, name: str):
     assert stop is not None or size is not None
     if stop is None:
         stop = Cartesian.from_collection(start) + Cartesian.from_collection(size)
     bbox = BoundingBox(start, stop)
-    task = get_initial_task()
-    task['bbox'] = bbox
-    yield task
+    # task = next(tasks)
+    state[name] = bbox
 
-@main.command('generate-tasks')
-@click.option('--layer-path', '-l',
-              type=str, default=None,
-              help='dataset layer path to fetch dataset information.')
-@click.option('--mip', '-m',
-              type=click.INT, default=None, help='mip level of the dataset layer.')
-@click.option('--roi-start', '-s',
-              type=click.INT, default=None, nargs=3, callback=default_none,
-              help='(z y x), start of the chunks')
-@click.option('--roi-stop', '-r',
-              type=click.INT, nargs=3, default=None, callback=default_none,
-              help='stop coordinate of region of interest')
-@click.option('--roi-size', '-z',
-              type=click.INT, nargs=3, default=None, callback=default_none,
-              help='size of region of interest')
+@main.command('create-bbox-from-string')
+@click.option('--bbox_string', '-b', type=str, default=None,
+              help='the string representation of a bounding box')
+@click.option('--name', '-n', type=str, default='bbox',
+              help='bounding box name. Normally "bbox" or "volume_bbox".')
+@initiator
+def create_bbox_from_string(bbox_string: str, name: str):
+    bbox = BoundingBox.from_string(bbox_string)
+    state[name] = bbox
+
+@main.command('create-bbox-from-h5')
+@click.option('--file-path', '-f', required=True, type=click.Path(
+    exists=True, dir_okay=False, readable=True, resolve_path=True),
+    help='HDF5 file path.'
+)
+@click.option('--volume/--task', default=False,
+              help='whether the bounding box is for the whole volume or a single task.')
+@initiator
+def create_bbox_from_h5(file_path: str, volume: bool):
+    roi_start = None
+    with h5py.File(file_path, mode='r') as file:
+        for key in file.keys():
+            if 'voxel_offset' in key:
+                roi_start = Cartesian(*(file[key]))
+            elif 'voxel_size' not in key:
+                roi_size = Cartesian(*file[key].shape[-3:])
+    if roi_start is None:
+        roi_start = Cartesian(0, 0, 0)
+
+    if volume:
+        key_name = 'volume_bbox'
+    else:
+        key_name = 'bbox'
+
+    state[key_name] = BoundingBox.from_delta(roi_start, roi_size)
+
+
+@main.command('create-bbox-from-zarr')
+@click.option('--path', '-p', type=str, required=True,
+              help='path of Zarr volume.')
+@click.option('--name', '-n', type=str, default='volume_bbox',
+              help='bounding box name. Normally "bbox" or "volume_bbox".')
+@initiator
+def create_volume_bbox_from_zarr(path: str, name: str):
+    """Create a bounding box from a zarr volume."""
+    z = zarr.open(path, mode='r')
+    meta = z.attrs.asdict()
+    bbox = BoundingBox.from_delta(meta['offset'], z.shape)
+    print('bounding box: ', bbox)
+    state[name] = bbox
+    print(f'state after zarr: {state}')
+    yield state
+
+@main.command('create-bbox-from-precomputed')
+@click.option('--path', '-p', type=str, required=True,
+              help='path of precomputed volume')
+@click.option('--mip', '-m', type=click.INT, default=0,
+              help='MIP level. The highest resolution mip level is 0.')
+@click.option('--use-https/--use-credential', default=False,
+              help='if we read from a public dataset in cloud storage, it is required to use https.')
+@click.option('--name', '-n', type=str, default='volume_bbox',
+              help='name of volume bounding box.')
+@initiator
+def create_bbox_from_precomputed(path: str, mip: int, use_https: bool, name: str):
+    """Create volume bounding box from a volume stored in precomputed format."""
+    vol = CloudVolume(path, mip=mip, use_https=use_https)
+    # dataset shape as z,y,x
+    dataset_size = vol.mip_shape(mip)[:3][::-1]
+    dataset_offset = vol.mip_voxel_offset(mip)[::-1]
+    dataset_size = Cartesian.from_collection(dataset_size)
+    dataset_offset = Cartesian.from_collection(dataset_offset)
+    volume_bbox = BoundingBox.from_delta(dataset_offset, dataset_size)
+    state[name] = volume_bbox
+
+
+@main.command('divide-volume')
 @click.option('--chunk-size', '-c',
               type=click.INT, default=None, nargs=3,
               help='(z y x), size/shape of chunks')
-@click.option('--bounding-box', '-b', type=str, default=None,
-              help='the string representation of a bounding box')
-@click.option('--grid-size', '-g',
-              type=click.INT, default=None, nargs=3, callback=default_none,
-              help='(z y x), grid size of output blocks')
-@click.option('--file-path', '-f', default = None,
-              type=click.Path(writable=True, dir_okay=False, resolve_path=True),
-              help='output tasks as an numpy array formated as npy.')
-@click.option('--queue-name', '-q',
-              type=str, default=None, help='sqs queue name')
 @click.option('--respect-chunk-size/--respect-stop', default=True,
               help="""for the last bounding box, \
make the chunk size consistent or cut off at the stopping boundary.""")
 @click.option('--aligned-block-size', '-a',
               type=click.INT, default=None, nargs=3, callback=default_none,
               help='force alignment of block size. Note that the alignment start from (0, 0, 0).')
+@click.option('--bounded/--not-bounded', default=False,
+              help='whether the bounding boxes must stay inside the volume or may cross the volume boundary.')
+@initiator
+def divide_volume(chunk_size: tuple, respect_chunk_size: bool,
+                  aligned_block_size: tuple, bounded: bool):
+    breakpoint()
+    volume_bbox = state['volume_bbox']
+    bboxes = BoundingBoxes.from_volume_bbox(
+        volume_bbox, chunk_size,
+        respect_chunk_size=respect_chunk_size,
+        aligned_block_size=aligned_block_size,
+        bounded=bounded,
+    )
+    state['bboxes'] = bboxes
+    print(f'state after divide volume: {state}')
+    yield state
+
+
+@main.command('save-tasks')
+@click.option('--file-path', '-f', required=True,
+              type=click.Path(writable=True, dir_okay=False, resolve_path=True),
+              help='output tasks as a numpy array formatted as npy.')
+@initiator
+def save_tasks(file_path: str):
+    bboxes = state['bboxes']
+    # write out as a file
+    # this could be used for iteration in slurm cluster.
+    if not file_path.endswith('.npy'):
+        file_path += str(len(bboxes)) + '.npy'
+    bboxes.to_file(file_path)
+
+
+@main.command('ingest-tasks-to-sqs-queue')
+@click.option('--queue-name', '-q',
+              type=str, default=None, help='sqs queue name')
+@initiator
+def ingest_tasks_to_sqs_queue(queue_name: str):
+    """Ingest tasks into an AWS SQS queue."""
+    queue = SQSQueue(queue_name)
+    bboxes = state['bboxes']
+    queue.send_message_list(bboxes)
+
+
+@main.command('generate-tasks')
 @click.option('--task-index-start', '-i',
               type=click.INT, default=0, help='starting index of task list.')
 @click.option('--task-index-stop', '-p',
               type=click.INT, default=None, help='stop index of task list.')
 @click.option('--disbatch/--no-disbatch', '-d',
               default=False, help='use disBatch environment variable or not')
-@click.option('--use-https/--use-credential', default=False,
-              help='if we read from a public dataset in cloud storage, it is required to use https.')
 @generator
-def generate_tasks(
-        layer_path: str, mip: int, roi_start: tuple, roi_stop: tuple,
-        roi_size: tuple, chunk_size: tuple, bounding_box:str,
-        grid_size: tuple, file_path: str, queue_name: str,
-        respect_chunk_size: bool, aligned_block_size: tuple,
-        task_index_start: tuple, task_index_stop: tuple,
-        disbatch: bool, use_https: bool):
+def generate_tasks(task_index_start: int, task_index_stop: int,
+                   disbatch: bool):
     """Generate a batch of tasks."""
-    if mip is None:
-        mip = state['mip']
-    assert mip >=0
-
-    if bounding_box is not None:
-        bboxes = [BoundingBox.from_string(bounding_box)]
-        if chunk_size is None:
-            chunk_size = bboxes[0].shape
-        else:
-            assert chunk_size == bboxes[0].shape
-    else:
-        bboxes = BoundingBoxes.from_manual_setup(
-            chunk_size, layer_path=layer_path,
-            roi_start=roi_start, roi_stop=roi_stop,
-            roi_size=roi_size, mip=mip, grid_size=grid_size,
-            respect_chunk_size=respect_chunk_size,
-            aligned_block_size=aligned_block_size,
-            use_https=use_https
-        )
-    print(f'number of all the candidate tasks: {len(bboxes)}')
-
+    bboxes = state['bboxes']
     if task_index_start:
         if task_index_stop is None:
             # task_index_stop = task_index_start + 1
@@ -155,35 +224,37 @@ def generate_tasks(
             bboxes = [bboxes[disbatch_index],]
         logging.info(f'selected a task with disBatch index {disbatch_index}')
 
-    # write out as a file
-    # this could be used for iteration in slurm cluster.
-    if file_path:
-        if not file_path.endswith('.npy'):
-            file_path += len(bboxes) + '.npy'
-        bboxes.to_file(file_path)
-
+    # if state['verbose']:
     bbox_num = len(bboxes)
     logging.info(f'total number of tasks: {bbox_num}')
     print(f'total number of tasks: {bbox_num}')
 
-    if queue_name is not None:
-        queue = SQSQueue(queue_name)
-        queue.send_message_list(bboxes)
-    else:
-        for bbox_index, bbox in enumerate(bboxes):
-            if disbatch:
-                assert len(bboxes) == 1
-                bbox_index = disbatch_index
-            print(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
-            logging.info(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
-            task = get_initial_task()
-            task['bbox'] = bbox
-            task['bbox_index'] = bbox_index
-            task['bbox_num'] = bbox_num
-            task['log']['bbox'] = bbox.to_filename()
-            yield task
+    for bbox_index, bbox in enumerate(bboxes):
+        if disbatch:
+            assert len(bboxes) == 1
+            bbox_index = disbatch_index
+        print(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
+        logging.info(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
+        task = get_initial_task()
+        task['bbox'] = bbox
+        task['bbox_index'] = bbox_index
+        task['bbox_num'] = bbox_num
+        task['log']['bbox'] = bbox.to_filename()
+        yield task
+
+@main.command('generate-task')
+@generator
+def generate_task():
+    if 'bbox' in state:
+        bbox = state['bbox']
+    elif 'volume_bbox' in state:
+        bbox = state['volume_bbox']
+    else:
+        raise ValueError(f'no bounding box in state: {state}')
+    task = get_initial_task()
+    task['bbox'] = bbox
+    yield task
 
 @main.command('adjust-bbox')
 @click.option('--corner-offset', '-c', type=click.INT, nargs=6, default=None,
diff --git a/chunkflow/lib/cartesian_coordinate.py b/chunkflow/lib/cartesian_coordinate.py
index 664f26b8..3a8d73b0 100644
--- a/chunkflow/lib/cartesian_coordinate.py
+++ b/chunkflow/lib/cartesian_coordinate.py
@@ -349,82 +349,22 @@ def __init__(self, bboxes: list) -> None:
         self.data = bboxes
 
     @classmethod
-    def from_manual_setup(cls,
-            chunk_size:Union[Vec, tuple],
-            chunk_overlap: Union[Vec, tuple, Cartesian]=Cartesian(0,0,0),
-            roi_start: Union[Vec, tuple, Cartesian]=None,
-            roi_stop: Union[Vec, tuple, Cartesian]=None,
-            roi_size: Union[Vec, tuple, Cartesian]=None,
-            grid_size: Union[Vec, tuple, Cartesian]=None,
+    def from_volume_bbox(cls,
+            volume_bbox: BoundingBox,
+            chunk_size: Union[Cartesian, Vec, tuple, list],
+            chunk_overlap: Union[Cartesian, Vec, tuple, list] = Cartesian(0, 0, 0),
             respect_chunk_size: bool = True,
             aligned_block_size: Union[Vec, tuple, Cartesian]=None,
             bounded: bool = False,
-            layer_path: str = None,
-            mip: int = 0,
-            use_https: bool = False,
-        ):
-
-        if not layer_path:
-            if grid_size is None and roi_size is None and roi_stop is None:
-                grid_size = Cartesian(1, 1, 1)
-
-            if roi_start is None:
-                roi_start = Cartesian(0, 0, 0)
-            elif not isinstance(roi_start, Cartesian):
-                roi_start = Cartesian.from_collection(roi_start)
-            if roi_size is None and chunk_size is not None:
-                roi_size = Cartesian.from_collection(chunk_size)
-            roi_stop = roi_start + roi_size
-        else:
-            if layer_path.endswith('.h5'):
-                assert os.path.exists(layer_path)
-                with h5py.File(layer_path, mode='r') as file:
-                    for key in file.keys():
-                        if 'offset' in key:
-                            roi_start = Cartesian(*(file[key]))
-                        elif 'voxel_size' not in key:
-                            if roi_size is None:
-                                roi_size = Cartesian(*file[key].shape[-3:])
-                    if roi_start is None:
-                        roi_start = Cartesian(0, 0, 0)
-                    if roi_size is None and chunk_size is not None:
-                        roi_size = Cartesian.from_collection(chunk_size)
-
-                    roi_stop = roi_start + roi_size
-            else:
-                vol = CloudVolume(layer_path, mip=mip, use_https=use_https)
-                # dataset shape as z,y,x
-                dataset_size = vol.mip_shape(mip)[:3][::-1]
-                dataset_offset = vol.mip_voxel_offset(mip)[::-1]
-                dataset_size = Cartesian.from_collection(dataset_size)
-                dataset_offset = Cartesian.from_collection(dataset_offset)
-
-                if roi_size is None:
-                    roi_size = dataset_size
-                if roi_stop is None:
-                    roi_stop = dataset_offset + dataset_size
-                if roi_start is None:
-                    # note that we normally start from -overlap to keep the chunks aligned!
-                    roi_start = dataset_offset - chunk_overlap
-            assert roi_start is not None
-
-        if not isinstance(chunk_size, Cartesian):
-            chunk_size = Cartesian(*chunk_size)
+        ):
         if not isinstance(chunk_overlap, Cartesian):
             chunk_overlap = Cartesian(*chunk_overlap)
-        if not isinstance(roi_start, Cartesian):
-            assert len(roi_start) == 3
-            roi_start = Cartesian(*roi_start)
-        if not isinstance(roi_size, Cartesian):
-            roi_size = Cartesian(*roi_size)
-        if grid_size is not None and not isinstance(grid_size, Cartesian):
-            grid_size = Cartesian(*grid_size)
-        if not isinstance(roi_stop, Cartesian):
-            roi_stop = Cartesian(*roi_stop)
+
+        roi_start = volume_bbox.start
+        roi_size = volume_bbox.shape
 
         stride = chunk_size - chunk_overlap
-        if roi_stop is None:
-            roi_stop = roi_start + stride*grid_size + chunk_overlap
+        # roi_stop = roi_start + stride*grid_size + chunk_overlap
 
         if aligned_block_size is not None:
             if not isinstance(aligned_block_size, Vec):
@@ -439,13 +379,10 @@ def from_manual_setup(cls,
             if roi_stop[idx] % aligned_block_size[idx] > 0:
                 roi_stop_temp[idx] += aligned_block_size[idx] - roi_stop[idx] % aligned_block_size[idx]
             roi_stop = Cartesian.from_collection(roi_stop_temp)
-
-        if roi_size is None:
             roi_size = roi_stop - roi_start
 
-        if grid_size is None:
-            grid_size = (roi_size - chunk_overlap) / stride
-            grid_size = grid_size.ceil
+        grid_size = (roi_size - chunk_overlap) / stride
+        grid_size = grid_size.ceil
         # grid_size = Cartesian.from_collection([ceil(x) for x in grid_size])
 
         # the stride should not be zero if there is more than one chunks
diff --git a/chunkflow/lib/flow.py b/chunkflow/lib/flow.py
index 3e69eb9e..01c6098e 100644
--- a/chunkflow/lib/flow.py
+++ b/chunkflow/lib/flow.py
@@ -1,6 +1,8 @@
 import sys
-from typing import Union
+from typing import Union, Callable
 import logging
+logging.getLogger().setLevel(logging.INFO)
+
 from functools import update_wrapper, wraps
 
 import click
@@ -43,8 +45,8 @@ def default_none(ctx, _, value):
 
 @click.group(chain=True)
 @click.option('--log-level', '-l',
               type=click.Choice(['debug', 'info', 'warning', 'error', 'critical']),
-              default='info',
-              help='print informations level. default is level 1.')
+              default='debug',
+              help='print information level. default is debug.')
 @click.option('--log-file', '-f',
               type=click.Path(exists=False), default=None,
               help='log file path.')
@@ -65,6 +67,7 @@ def main(log_level, log_file, mip, dry_run, verbose):
         'critical' : logging.CRITICAL
     }
     log_level = str2level[log_level]
+    logging.getLogger().setLevel(log_level)
 
     console = logging.StreamHandler(sys.stdout)
     console.setLevel(log_level)
@@ -73,11 +76,11 @@ def main(log_level, log_file, mip, dry_run, verbose):
     logging.getLogger().addHandler(console)
 
     if log_file is not None and len(log_file)>0:
-        fileHandler = logging.FileHandler(filename=log_file)
-        fileHandler.setLevel(log_level)
+        file_handler = logging.FileHandler(filename=log_file)
+        file_handler.setLevel(log_level)
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-        fileHandler.setFormatter(formatter)
-        logging.getLogger().addHandler(fileHandler)
+        file_handler.setFormatter(formatter)
+        logging.getLogger().addHandler(file_handler)
 
     state['mip'] = mip
     state['dry_run'] = dry_run
@@ -102,11 +105,13 @@ def process_commands(operators, log_level, log_file, mip, dry_run, verbose):
     # task = next(stream)
 
     # Evaluate the stream and throw away the items.
-    for _ in stream:
+    for item in stream:
+        print(f'item in stream: {item}')
+        breakpoint()
         pass
 
 
-def operator(func):
+def operator(func: Callable):
     """
     Help decorator to rewrite a function so that it returns another
     function from it.
@@ -120,13 +125,30 @@ def operator(stream):
     return wrapper
 
 
-def generator(func):
+def generator(func: Callable):
     """Similar to the :func:`operator` but passes through old values
     unchanged and does not pass through the values as parameter.
     """
     @operator
     def new_func(stream, *args, **kwargs):
-        for item in func(*args, **kwargs):
-            yield item
+        for task in func(*args, **kwargs):
+            yield task
+
+    return update_wrapper(new_func, func)
 
+def initiator(func: Callable):
+    """Set up some basic parameters for the task.
+    Note that a pipeline should be composed of initiator --> generator --> operator, in that order.
+    """
+
+    # @wraps(func)
+    # def wrapper(*args, **kwargs):
+    #     def initiator(stream):
+    #         return func(stream, *args, **kwargs)
+    #     return initiator
+    # return wrapper
+
+    @operator
+    def new_func(stream, *args, **kwargs):
+        yield func(*args, **kwargs)
     return update_wrapper(new_func, func)
\ No newline at end of file

From 021001e2cbdfd177e09ecfadb01cc68c457a73b3 Mon Sep 17 00:00:00 2001
From: jingpengw
Date: Thu, 19 Jan 2023 14:21:57 -0500
Subject: [PATCH 2/2] update

---
 chunkflow/flow/flow.py | 10 ++++---
 chunkflow/lib/flow.py  | 68 ++++++++++++++++++++++++++++++++++--------
 requirements.txt       |  3 +-
 3 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/chunkflow/flow/flow.py b/chunkflow/flow/flow.py
index 3e428d1a..1d731ea5 100755
--- a/chunkflow/flow/flow.py
+++ b/chunkflow/flow/flow.py
@@ -108,7 +108,7 @@ def create_bbox_from_h5(file_path: str, volume: bool):
     state[key_name] = BoundingBox.from_delta(roi_start, roi_size)
 
 
-@main.command('create-bbox-from-zarr')
+@main.command('create-bbox-from-zarr', group='Initiator')
 @click.option('--path', '-p', type=str, required=True,
               help='path of Zarr volume.')
 @click.option('--name', '-n', type=str, default='volume_bbox',
@@ -122,7 +122,8 @@ def create_volume_bbox_from_zarr(path: str, name: str):
     print('bounding box: ', bbox)
     state[name] = bbox
     print(f'state after zarr: {state}')
-    yield state
+    breakpoint()
+    yield None
 
 @main.command('create-bbox-from-precomputed')
 @click.option('--path', '-p', type=str, required=True,
@@ -146,7 +147,7 @@ def create_bbox_from_precomputed(path: str, mip: int, use_https: bool, name: str
     state[name] = volume_bbox
 
 
-@main.command('divide-volume')
+@main.command('divide-volume', group='Initiator')
 @click.option('--chunk-size', '-c',
               type=click.INT, default=None, nargs=3,
               help='(z y x), size/shape of chunks')
@@ -161,6 +162,7 @@ def create_bbox_from_precomputed(path: str, mip: int, use_https: bool, name: str
 @initiator
 def divide_volume(chunk_size: tuple, respect_chunk_size: bool,
                   aligned_block_size: tuple, bounded: bool):
+    """Decompose the volume to a grid of chunks with/without overlap."""
     breakpoint()
     volume_bbox = state['volume_bbox']
     bboxes = BoundingBoxes.from_volume_bbox(
@@ -171,7 +173,7 @@ def divide_volume(chunk_size: tuple, respect_chunk_size: bool,
     )
     state['bboxes'] = bboxes
     print(f'state after divide volume: {state}')
-    yield state
+    yield None
 
 
 @main.command('save-tasks')
 @click.option('--file-path', '-f', required=True,
diff --git a/chunkflow/lib/flow.py b/chunkflow/lib/flow.py
index 01c6098e..d3837084 100644
--- a/chunkflow/lib/flow.py
+++ b/chunkflow/lib/flow.py
@@ -1,5 +1,5 @@
 import sys
-from typing import Union, Callable
+from typing import Union, Callable, Iterable
 import logging
 logging.getLogger().setLevel(logging.INFO)
 
@@ -14,7 +14,7 @@ class CartesianParamType(click.ParamType):
 
     def convert(self, value: Union[list, tuple], param, ctx):
         assert len(value) == 3
-        return Cartesian.from_collection(value)
+        return Cartesian.from_collection(value)
 
 CartesianParam = CartesianParamType()
 
@@ -40,9 +40,45 @@ def default_none(ctx, _, value):
     return value
 
 
+# https://stackoverflow.com/questions/58745652/how-can-command-list-display-be-categorised-within-a-click-chained-group
+class GroupedGroup(click.Group):
+    def command(self, *args, **kwargs):
+        """Gather the command help groups"""
+        help_group = kwargs.pop('group', None)
+        decorator = super(GroupedGroup, self).command(*args, **kwargs)
+
+        def wrapper(f):
+            cmd = decorator(f)
+            cmd.help_group = help_group
+            return cmd
+
+        return wrapper
+
+    def format_commands(self, ctx, formatter):
+        # Modified from the base class method.
+        commands = []
+        for subcommand in self.list_commands(ctx):
+            cmd = self.get_command(ctx, subcommand)
+            if not (cmd is None or cmd.hidden):
+                commands.append((subcommand, cmd))
+
+        if commands:
+            longest = max(len(cmd[0]) for cmd in commands)
+            # allow for 3 times the default spacing
+            limit = formatter.width - 6 - longest
+
+            groups = {}
+            for subcommand, cmd in commands:
+                help_str = cmd.get_short_help_str(limit)
+                subcommand += ' ' * (longest - len(subcommand))
+                groups.setdefault(
+                    cmd.help_group, []).append((subcommand, help_str))
+
+            # write each help group as its own section
+            with formatter.section('Commands'):
+                for group_name, rows in groups.items():
+                    formatter.write_text(f'{group_name or "Other"}:')
+                    formatter.write_dl(rows)
+
+
 # the code design is based on:
-# https://github.com/pallets/click/blob/master/examples/imagepipe/imagepipe.py
-@click.group(chain=True)
+# https://github.com/pallets/click/blob/main/examples/imagepipe/imagepipe.py
+@click.group(name='chunkflow', chain=True, cls=GroupedGroup)
 @click.option('--log-level', '-l',
               type=click.Choice(['debug', 'info', 'warning', 'error', 'critical']),
               default='debug',
@@ -90,7 +126,7 @@ def main(log_level, log_file, mip, dry_run, verbose):
 
 
 @main.result_callback()
-def process_commands(operators, log_level, log_file, mip, dry_run, verbose):
+def process_commands(processors: Iterable, log_level, log_file, mip, dry_run, verbose):
     """This result callback is invoked with an iterable of all the chained
     subcommands. As in this example each subcommand returns a function
     we can chain them together to feed one
@@ -100,14 +136,17 @@ def process_commands(processors: Iterable, log_level, log_file, mip, dry_run, verbose):
     stream = [get_initial_task(), ]
 
     # Pipe it through all stream operators.
-    for operator in operators:
-        stream = operator(stream)
+    for processor in processors:
+        breakpoint()
+        stream = processor(stream)
     # task = next(stream)
 
     # Evaluate the stream and throw away the items.
     for item in stream:
         print(f'item in stream: {item}')
-        breakpoint()
+        if item is None:
+            breakpoint()
+            continue
         pass
 
 
 def operator(func: Callable):
     """
     Help decorator to rewrite a function so that it returns another
     function from it.
@@ -118,9 +157,9 @@ def operator(func: Callable):
     """
     @wraps(func)
     def wrapper(*args, **kwargs):
-        def operator(stream):
+        def processor(stream: Iterable):
             return func(stream, *args, **kwargs)
-        return operator
+        return processor
     return wrapper
@@ -130,7 +169,7 @@ def generator(func: Callable):
     and does not pass through the values as parameter.
     """
     @operator
-    def new_func(stream, *args, **kwargs):
+    def new_func(stream: Iterable, *args, **kwargs):
         for task in func(*args, **kwargs):
             yield task
 
@@ -149,6 +188,9 @@ def initiator(func: Callable):
 
     @operator
-    def new_func(stream, *args, **kwargs):
-        yield func(*args, **kwargs)
+    def new_func(stream: Iterable, *args, **kwargs):
+        ret = func(*args, **kwargs)
+        # while True:
+        yield ret
+
     return update_wrapper(new_func, func)
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 57317b3c..c63760f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,4 +20,5 @@ pyparsing>=2.4.2
 urllib3>=1.25.11
 docutils>=0.10
 pynrrd
-tensorstore
\ No newline at end of file
+tensorstore
+zarr
\ No newline at end of file
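
For readers who want to experiment with the pattern this PR introduces, here is a minimal, self-contained sketch of the chained-Click pipeline that `chunkflow/lib/flow.py` is built on, plus one plausible fix for the chaining problem named in the first commit message: instead of deferring an initiator into a lazily-evaluated generator (so that only the last one in the chain is ever drained), run the initiator's function eagerly when its subcommand is invoked and hand back an identity processor. The file name, the toy `create-bbox`/`generate-task`/`print-task` commands, and this eager design are illustrative assumptions, not chunkflow's actual code.

```python
# flow_sketch.py -- toy model of the initiator -> generator -> operator chain.
from functools import update_wrapper, wraps

import click

state = {}  # parameters shared by every chained subcommand


def get_initial_task():
    return {'log': {}}


@click.group(chain=True)
def main():
    pass


@main.result_callback()
def process_commands(processors):
    """Pipe an initial task through every processor, then drain the stream."""
    stream = [get_initial_task()]
    for processor in processors:
        stream = processor(stream)
    for _ in stream:
        pass


def operator(func):
    """Rewrite a command so that invoking it returns a processor(stream)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        def processor(stream):
            return func(stream, *args, **kwargs)
        return processor
    return wrapper


def generator(func):
    """Discard the incoming stream and emit fresh tasks."""
    @operator
    def new_func(stream, *args, **kwargs):
        yield from func(*args, **kwargs)
    return update_wrapper(new_func, func)


def initiator(func):
    """Run func for its side effect on `state` as soon as the subcommand is
    invoked, then pass the stream through untouched. Click invokes chained
    subcommands in order, so every initiator takes effect, not just the last.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)  # eager: runs before the pipeline is drained
        def processor(stream):
            return stream      # identity: keep upstream items flowing
        return processor
    return wrapper


@main.command('create-bbox')  # hypothetical stand-in for the real command
@click.option('--name', '-n', type=str, default='bbox')
@initiator
def create_bbox(name):
    state[name] = (0, 0, 0)  # stand-in for a real BoundingBox


@main.command('generate-task')  # hypothetical stand-in
@generator
def generate_task():
    task = get_initial_task()
    task['bbox'] = state.get('bbox')
    yield task


@main.command('print-task')
@operator
def print_task(stream):
    for task in stream:
        print(task)
        yield task


if __name__ == '__main__':
    main()
```

Chaining `python flow_sketch.py create-bbox -n bbox generate-task print-task` prints a task carrying the bounding box set by the initiator; a second initiator placed before `generate-task` would also take effect, since each one runs at invocation time rather than inside a generator that may never be iterated.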