Skeleton of a pipeline to explore parameters and store images #1

Draft · wants to merge 25 commits into main

Commits (25)
9959217
Add classes to read and write folders and the beginning of a possible…
lauraporta Oct 7, 2024
a1c703e
Update dependencies
lauraporta Nov 5, 2024
98ac2b5
Add logging and usage of pattern for glob
lauraporta Nov 5, 2024
b7a1f34
Pre-commit hook modifications
lauraporta Nov 5, 2024
e11b2e2
Fix error with logging filepath
lauraporta Nov 5, 2024
e85206c
WIP on filename
lauraporta Nov 5, 2024
08cdd2e
Improve log name
lauraporta Nov 5, 2024
077b364
Merge branch 'dev' of github.com:neuroinformatics-unit/calcium-imagin…
lauraporta Nov 5, 2024
f59b4aa
Be stricter on allowed sessions
lauraporta Nov 5, 2024
bb41c55
Change usage of file pattern in reader
lauraporta Nov 8, 2024
8054484
Change folder
lauraporta Nov 8, 2024
3e65c8e
Merge branch 'dev' of github.com:neuroinformatics-unit/calcium-imagin…
lauraporta Nov 8, 2024
ad5d0ae
Fix error related to file reading patterns
lauraporta Nov 8, 2024
8a34d2a
Add script useful to launch debugging
lauraporta Nov 8, 2024
9c49db1
Add minimal wandb implementation
lauraporta Nov 8, 2024
831ed71
WIP: saving images 🐛
lauraporta Nov 8, 2024
ca67671
Fix image save bug
lauraporta Nov 11, 2024
e2381bc
Move from wandb to mlflow
lauraporta Nov 11, 2024
789147d
Store mlflow folder differently, link artifacts
lauraporta Nov 11, 2024
385ffec
Update manifest
lauraporta Nov 11, 2024
b44e7ff
Remove test script
lauraporta Nov 11, 2024
3893ef7
Add first group of docstrings
lauraporta Nov 11, 2024
b580fbc
WIP: nested runs, 🐛 on artifacts saving
lauraporta Nov 12, 2024
9a3502e
Add dependencies
lauraporta Nov 18, 2024
ca90cfe
Refactoring
lauraporta Nov 18, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -81,3 +81,6 @@ venv/

# written by setuptools_scm
**/_version.py

# custom scripts
examples/*.sh
4 changes: 4 additions & 0 deletions MANIFEST.in
@@ -2,7 +2,11 @@ include LICENSE
include README.md
exclude .pre-commit-config.yaml

recursive-include calcium_imaging_automation *.py
recursive-include examples *.py

recursive-exclude * __pycache__
recursive-exclude * *.py[co]
recursive-exclude docs *
recursive-exclude tests *
recursive-exclude examples *.sh
Empty file.
44 changes: 44 additions & 0 deletions calcium_imaging_automation/core/app.py
@@ -0,0 +1,44 @@
import argparse
from pathlib import Path

from calcium_imaging_automation.core.pipeline import pipeline

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Example usage of the pipeline manager."
)

parser.add_argument(
"raw_data_path", type=Path, help="Path to the raw data."
)
parser.add_argument(
"output_path", type=Path, help="Path to the output data."
)
parser.add_argument(
"--folder_read_pattern",
type=str,
help="Glob pattern for reading folder.",
default="*",
)
parser.add_argument(
"--file_read_pattern",
type=str,
help="List of glob patterns for reading files.",
action="append",
)
parser.add_argument(
"--experiment_name",
type=str,
help="Name of the experiment.",
default="pipeline_test",
)

args = parser.parse_args()

pipeline(
args.raw_data_path,
args.output_path,
args.folder_read_pattern,
args.file_read_pattern,
args.experiment_name,
)
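
A minimal sketch of driving this entry point programmatically, bypassing argparse; the paths and patterns below are hypothetical placeholders, not values taken from the repository.

# Hypothetical invocation; adjust paths and patterns to your data.
from pathlib import Path

from calcium_imaging_automation.core.pipeline import pipeline

pipeline(
    raw_data_path=Path("/data/raw"),        # assumed raw-data root
    output_path=Path("/data/derivatives"),  # assumed output root
    folder_read_pattern="sub-*",            # assumed dataset folder naming
    file_read_pattern=["*.tif"],            # assumed file patterns
    experiment_name="pipeline_test",
)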
175 changes: 175 additions & 0 deletions calcium_imaging_automation/core/pipeline.py
@@ -0,0 +1,175 @@
import datetime
import logging
from pathlib import Path
from typing import List

import mlflow
import numpy as np
import setuptools_scm

from calcium_imaging_automation.core.reader import ReadAquiredData
from calcium_imaging_automation.core.writer import DatashuttleWrapper


def pipeline(
raw_data_path: Path,
output_path: Path,
folder_read_pattern: str,
file_read_pattern: List[str],
experiment_name: str = "pipeline_test",
):
# --- Setup logging and MLflow ---
logging_setup(output_path)
mlflow_setup(output_path)

# --- Read folders and files ---
reader = ReadAquiredData(
raw_data_path,
folder_read_pattern,
file_read_pattern,
)
logging.info(f"Found {len(reader.datasets_paths)} datasets.")
logging.info(f"Dataset names: {reader.dataset_names}")

number_of_tiffs = reader.max_session_number(filetype="tif")
logging.info(f"Max of tiffs found: {number_of_tiffs}")

# --- Write folders and files ---
writer = DatashuttleWrapper(output_path)
writer.create_folders(reader.dataset_names, session_number=number_of_tiffs)

# --- Start processing ---
for dataset in reader.datasets_paths:
dataset_name = dataset.stem

for session in range(0, number_of_tiffs):
mlflow_set_experiment(experiment_name, dataset_name, session)

# Generate mock data
data = np.random.rand(100, 100)

            # Start a parent MLflow run for each dataset-session
with mlflow.start_run(): # this is the parent run
mlflow_parent_run_logs(
dataset_name,
session,
raw_data_path,
output_path,
folder_read_pattern,
file_read_pattern,
)

logging.info(
f"Starting MLflow experiment for dataset {dataset_name} "
+ f"session {session}..."
)

# Mock processing for different runs within the experiment
for i in range(0, 10): # n runs with varying parameters
# Start a child run under the main dataset-session run
with mlflow.start_run(nested=True):
# Mock metric calculation
metric_measured = np.mean(data) * i

                        # Save the generated data as an image artifact;
                        # one file per child run, named after the run id
image_path = writer.save_image(
image=data,
dataset_name=dataset_name,
session_number=session,
filename=f"image_{mlflow.active_run().info.run_id}.png",
)

mlflow_log_run(
i,
dataset_name,
session,
metric_measured,
image_path,
)

logging.info(
f"Completed MLflow run iteration {i} for dataset "
+ f"{dataset_name} session {session}"
)

logging.info(
f"Completed MLflow experiment for dataset {dataset_name}"
+ f" session {session}"
)

logging.info("Pipeline finished.")


def logging_setup(output_path: Path):
# --- Setup experiment-wide logging to file ---
(output_path / "logs").mkdir(exist_ok=True)
logging.basicConfig(
filename=str(
output_path
/ "logs"
/ f"{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.log"
),
level=logging.INFO,
format="%(asctime)s - %(message)s",
)


def mlflow_setup(output_path: Path):
# --- Setup MLflow tracking ---
mlflow_tracking_dir = output_path / "mlflow"
mlflow.set_tracking_uri(str(mlflow_tracking_dir))


def mlflow_set_experiment(
experiment_name: str, dataset_name: str, session: int
):
# Start a new MLflow experiment for each dataset and session
mlflow.set_experiment(
f"{experiment_name}/{dataset_name}/session_{session}"
)


def mlflow_parent_run_logs(
dataset_name: str,
session: int,
raw_data_path: Path,
output_path: Path,
folder_read_pattern: str,
file_read_pattern: List[str],
):
# give specific name to the parent run
mlflow.set_tag("mlflow.runName", f"{dataset_name}_session_{session}")

# Log session-specific parameters
mlflow.log_param("mlflow.Dataset", dataset_name)
mlflow.log_param("session_number", session)
mlflow.log_param("raw_data_path", str(raw_data_path))
mlflow.log_param("output_path", str(output_path))
mlflow.log_param("folder_read_pattern", folder_read_pattern)
mlflow.log_param("file_read_pattern", file_read_pattern)
mlflow.log_param("local_changes_hash", setuptools_scm.get_version())


def mlflow_log_run(
i: int,
dataset_name: str,
session: int,
metric_measured: float,
image_path: Path,
):
# give specific name to the run
mlflow.set_tag("mlflow.runName", f"param_{i}")

# Log parameters and metrics specific to this run
mlflow.log_param("data_size", f"{i * 10}x100")
mlflow.log_param("run_iteration", i)
mlflow.log_param("run_id", mlflow.active_run().info.run_id)
mlflow.log_metric("metric_measured", metric_measured)

mlflow.log_artifact(
        # the image is stored following the NeuroBlueprint layout;
        # log_artifact copies it into the MLflow artifact store
image_path,
artifact_path=f"{dataset_name}/session_{session}/run_{i}",
)
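
The parent/child structure used above can be hard to see inside the full pipeline, so here is a self-contained sketch of the same nested-run pattern; the experiment name, run names, and tracking directory are placeholders.

# Minimal nested-run sketch; "demo" names and /tmp store are assumptions.
import mlflow

mlflow.set_tracking_uri("file:///tmp/mlflow-demo")  # assumed local store
mlflow.set_experiment("demo/dataset_0/session_0")   # placeholder experiment

with mlflow.start_run():  # parent run: one dataset-session
    mlflow.set_tag("mlflow.runName", "dataset_0_session_0")
    for i in range(3):  # child runs: one per parameter setting
        with mlflow.start_run(nested=True):
            mlflow.set_tag("mlflow.runName", f"param_{i}")
            mlflow.log_param("run_iteration", i)
            mlflow.log_metric("metric_measured", 0.1 * i)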
130 changes: 130 additions & 0 deletions calcium_imaging_automation/core/reader.py
@@ -0,0 +1,130 @@
from pathlib import Path
from typing import List


class ReadAquiredData:
def __init__(
self,
raw_data_folder: Path,
folder_read_pattern: str,
file_read_pattern: List[str],
):
"""
Class to handle filepaths and dataset names in the raw data folder.
It can load folders and files based on the provided patterns, allowing
flexibility in the data structure of origin.
        It also provides the maximum number of sessions across datasets based
        on the total number of files found in the dataset folders; by default,
        it searches for tif files.

Parameters
----------
raw_data_folder : Path
The path to the raw data folder.
folder_read_pattern : str
The pattern to search for folders in the raw data folder. It
corresponds to the naming convention of the datasets.
file_read_pattern : List[str]
The patterns to search for files in the dataset folders. It
corresponds to the naming convention of the files in the dataset
folders.
"""
self.folder_read_pattern = folder_read_pattern
self.file_read_pattern = file_read_pattern

self.datasets_paths = self.get_folders_first_layer(raw_data_folder)
self.dataset_names = [
dataset_path.name for dataset_path in self.datasets_paths
]

def get_folders_first_layer(self, file_path: Path) -> List[Path]:
"""
Get the first layer of folders in the raw data folder. The rest
of the class assumes that the first layer of folders corresponds
to the dataset folders.

Parameters
----------
file_path : Path
The path to the raw data folder.

Returns
-------
List[Path]
The list of paths to the dataset folders.
"""
return list(file_path.glob(self.folder_read_pattern))

    def get_files_paths_by_format(
        self, folder: Path, filetype: str = "*.tif"
    ) -> List[Path]:
        """
        Get the paths to the files in the dataset folder that match the
        provided glob pattern. By default, it searches for tif files.
        Note that the pattern is passed to Path.rglob, so a bare extension
        such as "tif" would only match files literally named "tif".

        Parameters
        ----------
        folder : Path
            The path to the dataset folder.
        filetype : str, optional
            The glob pattern to search for in the dataset folder, by
            default "*.tif".

        Returns
        -------
        List[Path]
            The list of paths to the matching files in the dataset folder.
        """
        return list(folder.rglob(filetype))

def total_objects_by_extension(self, folder: Path) -> dict:
"""
Get the total number of files in the dataset folder based on the
extensions included in the file_read_pattern.

Parameters
----------
folder : Path
The path to the dataset folder.

Returns
-------
dict
The dictionary with the number of files for each extension in the
patterns found in file_read_pattern.
"""

return {
filetype.split(".")[-1]: len(
self.get_files_paths_by_format(folder, filetype)
)
for filetype in self.file_read_pattern
}

    def max_session_number(
        self, filetype: str = "tif", max_allowed: int = 5
    ) -> int:
        """
        Get the maximum number of sessions found across all datasets, based
        on the total number of files of the given type in each dataset
        folder. By default, it counts tif files and caps the result at 5
        sessions. It assumes that every tif file corresponds to an
        experimental session.

        Parameters
        ----------
        filetype : str, optional
            The file extension to count in the dataset folders, by default
            "tif". It must match an extension present in file_read_pattern.
        max_allowed : int, optional
            The maximum number of sessions allowed, by default 5.

        Returns
        -------
        int
            The maximum session count across datasets, capped at
            max_allowed.
        """

total_tif_number = [
self.total_objects_by_extension(dataset_path).get(filetype, 0)
for dataset_path in self.datasets_paths
]

return min(max(total_tif_number), max_allowed)
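
A short usage sketch of ReadAquiredData; the folder layout and the patterns below are assumptions for illustration, not values from the repository.

# Hypothetical usage; assumes one sub-folder per dataset under /data/raw.
from pathlib import Path

from calcium_imaging_automation.core.reader import ReadAquiredData

reader = ReadAquiredData(
    raw_data_folder=Path("/data/raw"),  # assumed raw-data root
    folder_read_pattern="sub-*",        # assumed dataset folder naming
    file_read_pattern=["*.tif"],        # count tif files per dataset
)
print(reader.dataset_names)                       # e.g. ["sub-001", "sub-002"]
print(reader.max_session_number(filetype="tif"))  # capped at 5 by default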