initial implementation, single initiator works but only the last one … #286

Open · wants to merge 2 commits into master
241 changes: 157 additions & 84 deletions chunkflow/flow/flow.py
@@ -6,7 +6,7 @@
from typing import Generator, List

from copy import deepcopy

import h5py
import numpy as np
import click
import json
@@ -15,6 +15,7 @@
import zarr
import tinybrain

#from chunkflow.lib.flow import main, operator, generator, initiator
from chunkflow.lib.flow import *
from cloudvolume import CloudVolume
from cloudvolume.lib import Vec
@@ -59,89 +60,159 @@
@click.option('--size', '-z',
type=click.INT, default=None, nargs=3, callback=default_none,
help='volume size or dimension.')
@generator
def create_bbox(start: tuple, stop: tuple, size: tuple):
@click.option('--name', '-n', type=str, default='bbox',
help='the variable name of this bounding box. Normally "bbox" or "volume_bbox".')
@initiator
def create_bbox(start: tuple, stop: tuple, size: tuple, name: str):
assert stop is not None or size is not None
if stop is None:
stop = Cartesian.from_collection(start) + Cartesian.from_collection(size)
bbox = BoundingBox(start, stop)
task = get_initial_task()
task['bbox'] = bbox
yield task
# task = next(tasks)
state[name] = bbox
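
For reference, a minimal sketch (not part of this diff) of the bbox arithmetic above and of the shared-state pattern the new initiators use; all values are illustrative:

    # hypothetical values; Cartesian and BoundingBox are used as in create_bbox above
    start = Cartesian(0, 0, 0)
    size = Cartesian(64, 512, 512)
    stop = start + size                       # element-wise addition when --stop is omitted
    state['bbox'] = BoundingBox(start, stop)  # later commands read the box from state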


@main.command('generate-tasks')
@click.option('--layer-path', '-l',
type=str, default=None,
help='dataset layer path to fetch dataset information.')
@click.option('--mip', '-m',
type=click.INT, default=None, help='mip level of the dataset layer.')
@click.option('--roi-start', '-s',
type=click.INT, default=None, nargs=3, callback=default_none,
help='(z y x), start of the chunks')
@click.option('--roi-stop', '-r',
type=click.INT, nargs=3, default=None, callback=default_none,
help='stop coordinate of region of interest')
@click.option('--roi-size', '-z',
type=click.INT, nargs=3, default=None, callback=default_none,
help='size of region of interest')
@main.command('create-bbox-from-string')
@click.option('--bbox_string', '-b', type=str, default=None,
help='the string representation of a bounding box')
@click.option('--name', '-n', type=str, default='bbox',
help='bounding box name. Normally "bbox" or "volume_bbox".')
@initiator
def create_bbox_from_string(bbox_string: str, name: str):
bbox = BoundingBox.from_string(bbox_string)
state[name] = bbox

@main.command('create-bbox-from-h5')
@click.option('--file-path', '-f', required=True, type=click.Path(
exists=True, dir_okay=False, readable=True, resolve_path=True),
help='HDF5 file path.'
)
@click.option('--volume/--task', default=False,
    help='whether the bounding box is for the whole volume or a single task.')
@initiator
def create_bbox_from_h5(file_path: str, volume: bool):
    assert os.path.exists(file_path)
    # fall back to the origin if the file carries no voxel_offset dataset
    roi_start = None
    roi_size = None
    with h5py.File(file_path, mode='r') as file:
        for key in file.keys():
            if 'voxel_offset' in key:
                roi_start = Cartesian(*(file[key]))
            else:
                roi_size = Cartesian(*file[key].shape[-3:])
    if roi_start is None:
        roi_start = Cartesian(0, 0, 0)

if volume:
key_name = 'volume_bbox'
else:
key_name = 'bbox'

state[key_name] = BoundingBox.from_delta(roi_start, roi_size)
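
A hedged sketch (not part of this diff) of the HDF5 layout create-bbox-from-h5 expects; the dataset names and sizes are assumptions for illustration:

    import h5py
    import numpy as np

    # hypothetical file: an image dataset plus a 'voxel_offset' dataset (z, y, x),
    # which the reader above turns into roi_start / roi_size
    with h5py.File('example.h5', 'w') as f:
        f.create_dataset('main', data=np.zeros((64, 512, 512), dtype=np.uint8))
        f.create_dataset('voxel_offset', data=np.array([0, 1024, 1024]))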


@main.command('create-bbox-from-zarr', group='Initiator')
@click.option('--path', '-p', type=str, required=True,
help='path of Zarr volume.')
@click.option('--name', '-n', type=str, default='volume_bbox',
help='bounding box name. Normally "bbox" or "volume_bbox".')
@initiator
def create_volume_bbox_from_zarr(path: str, name: str):
"""Create a bounding box from a zarr volume."""
z = zarr.open(path, mode='r')
    meta = z.attrs.asdict()
bbox = BoundingBox.from_delta(meta['offset'], z.shape)
print('bounding box: ', bbox)
state[name] = bbox
yield None
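
A hedged sketch (not part of this diff) of a zarr volume carrying the 'offset' attribute this command reads; the path, sizes, and chunking are placeholders:

    import zarr

    # hypothetical volume; create-bbox-from-zarr expects attrs['offset'] in z, y, x
    # and combines it with the array shape via BoundingBox.from_delta
    z = zarr.open('example.zarr', mode='w', shape=(64, 512, 512),
                  chunks=(32, 256, 256), dtype='uint8')
    z.attrs['offset'] = [0, 1024, 1024]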

@main.command('create-bbox-from-precomputed')
@click.option('--path', '-p', type=str, required=True,
help='path of precomputed volume')
@click.option('--mip', '-m', type=click.INT, default=0,
help='MIP level. The highest resolution mip level is 0.')
@click.option('--use-https/--use-credential', default=False,
help='if we read from a public dataset in cloud storage, it is required to use https.')
@click.option('--name', '-n', type=str, default='volume_bbox',
help='name of volume bounding box.')
@initiator
def create_bbox_from_precomputed(path: str, mip: int, use_https: bool, name: str):
"""Create volume bounding box from a volume stored in precomputed format."""
vol = CloudVolume(path, mip=mip, use_https=use_https)
# dataset shape as z,y,x
dataset_size = vol.mip_shape(mip)[:3][::-1]
dataset_offset = vol.mip_voxel_offset(mip)[::-1]
dataset_size = Cartesian.from_collection(dataset_size)
dataset_offset = Cartesian.from_collection(dataset_offset)
volume_bbox = BoundingBox.from_delta(dataset_offset, dataset_size)
state[name] = volume_bbox
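
The [::-1] reversals above exist because cloudvolume reports sizes and offsets in x, y, z order while chunkflow works in z, y, x; a toy illustration with made-up numbers:

    # hypothetical values as cloudvolume would report them (x, y, z)
    size_xyz = (2048, 1024, 64)
    offset_xyz = (10240, 10240, 0)
    size_zyx = size_xyz[::-1]        # (64, 1024, 2048)
    offset_zyx = offset_xyz[::-1]    # (0, 10240, 10240)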


@main.command('divide-volume', group='Initiator')
@click.option('--chunk-size', '-c',
type=click.INT, default=None, nargs=3,
help='(z y x), size/shape of chunks')
@click.option('--bounding-box', '-b', type=str, default=None,
help='the string representation of a bounding box')
@click.option('--grid-size', '-g',
type=click.INT, default=None, nargs=3, callback=default_none,
help='(z y x), grid size of output blocks')
@click.option('--file-path', '-f', default = None,
type=click.Path(writable=True, dir_okay=False, resolve_path=True),
help='output tasks as an numpy array formated as npy.')
@click.option('--queue-name', '-q',
type=str, default=None, help='sqs queue name')
@click.option('--respect-chunk-size/--respect-stop',
default=True, help="""for the last bounding box, \
make the chunk size consistent or cut off at the stopping boundary.""")
@click.option('--aligned-block-size', '-a',
type=click.INT, default=None, nargs=3, callback=default_none,
    help='force alignment of the block size. Note that the alignment starts from (0, 0, 0).')
@click.option('--bounded/--not-bounded', default=False,
    help='whether the bounding boxes must stay inside the volume or may cross its boundary.')
@initiator
def divide_volume(chunk_size: tuple, respect_chunk_size: bool,
        aligned_block_size: tuple, bounded: bool):
"""Decompose the volume to a grid of chunks with/without overlap."""
volume_bbox = state['volume_bbox']
bboxes = BoundingBoxes.from_manual_setup(
volume_bbox, chunk_size,
respect_chunk_size=respect_chunk_size,
aligned_block_size=aligned_block_size,
bounded = bounded,
)
state['bboxes'] = bboxes
    logging.info(f'decomposed the volume into {len(bboxes)} task bounding boxes')
yield None
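
For intuition, the gridding done by BoundingBoxes.from_manual_setup amounts to a ceil-division of the volume extent by the chunk size; a rough sketch with placeholder numbers, not the actual implementation:

    import math

    volume_size = (128, 1024, 1024)   # hypothetical z, y, x extent of volume_bbox
    chunk_size = (64, 512, 512)
    grid_size = tuple(math.ceil(v / c) for v, c in zip(volume_size, chunk_size))
    # grid_size == (2, 2, 2): eight candidate task bounding boxes before any
    # --respect-chunk-size / --bounded adjustments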


@main.command('save-tasks')
@click.option('--file-path', '-f', required=True,
type=click.Path(writable=True, dir_okay=False, resolve_path=True),
    help='output tasks as a numpy array formatted as npy.')
@initiator
def save_tasks(file_path: str):
bboxes = state['bboxes']
# write out as a file
# this could be used for iteration in slurm cluster.
if not file_path.endswith('.npy'):
        file_path += f'{len(bboxes)}.npy'
bboxes.to_file(file_path)
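
A hedged sketch (not part of this diff) of how the saved task file might be consumed in a slurm array job, the use case mentioned in the comment above; the file name and array format are assumptions:

    import os
    import numpy as np

    # hypothetical consumer on a cluster node: pick one bounding box per array task
    bboxes = np.load('tasks8.npy', allow_pickle=True)
    bbox = bboxes[int(os.environ['SLURM_ARRAY_TASK_ID'])]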


@main.command('ingest-tasks-to-sqs-queue')
@click.option('--queue-name', '-q',
type=str, default=None, help='sqs queue name')
@initiator
def ingest_tasks_to_sqs_queue(queue_name: str):
"""Ingest tasksk to an AWS SQS Queue"""
queue = SQSQueue(queue_name)
bboxes = state['bboxes']
queue.send_message_list(bboxes)


@main.command('generate-tasks')
@click.option('--task-index-start', '-i',
type=click.INT, default=0, help='starting index of task list.')
@click.option('--task-index-stop', '-p',
type=click.INT, default=None, help='stop index of task list.')
@click.option('--disbatch/--no-disbatch', '-d',
default=False, help='use disBatch environment variable or not')
@click.option('--use-https/--use-credential', default=False,
help='if we read from a public dataset in cloud storage, it is required to use https.')
@generator
def generate_tasks(
layer_path: str, mip: int, roi_start: tuple, roi_stop: tuple,
roi_size: tuple, chunk_size: tuple, bounding_box:str,
grid_size: tuple, file_path: str, queue_name: str,
respect_chunk_size: bool, aligned_block_size: tuple,
task_index_start: tuple, task_index_stop: tuple,
disbatch: bool, use_https: bool):
def generate_tasks(task_index_start: int, task_index_stop: int,
        disbatch: bool):
"""Generate a batch of tasks."""
if mip is None:
mip = state['mip']
assert mip >=0

if bounding_box is not None:
bboxes = [BoundingBox.from_string(bounding_box)]
if chunk_size is None:
chunk_size = bboxes[0].shape
else:
assert chunk_size == bboxes[0].shape
else:
bboxes = BoundingBoxes.from_manual_setup(
chunk_size, layer_path=layer_path,
roi_start=roi_start, roi_stop=roi_stop,
roi_size=roi_size, mip=mip, grid_size=grid_size,
respect_chunk_size=respect_chunk_size,
aligned_block_size=aligned_block_size,
use_https=use_https
)
print(f'number of all the candidate tasks: {len(bboxes)}')

bboxes = state['bboxes']
if task_index_start:
if task_index_stop is None:
# task_index_stop = task_index_start + 1
@@ -155,35 +226,37 @@ def generate_tasks(
bboxes = [bboxes[disbatch_index],]
logging.info(f'selected a task with disBatch index {disbatch_index}')

# write out as a file
# this could be used for iteration in slurm cluster.
if file_path:
if not file_path.endswith('.npy'):
file_path += len(bboxes) + '.npy'
bboxes.to_file(file_path)


# if state['verbose']:
bbox_num = len(bboxes)
logging.info(f'total number of tasks: {bbox_num}')
print(f'total number of tasks: {bbox_num}')

if queue_name is not None:
queue = SQSQueue(queue_name)
queue.send_message_list(bboxes)
else:
for bbox_index, bbox in enumerate(bboxes):
if disbatch:
assert len(bboxes) == 1
bbox_index = disbatch_index
print(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
logging.info(f'executing task {bbox_index+task_index_start} in {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
task = get_initial_task()
task['bbox'] = bbox
task['bbox_index'] = bbox_index
task['bbox_num'] = bbox_num
task['log']['bbox'] = bbox.to_filename()
yield task
for bbox_index, bbox in enumerate(bboxes):
if disbatch:
assert len(bboxes) == 1
bbox_index = disbatch_index
        print(f'executing task {bbox_index+task_index_start} of {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
        logging.info(f'executing task {bbox_index+task_index_start} of {bbox_num+task_index_start} with bounding box: {bbox.to_filename()}')
task = get_initial_task()
task['bbox'] = bbox
task['bbox_index'] = bbox_index
task['bbox_num'] = bbox_num
task['log']['bbox'] = bbox.to_filename()
yield task

@main.command('generate-task')
@generator
def generate_task():
if 'bbox' in state:
bbox = state['bbox']
elif 'volume_bbox' in state:
bbox = state['volume_bbox']
else:
raise ValueError(f'no bounding box in state: {state}')
task = get_initial_task()
task['bbox'] = bbox
yield task

@main.command('adjust-bbox')
@click.option('--corner-offset', '-c', type=click.INT, nargs=6, default=None,