Skip to content

Commit

Permalink
Merge pull request #311 from jmccreight/feat_flow_dag
Browse files Browse the repository at this point in the history
Feat flow dag
  • Loading branch information
jmccreight authored Nov 12, 2024
2 parents 2920c8f + 593f7b6 commit ff674c3
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 130 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ jobs:
- name: Set environment variables
run: |
echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV
echo "PYWS_FORTRAN=true" >> $GITHUB_ENV
echo 'SETUPTOOLS_ENABLE_FEATURES="legacy-editable"' >> $GITHUB_ENV
echo "PYWS_FORTRAN=false" >> $GITHUB_ENV
cat .mf6_ci_ref_remote >> $GITHUB_ENV
- name: Enforce MF6 ref and remote merge to main
Expand Down Expand Up @@ -192,7 +191,14 @@ jobs:
- name: domainless - run tests not requiring domain data
working-directory: autotest
run: pytest -m domainless -n=auto -vv
run: pytest
-m domainless
-n=auto
-vv
--durations=0
--cov=pywatershed
--cov-report=xml
--junitxml=pytest_domainless.xml

- name: sagehen_5yr_no_cascades - generate and manage test data domain, run PRMS and convert csv output to NetCDF
working-directory: autotest
Expand Down
9 changes: 6 additions & 3 deletions autotest/test_starfit_flow_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from pywatershed.parameters import Parameters, StarfitParameters

# NB:
# Here we are comparing a daily starfit against an hourly StarfitNode.
# Here we are comparing a daily offline starfit against an hourly
# StarfitNode. The reference output is the mean value from offline runs run
# from 1995-2001 in the file
# ../test_data/starfit/starfit_mean_output_1995-2001.nc
# We only advance the hourly StarfitNode one substep per day. Its
# resulting flow rates are identical but the change in storage is 1/24
# of the daily value, so we check this. We have to track previous storage
Expand All @@ -29,12 +32,12 @@
# & (parameters_ds.end_time >= np.datetime64("2001-12-31 00:00:00"))
# fmt: off
starfit_inds_test = [
0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13,
0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13,
15, 16, 18, 20, 21, 22, 23, 24, 25, 26, 28, 29, 30,
31, 32, 33, 36, 37, 38, 40, 43, 44, 47, 48, 49, 51,
52, 53, 55, 56, 59, 62, 63, 64, 65, 67, 68, 69, 70,
71, 72, 74, 75, 76, 77, 86, 87, 89, 90, 91, 92, 93,
94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106,
94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106,
107, 108, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120,
122, 123, 130, 134, 137, 139, 140, 141, 145, 148, 149, 152, 154,
155, 156, 157, 158, 159, 160, 161, 162, 164, 165, 166
Expand Down
4 changes: 3 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ New Features
(DFW) routing from PRMS NHM input files and a few simple assumptions. The
lateral (to-channel) fluxes from a PRMS are used as time varying boundary
conditions. A new notebook runs the Delaware River Basin using MF6 DFW:
`examples/mmr_to_mf6_dfw.ipynb <https://github.com/EC-USGS/pywatershed/blob/develop/examples/mmr_to_mf6_dfw.ipynb>`__.
`examples/07_mmr_to_mf6_chf_dfw.ipynb <https://github.com/EC-USGS/pywatershed/blob/develop/examples/07_mmr_to_mf6_chf_dfw.ipynb>`__.
(:pull:`290`) By `James McCreight <https://github.com/jmccreight>`_.
- The depression storage option for PRMSRunoff is implemented and tested.
(:pull:`279`) By `James McCreight <https://github.com/jmccreight>`_.
Expand Down Expand Up @@ -96,6 +96,8 @@ Internal changes
PRMSGroundwater: 1.0e-8, PRMSGroundwaterNoDprst: 1.0e-8, PRMSChannel: 5.0e-7)
for all test domains.
(:pull:`288`) By `James McCreight <https://github.com/jmccreight>`_.
- Migration to Numpy 2.0+.
(:pull:`310`) By `James McCreight <https://github.com/jmccreight>`_.


.. _whats-new.1.1.0:
Expand Down
4 changes: 3 additions & 1 deletion examples/02_prms_legacy_models.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@
"metadata": {},
"outputs": [],
"source": [
"control = pws.Control.load_prms(domain_dir / \"nhm.control\", warn_unused_options=False)\n",
"control = pws.Control.load_prms(\n",
" domain_dir / \"nhm.control\", warn_unused_options=False\n",
")\n",
"\n",
"control"
]
Expand Down
197 changes: 106 additions & 91 deletions examples/06_flow_graph_starfit.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions examples/model_loop_custom_output.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@
" \n",
" proc = model.processes[var_proc[var]]\n",
" dim_name = needed_metadata[var][\"dims\"][0]\n",
" dim_len = proc.params.dims[dim_name]\n",
" dim_len = proc._params.dims[dim_name]\n",
" coord_name = dim_coord[dim_name]\n",
" coord_data = proc.params.coords[dim_coord[dim_name]]\n",
" coord_data = proc._params.coords[dim_coord[dim_name]]\n",
" type = needed_metadata[var][\"type\"]\n",
" \n",
" var_meta = {\n",
Expand Down
2 changes: 1 addition & 1 deletion pywatershed/analysis/process_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(
# if (self.__seg_poly.crs.name
# == "USA_Contiguous_Albers_Equal_Area_Conic_USGS_version"):
# print("Overriding USGS aea crs with EPSG:5070")
self.seg_gdf.crs = "EPSG:5070"
self.seg_gdf.set_crs("EPSG:5070")

self.seg_geoms_exploded = (
self.seg_gdf.explode(index_parts=True)
Expand Down
6 changes: 6 additions & 0 deletions pywatershed/base/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,4 +1135,10 @@ def dd_to_nc4_ds(dd, nc_file):


def open_datasetdict(nc_file: fileish, use_xr=True):
"""Convenience method for opening a DatasetDict.
Args:
nc_file: the file containing the DatasetDict.
use_xr: Use xarray or NetCDF4 for opening the NetCDF file?
"""
return DatasetDict.from_netcdf(nc_file, use_xr=use_xr)
9 changes: 9 additions & 0 deletions pywatershed/base/flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class FlowNode(Accessor):
A FlowNode is instantiated with its own (optional) data and calculates
outflow, storage_change, and sink_source properties on subtimesteps.
A FlowNode may have additional public variables provided by properties that
can be requested to be collected by :class:`FlowGraph` for output to
NetCDF files. These variable names should just not overwrite any existing
class attributes.
See :class:`FlowGraph` for related examples and discussion.
"""

Expand Down Expand Up @@ -177,6 +182,10 @@ class FlowGraph(ConservativeProcess):
:func:`prms_channel_flow_graph_to_model_dict`
and :func:`prms_channel_flow_graph_postprocess`.
For developers looking to add new :class:`FlowNode`s, please read the
:class:`FlowNode` base class code and also the code for
:class:`FlowNodeMaker`.
Examples:
---------
Expand Down
8 changes: 4 additions & 4 deletions pywatershed/hydrology/prms_channel_flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ def prms_channel_flow_graph_postprocess(
new_nodes_maker_indices: collated list of indices relative to each
NodeMaker.
new_nodes_maker_ids: Collated list of ids relative to each NodeMaker.
new_nodes_flow_to_nhm_seg: collated list describing the nhm_seg to
which the node will flow. Use of non-positive entries specifies
new_nodes_flow_to_nhm_seg: collated list describing the nhm_segs to
which the new nodes will flow. Use of non-positive entries specifies
the zero-based index for flowing to nodes specified in these
collated parameters, allowing these new nodes to be added in
groups, in series to the existing NHM FlowGraph. Note that a new
Expand Down Expand Up @@ -782,8 +782,8 @@ def prms_channel_flow_graph_to_model_dict(
new_nodes_maker_indices: collated list of indices relative to each
NodeMaker
new_nodes_maker_ids: Collated list of ids relative to each NodeMaker.
new_nodes_flow_to_nhm_seg: collated list describing the nhm_seg to
which the node will flow. Use of non-positive entries specifies
new_nodes_flow_to_nhm_seg: collated list describing the nhm_segs to
which the new nodes will flow. Use of non-positive entries specifies
the zero-based index for flowing to nodes specified in these
collated parameters, allowing these new nodes to be added in
groups, in series to the existing NHM FlowGraph. Note that a new
Expand Down
12 changes: 10 additions & 2 deletions pywatershed/hydrology/starfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ class StarfitFlowNode(FlowNode):
computed in a :class:`FlowGraph`. The solution has the option for
subtimestep or daily computations.
Daily computations have the same outflows on the substeps of a day and
outflows and storages are calculated on the last subtimestep. On the first
subtimestep, we use the inflow of the first subtimestep as representative
of the mean inflow of the previous day in order to calculate an average
outflow for the first timestep.
The STARFIT reference:
Sean W.D. Turner, Jennie Clarice Steyaert, Laura Condon, Nathalie Voisin,
Expand All @@ -545,8 +551,10 @@ class StarfitFlowNode(FlowNode):
https://github.com/IMMM-SFA/starfit
Adapted from STARFIT implementation in the MOSART-WM model:
https://github.com/IMMM-SFA/mosartwmpy/blob/main/mosartwmpy/reservoirs/istarf.py
Adapted from STARFIT implementation in the [MOSART-WM model](https://github.com/IMMM-SFA/mosartwmpy/blob/main/mosartwmpy/reservoirs/istarf.py)
Thurber, T., Rexer, E., Vernon, C., Sun, N., Turner, S., Yoon, J.,
Broman, D., & Voisin, N. (2022). mosartwmpy (Version 0.2.7)
[Computer software]. https://github.com/IMMM-SFA/mosartwmpy
See :class:`FlowGraph` for discussion and a worked example. The notebook
`examples/06_flow_graph_starfit.ipynb <https://github.com/EC-USGS/pywatershed/blob/develop/examples/06_flow_graph_starfit.ipynb>`__
Expand Down
71 changes: 49 additions & 22 deletions pywatershed/parameters/starfit_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,39 @@


class StarfitParameters(Parameters):
"""
Starfit parameter class
The GRanD data
"""Starfit parameter class.
This parameter class provides STARFIT parameters for modeling. This
class does NOT calculate the parameters from inputs (e.g. as ISTARF-CONUS
did using ResOpsUS), it simply provides the format for the model to get
the parameter data.
The data supplied can come from whatever means. The method
`from_istarf_conus_grand` uses existing ISTARF-CONUS and GRanD data to
create a parameter object for the user.
References:
**ISTARF-CONUS (Inferred Storage Targets and Release Functions - Continental
US)**: Sean W.D. Turner, Jennie Clarice Steyaert, Laura Condon,
Nathalie Voisin, Water storage and release policies for all large
reservoirs of conterminous United States, Journal of Hydrology,
Volume 603, Part A, 2021, 126843, ISSN 0022-1694,
https://doi.org/10.1016/j.jhydrol.2021.126843.
https://zenodo.org/records/4602277
**GRanD (Global Reservoir and Dam) database**: Lehner, Bernhard, Catherine
Reidy Liermann, Carmen Revenga, Charles
Vörösmarty, Balazs Fekete, Philippe Crouzet, Petra Döll et al. "High‐
resolution mapping of the world's reservoirs and dams for sustainable
river‐flow management." Frontiers in Ecology and the Environment 9, no. 9
(2011): 494-502.
https://ln.sync.com/dl/bd47eb6b0/anhxaikr-62pmrgtq-k44xf84f-pyz4atkm/view/default/447819520013
The istarf data
https://zenodo.org/record/4602277#.ZCtYj-zMJqs
The resops data
https://zenodo.org/record/5893641#.ZCtakuzMJqs
# add citiatons. add this information to the starfit model too
# add a working example
**ResOpsUS**: Steyaert, Jennie C., Laura E. Condon, Sean WD Turner, and
Nathalie Voisin. "ResOpsUS, a dataset of historical reservoir operations
in the contiguous United States." Scientific Data 9, no. 1 (2022): 34.
https://zenodo.org/records/6612040
Parameters
----------
Expand Down Expand Up @@ -156,7 +175,7 @@ def from_netcdf(
def from_istarf_conus_grand(
grand_file: Union[pl.Path, str],
istarf_file: Union[pl.Path, str] = None,
files_directory: Union[pl.Path, str] = None,
files_directory: Union[pl.Path, str] = pl.Path("."),
grand_ids: list = None,
):
"""Build parameter object from istarf-conus and the GRanD v1.3 sources.
Expand All @@ -175,14 +194,14 @@ def from_istarf_conus_grand(
Starfit.
Args:
grand_file: a path to an existing dbf or shp file. If the file does not
exist, an error will be thrown and you must download it manually
at https://ln.sync.com/dl/bd47eb6b0/anhxaikr-62pmrgtq-k44xf84f-pyz4atkm/view/de
istarf_file: a path to an existing file. If the file does not exist or is
None then the file will be downloaded to files_directory. You can
download the file yourself here
https://zenodo.org/records/4602277/files/ISTARF-CONUS.csv?download=1
grand_file: a path to an existing dbf or shp file. If the file does not
exist, an error will be thrown and you must download it manually
at https://ln.sync.com/dl/bd47eb6b0/anhxaikr-62pmrgtq-k44xf84f-pyz4atkm/view/default/447819520013
https://ln.sync.com/dl/bd47eb6b0/anhxaikr-62pmrgtq-k44xf84f-pyz4atkm/view/default/447819520013
files_directory: A local directory where to download the file.
grand_ids: a subset of grand_ids to keep.
Examples:
Expand Down Expand Up @@ -222,7 +241,7 @@ def from_istarf_conus_grand(
... )
... )
""" # noqa: 501
""" # noqa: E501

grand_ds = _get_grand(grand_file)
istarf_ds = _get_istarf_conus(istarf_file, files_directory)
Expand Down Expand Up @@ -260,7 +279,7 @@ def from_istarf_conus_grand(
nreservoirs = len(ds.nreservoirs)
for vv in ["start_time", "end_time"]:
ds[vv] = xr.Variable(
"nreservoirs", np.array([nat] * nreservoirs, "<M8[s]")
"nreservoirs", np.array([nat] * nreservoirs, "<M8[ns]")
)
# <
vv = "initial_storage"
Expand All @@ -271,13 +290,21 @@ def from_istarf_conus_grand(


def _get_grand(grand_file):
if grand_file is None:
msg = (
"You must acquire the GRanD file manually at\n"
"https://ln.sync.com/dl/bd47eb6b0/anhxaikr-62pmrgtq-k44xf84f-pyz4atkm/view/default/447819520013" # noqa: E501
)
raise IOError(msg)
if not pl.Path(grand_file).exists():
msg = f"the GRanD file {grand_file} does not exist."
raise ValueError(msg)
# check that it's a dbf or a shp file?

cols_keep = ["GRAND_ID", "LONG_DD", "LAT_DD"]
grand_ds = gpd.read_file(grand_file)[cols_keep].to_xarray().drop("index")
grand_ds = (
gpd.read_file(grand_file)[cols_keep].to_xarray().drop_vars("index")
)
grand_ds = grand_ds.rename(
{"GRAND_ID": "grand_id", "index": "nreservoirs"}
).set_coords("grand_id")
Expand All @@ -304,7 +331,7 @@ def _get_istarf_conus(istarf_file, files_directory):
"ISTARF-CONUS.csv?download=1"
)
istarf_file = files_directory / "ISTARF-CONUS.csv"
if not istarf_file_in_exists:
if not istarf_file_in_exists and istarf_file_in is not None:
warn(
"The specified istarf_file does not exist: "
f"{istarf_file_in}"
Expand Down

0 comments on commit ff674c3

Please sign in to comment.