Support local paths for InputDataset.source #30

Merged
merged 5 commits into from Aug 20, 2024

Changes from all commits

4 changes: 2 additions & 2 deletions ci/environment.yml
@@ -11,5 +11,5 @@ dependencies:
   - compilers
   - netcdf-fortran
   - mpich
-  - nco
-  - ncview
+  - xarray
+  - netCDF4
45 changes: 0 additions & 45 deletions cstar_ocean/.pre-commit-config.yaml

This file was deleted.

12 changes: 6 additions & 6 deletions cstar_ocean/component.py
@@ -112,12 +112,12 @@ def __str__(self):
             if isinstance(self.input_datasets, InputDataset)
             else 0
         )
-        base_str += f"\n{NAC} AdditionalCode repositories (query using ROMSComponent.additional_code)"
+        base_str += f"\n{NAC} AdditionalCode repositories (query using Component.additional_code)"
         base_str += (
-            f"\n{NID} InputDataset objects (query using ROMSComponent.input_datasets"
+            f"\n{NID} InputDataset objects (query using Component.input_datasets"
         )

-        disc_str=''
+        disc_str = ""
         if hasattr(self, "time_step") and self.time_step is not None:
             disc_str += "\ntime_step: " + str(self.time_step)
         if hasattr(self, "n_procs_x") and self.n_procs_x is not None:
@@ -141,9 +141,9 @@ def __str__(self):
         if hasattr(self, "exe_path") and self.exe_path is not None:
             disc_str += "\n\nIs compiled: True"
             disc_str += "\n exe_path: " + self.exe_path
-        if len(disc_str)>0:
-            disc_str = "\n\nDiscretization info:"+disc_str
-            base_str+=disc_str
+        if len(disc_str) > 0:
+            disc_str = "\n\nDiscretization info:" + disc_str
+            base_str += disc_str
return base_str

def __repr__(self):
35 changes: 21 additions & 14 deletions cstar_ocean/cstar_case.py
@@ -112,7 +112,7 @@ def __init__(
RuntimeWarning,
)
self.valid_start_date = None

if valid_end_date is not None:
self.valid_end_date: Optional[dt.datetime] = (
valid_end_date
@@ -127,8 +126,7 @@
+ "and valid_end_date attributes.",
RuntimeWarning,
)
-            self.valid_end_date=None
-
+            self.valid_end_date = None

# Make sure Case start_date is set and is a datetime object:
if start_date is not None:
@@ -485,14 +484,16 @@ def persist(self, filename: str):

         # Add metadata to dictionary
         bp_dict["registry_attrs"] = {"name": self.name}
+
+        # Add start date to valid_date_range if it exists
         if self.valid_start_date is not None:
-            bp_dict["registry_attrs"]["valid_date_range"] = {
-                "start_date": str(self.valid_start_date)
-            }
+            bp_dict["registry_attrs"].setdefault("valid_date_range", {})[
+                "start_date"
+            ] = str(self.valid_start_date)
         if self.valid_end_date is not None:
-            bp_dict["registry_attrs"]["valid_date_range"] = {
-                "end_date": str(self.valid_end_date)
-            }
+            bp_dict["registry_attrs"].setdefault("valid_date_range", {})["end_date"] = (
+                str(self.valid_end_date)
+            )

bp_dict["components"] = []

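A note on the setdefault change above: the previous pattern assigned a brand-new dict to "valid_date_range" in each branch, so when both dates were set, the end_date assignment clobbered the start_date entry. A minimal standalone sketch of the difference (the dates are hypothetical):

    registry_attrs = {"name": "my_case"}

    # Old pattern: the second assignment replaces the whole sub-dict, losing start_date
    registry_attrs["valid_date_range"] = {"start_date": "2012-01-01 00:00:00"}
    registry_attrs["valid_date_range"] = {"end_date": "2012-12-31 00:00:00"}
    assert registry_attrs["valid_date_range"] == {"end_date": "2012-12-31 00:00:00"}

    # New pattern: setdefault reuses the existing sub-dict, so both keys survive
    registry_attrs = {"name": "my_case"}
    registry_attrs.setdefault("valid_date_range", {})["start_date"] = "2012-01-01 00:00:00"
    registry_attrs.setdefault("valid_date_range", {})["end_date"] = "2012-12-31 00:00:00"
    assert registry_attrs["valid_date_range"] == {
        "start_date": "2012-01-01 00:00:00",
        "end_date": "2012-12-31 00:00:00",
    }
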
@@ -563,6 +564,7 @@ def persist(self, filename: str):
             if isinstance(input_datasets, list):
                 input_dataset_info: dict = {}
                 for ind in input_datasets:
+                    # Determine what kind of input dataset we are adding
                     if isinstance(ind, ModelGrid):
                         dct_key = "model_grid"
                     elif isinstance(ind, InitialConditions):
@@ -575,11 +577,17 @@ def persist(self, filename: str):
                         dct_key = "surface_forcing"
                     if dct_key not in input_dataset_info.keys():
                         input_dataset_info[dct_key] = {}

+                    # Create a dictionary of file_info for each dataset file:
+                    if "files" not in input_dataset_info[dct_key].keys():
+                        input_dataset_info[dct_key]["files"] = []
-                    input_dataset_info[dct_key]["files"].append(
-                        {"source": ind.source, "file_hash": ind.file_hash}
-                    )
+                    file_info = {"source": ind.source, "hash": ind.file_hash}
+                    if hasattr(ind, "start_date"):
+                        file_info["start_date"] = str(ind.start_date)
+                    if hasattr(ind, "end_date"):
+                        file_info["end_date"] = str(ind.end_date)
+
+                    input_dataset_info[dct_key]["files"].append(file_info)

                 component_info["input_datasets"] = input_dataset_info

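For reference, the per-component structure persist() now builds looks roughly like this (the keys come from the code above; the values here are hypothetical):

    input_dataset_info = {
        "initial_conditions": {
            "files": [
                {
                    "source": "/data/roms_ini.nc",  # local path or URL
                    "hash": "d2a8c3...",  # SHA-256 checksum used for verification
                    "start_date": "2012-01-01 00:00:00",  # only if the dataset has one
                    "end_date": "2012-01-31 00:00:00",  # likewise
                }
            ]
        }
    }
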
@@ -611,13 +619,12 @@ def check_is_setup(self) -> bool:
             ]
         elif isinstance(self.components, list):
             component_list = self.components
-
         for component in component_list:
             if component.base_model.local_config_status != 0:
                 return False

             # Check AdditionalCode
-            if (component.additional_code is not None) and (
+            if (component.additional_code is None) or not (
                 component.additional_code.check_exists_locally(self.caseroot)
             ):
                 return False
129 changes: 80 additions & 49 deletions cstar_ocean/input_dataset.py
@@ -4,6 +4,7 @@
 import datetime as dt
 import dateutil.parser
 from typing import Optional
+from cstar_ocean.utils import _get_source_type
 from cstar_ocean.base_model import BaseModel


@@ -16,13 +17,13 @@ class InputDataset:
     base_model: BaseModel
         The base model with which this input dataset is associated
     source: str
-        URL pointing to the netCDF file containing this input dataset
+        local path or URL pointing to the netCDF file containing this input dataset
     file_hash: str
         The 256 bit SHA sum associated with the file for verifying downloads
     exists_locally: bool, default None
-        True if the input dataset has been fetched to the local machine, set when `check_exists_locally()` method is called
+        True if the input dataset exists on the local machine, set by `check_exists_locally()` method if source is a URL
     local_path: str, default None
-        The path to where the input dataset has been fetched locally, set when `get()` method is called
+        The path where the input dataset exists locally, set when `get()` is called if source is a URL

Methods:
--------
@@ -48,17 +49,23 @@ def __init__(
         base_model: BaseModel
             The base model with which this input dataset is associated
         source: str
-            URL pointing to the netCDF file containing this input dataset
+            URL or path pointing to the netCDF file containing this input dataset
         file_hash: str
-            The 256 bit SHA sum associated with the file for verifying downloads
+            The 256 bit SHA sum associated with the file for verification

         """

         self.base_model: BaseModel = base_model

         self.source: str = source
         self.file_hash: str = file_hash
+
+        self.exists_locally: Optional[bool] = None
+        self.local_path: Optional[str] = None
+        if _get_source_type(source) == "path":
+            self.exists_locally = True
+            self.local_path = source
Comment on lines +65 to +67

[Member]
Again I wonder if this could all be abstracted behind a class. e.g.

    self.source = DataSource(source)

then later use self.source.exists_locally and self.source.path.

[@dafyddstephenson (Contributor, Author), Aug 19, 2024]
Yes, I began implementing this for this PR but it got hairier than expected. I think it requires a lot of design considerations and should be returned to once the core feature set is complete. AdditionalCode and BaseModel also have a source_repo attribute (that I think is about to be renamed source to accommodate local files) that could be replaced with an instance of whatever this class will be. I think this could be part of the tidy-up that brings in pathlib. I'll raise an issue for now.


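A minimal sketch of the kind of wrapper discussed in this thread (the DataSource name and attributes are the reviewer's suggestion, not part of this PR; the URL test mirrors _get_source_type in utils.py):

    import pathlib
    from typing import Optional
    from urllib.parse import urlparse


    class DataSource:
        """Hypothetical wrapper distinguishing a local path from a URL."""

        def __init__(self, location: str):
            self.location = location
            parsed = urlparse(location)
            # Same heuristic as _get_source_type: a scheme plus a netloc means a URL
            self._is_url = bool(parsed.scheme and parsed.netloc)

        @property
        def exists_locally(self) -> bool:
            # Only a non-URL source can already exist on the local filesystem
            return (not self._is_url) and pathlib.Path(self.location).exists()

        @property
        def path(self) -> Optional[pathlib.Path]:
            return None if self._is_url else pathlib.Path(self.location)

InputDataset.__init__ could then reduce to self.source = DataSource(source), with callers using self.source.exists_locally and self.source.path as suggested above.
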
self.start_date = start_date
self.end_date = end_date
if isinstance(start_date, str):
@@ -76,7 +83,7 @@ def __str__(self):
         base_str += "\n" + "-" * (len(name) + 7)

         base_str += f"\nBase model: {self.base_model.name}"
-        base_str += f"\nRemote path URL: {self.source}"
+        base_str += f"\nsource: {self.source}"
         if self.start_date is not None:
             base_str += f"\nstart_date: {self.start_date}"
         if self.end_date is not None:
@@ -91,35 +98,59 @@ def __str__(self):
def __repr__(self):
return self.__str__()

-    def get(self, local_path: str):
+    def get(self, local_dir: str):
         """
-        Fetch the file containing this input dataset and save it to `local_path` using Pooch.
+        Make the file containing this input dataset available in `local_dir/input_datasets`

-        This method updates the `local_path` attribute of the calling InputDataset object
+        If InputDataset.source is...
+           - ...a local path: create a symbolic link to the file in `local_dir/input_datasets`.
+           - ...a URL: fetch the file to `local_dir/input_datasets` using Pooch
+        (updating the `local_path` attribute of the calling InputDataset)
Comment on lines +105 to +108

[Member]
Yeah this is another argument for abstracting out the concept of a "source" into a standalone class / type.

This is a similar idea to using cloudpathlib, but maybe we need to attach more info, in which case we would need our own Source class with a .path attribute.


         Parameters:
         -----------
-        local_path: str
-            The local path where this input dataset will be saved.
+        local_dir: str
+            The local directory in which this input dataset will be saved.

         """

-        tgt_dir = local_path + "/input_datasets/" + self.base_model.name + "/"
+        tgt_dir = local_dir + "/input_datasets/" + self.base_model.name + "/"
         os.makedirs(tgt_dir, exist_ok=True)
+        tgt_path = tgt_dir + os.path.basename(self.source)

-        # NOTE: default timeout was leading to a lot of timeouterrors
-        downloader = pooch.HTTPDownloader(timeout=120)
-        to_fetch = pooch.create(
-            path=tgt_dir,
-            base_url=os.path.dirname(self.source),
-            registry={os.path.basename(self.source): self.file_hash},
-        )
-
-        to_fetch.fetch(os.path.basename(self.source), downloader=downloader)
-        self.exists_locally = True
-        self.local_path = tgt_dir + "/" + os.path.basename(self.source)
+        # If the file is somewhere else on the system, make a symbolic link where we want it
[Member]
This could all go behind a Source interface.

+        if self.exists_locally:
+            assert (
+                self.local_path is not None
+            ), "local_path should always be set when exists_locally is True"
+            if os.path.abspath(self.local_path) != os.path.abspath(tgt_path):
+                if os.path.exists(tgt_path):
+                    raise FileExistsError(
+                        f"A file by the name of {os.path.basename(self.source)}"
+                        + f"already exists at {tgt_dir}."
+                    )
+                # TODO maybe this should check the hash and just `return` if it matches?
+                else:
+                    os.symlink(self.local_path, tgt_path)
+                return
+            else:
+                # nothing to do as file is already at tgt_path
+                return

-    def check_exists_locally(self, local_path: str) -> bool:
+        else:
+            # Otherwise, download the file
+            # NOTE: default timeout was leading to a lot of timeouterrors
+            downloader = pooch.HTTPDownloader(timeout=120)
+            to_fetch = pooch.create(
+                path=tgt_dir,
+                base_url=os.path.dirname(self.source),
+                registry={os.path.basename(self.source): self.file_hash},
+            )
+
+            to_fetch.fetch(os.path.basename(self.source), downloader=downloader)
+            self.exists_locally = True
+            self.local_path = tgt_dir + "/" + os.path.basename(self.source)

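To summarise the two branches of the new get(): a local-path source is symlinked into `local_dir/input_datasets/<base_model>`, while a URL source is downloaded there and hash-verified by Pooch. A hypothetical usage sketch (the paths, objects, and hash are made up):

    # ds_local was constructed with source="/data/roms_grd.nc" (an existing local file);
    # ds_remote with source="https://example.com/roms_grd.nc" and a matching file_hash.

    ds_local.get("/home/user/my_case")
    # -> /home/user/my_case/input_datasets/<base_model>/roms_grd.nc is created
    #    as a symlink to /data/roms_grd.nc (no download, no hash check)

    ds_remote.get("/home/user/my_case")
    # -> the file is downloaded into the same directory via Pooch and verified
    #    against file_hash; local_path is updated to the downloaded copy
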
+    def check_exists_locally(self, local_dir: str) -> bool:
"""
Checks whether this InputDataset has already been fetched to the local machine

@@ -128,38 +159,38 @@ def check_exists_locally(self, local_path: str) -> bool:

         Parameters:
         -----------
-        local_path (str):
-            The local path to check for the existence of this input dataset
+        local_dir (str):
+            The local directory in which to check for the existence of this input dataset

         Returns:
         --------
         exists_locally (bool):
             True if the method has verified the local existence of the dataset
         """
tgt_dir = local_path + "/input_datasets/" + self.base_model.name + "/"
fpath = tgt_dir + os.path.basename(self.source)
if os.path.exists(fpath):
sha256_hash = hashlib.sha256()
with open(fpath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)

hash_hex = sha256_hash.hexdigest()
if self.file_hash != hash_hex:
raise ValueError(
f"{fpath} exists locally but the local file hash {hash_hex}"
+ "does not match that associated with this InputDataset object"
+ f"{self.file_hash}"
)

if self.exists_locally is None:
tgt_dir = local_dir + "/input_datasets/" + self.base_model.name + "/"
fpath = tgt_dir + os.path.basename(self.source)
if os.path.exists(fpath):
sha256_hash = hashlib.sha256()
with open(fpath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)

hash_hex = sha256_hash.hexdigest()
if self.file_hash != hash_hex:
raise ValueError(
f"{fpath} exists locally but the local file hash {hash_hex}"
+ "does not match that associated with this InputDataset object"
+ f"{self.file_hash}"
)
else:
self.exists_locally = True
self.local_path = tgt_dir
else:
self.exists_locally = True
self.exists_locally = False

if self.exists_locally:
self.local_path = tgt_dir
return True
else:
self.exists_locally = False
return False
return self.exists_locally


class ModelGrid(InputDataset):
16 changes: 16 additions & 0 deletions cstar_ocean/utils.py
@@ -1,11 +1,27 @@
 import os
+import re
 import subprocess
+import pathlib
 from math import ceil
+from urllib.parse import urlparse
 from cstar_ocean.environment import _CSTAR_CONFIG_FILE
+
+
+def _get_source_type(source):
+    """Determine whether a string (source) describes a local path or url"""
+    urlparsed_source = urlparse(source)
+    if all([urlparsed_source.scheme, urlparsed_source.netloc]):
+        return "url"
+    elif pathlib.Path(source).exists():
+        return "path"
+    else:
+        raise ValueError(
+            f"{source} is not a recognised URL or local path pointing to an existing file"
+        )

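A quick sketch of how the new helper classifies its argument (the example inputs are hypothetical; note that a local path only counts as "path" if the file already exists):

    from cstar_ocean.utils import _get_source_type

    _get_source_type("https://example.com/data/roms_grd.nc")  # -> "url"
    _get_source_type("/home/user/data/roms_grd.nc")  # -> "path" (if the file exists)
    _get_source_type("missing_file.nc")  # raises ValueError
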

def _write_to_config_file(config_file_str):
"""write to C-Star config file to configure environment on import"""
if not os.path.exists(_CSTAR_CONFIG_FILE):
base_conf_str = (
"# This file was generated by C-Star and is specific to your machine. "