From 04db18e21cf38bb63aba215dd6f4108f07ee4778 Mon Sep 17 00:00:00 2001 From: Minh Pham Date: Mon, 13 Jul 2020 20:04:55 -0700 Subject: [PATCH 1/4] Fix reader for no-repr datasets --- examples/dame/filled/dem_cropping.yml | 57 +++ examples/dame/filled/gldas_cycles.yml | 48 +++ .../cycles2econ/price/demo_pipeline.py | 4 +- .../to-be-fixed/cycles2econ/price/pipeline.py | 4 +- .../to-be-fixed/cycles2econ/yield/pipeline.py | 2 +- .../pihm2netcdf/geotiff_pipeline.py | 2 +- .../pihm2netcdf/netcdf_pipeline.py | 2 +- funcs/cycles/gldas2cycles.py | 15 +- funcs/dcat_write_func.py | 27 +- funcs/readers/dcat_read_func.py | 326 +++++++++++++----- funcs/readers/dcat_read_no_repr.py | 66 ++-- funcs/topoflow/topoflow_climate.py | 3 + .../topoflow/write_topoflow4_climate_func.py | 5 +- funcs/writers/geotiff_write_func.py | 28 +- 14 files changed, 428 insertions(+), 161 deletions(-) create mode 100644 examples/dame/filled/dem_cropping.yml create mode 100644 examples/dame/filled/gldas_cycles.yml diff --git a/examples/dame/filled/dem_cropping.yml b/examples/dame/filled/dem_cropping.yml new file mode 100644 index 00000000..fc42a013 --- /dev/null +++ b/examples/dame/filled/dem_cropping.yml @@ -0,0 +1,57 @@ +version: "1.0" +description: Data transformation to generate daily average data from original GLDAS data sources +inputs: + gldas_dataset_id: + comment: DataCatalog Dataset ID for GLDAS + value: 5babae3f-c468-4e01-862e-8b201468e3b5 + agg_function: + comment: Operation to be used for aggregation. Values can be ("sum", "average", "count") + value: average + agg_time_period: + comment: Time period for aggregation. Values can be ("minute", "hour", "day", "month", "year") + value: day + start_time: + comment: Start time to filter Resources for DataCatalog GLDAS Dataset (can also be "null" to leave this end open) + value: '2011-01-01 00:00:00' + end_time: + comment: End time to filter Resources for DataCatalog GLDAS Dataset (can also be "null" to leave this end open) + value: '2011-01-02 00:00:00' + crop_region_xmin: + comment: Target region bounding box xmin coordinate + value: 32.75418 + crop_region_ymin: + comment: Target region bounding box ymin coordinate + value: 3.22206 + crop_region_xmax: + comment: Target region bounding box xmax coordinate + value: 47.98942 + crop_region_ymax: + comment: Target region bounding box ymax coordinate + value: 15.15943 + output_file: + comment: Path to output compressed file + value: output.zip +adapters: + gldas_read_func: + comment: My gldas read func adapter + adapter: funcs.DcatReadFunc + inputs: + dataset_id: $$.gldas_dataset_id + start_time: $$.start_time + end_time: $$.end_time + my_crop_wrapper: + comment: My cropping func wrapper adapter + adapter: funcs.CroppingTransFunc + inputs: + variable_name: $$.variable_name + dataset: $.gldas_read_func.data + xmin: $$.crop_region_xmin + ymin: $$.crop_region_ymin + xmax: $$.crop_region_xmax + ymax: $$.crop_region_ymax + geotiff_writer: + adapter: funcs.GeoTiffWriteFunc + inputs: + dataset: $.weather_data.data + variable_name: $$.variable_name + output_dir: $$.tmp_dir_geotiff diff --git a/examples/dame/filled/gldas_cycles.yml b/examples/dame/filled/gldas_cycles.yml new file mode 100644 index 00000000..6c0e11f7 --- /dev/null +++ b/examples/dame/filled/gldas_cycles.yml @@ -0,0 +1,48 @@ +version: "1" +description: Data transformation to generate Cycles-ready input files (RTS) from GLDAS weather data sources +inputs: + start_date: + comment: Start time to filter Resources for DataCatalog GLDAS/GPM Dataset ("YYYY-MM-DD") + value: 
"2000-01-01" + end_date: + comment: End time to filter Resources for DataCatalog GLDAS/GPM Dataset ("YYYY-MM-DD") + value: "2000-01-01" + dataset_id: + comment: Dataset ID + value: 5babae3f-c468-4e01-862e-8b201468e3b5 + output_file: + comment: "Path to the output directory" + value: "/tmp/demo/output/GLDAS/" + output_prefix: + comment: "Prefix to be added to the output file names" + value: "cycles" + latitude: + comment: "Latitude to extract data (use coord_file when lat = -1 or long = -1)" + value: -1 # + longitude: + comment: "Longitude to extract data (use coord_file when lat = -1 or long = -1)" + value: -1 # use coord_file when lat = -1 or long = -1 + coord_file: + comment: "File path that contains lat/long of the extracting locations" + value: "/tmp/demo/input/oromia.csv" +adapters: + dcat_read_func: + comment: | + Weather dataset + adapter: funcs.DcatReadFunc + inputs: + dataset_id: $$.dataset_id + start_time: $$.start_time + end_time: $$.end_time + gldas2cycles_func: + comment: GLDAS2Cycles adapter + adapter: funcs.Gldas2CyclesFunc + inputs: + start_date: $$.start_date + end_date: $$.end_date + gldas_path: $$.gldas_path + output_path: $$.output_file + output_prefix: $$.output_prefix + latitude: $$.latitude + longitude: $$.longitude + coord_file: $$.coord_file diff --git a/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py b/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py index 857f7753..c8fad1df 100644 --- a/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py +++ b/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py @@ -46,14 +46,14 @@ UnitTransFunc.I.unit_desired: "$/kg", CSVWriteFunc.I.main_class: "qb:Observation", CSVWriteFunc.I.mapped_columns: {}, - CSVWriteFunc.I.output_file: wdir / "price.csv", + CSVWriteFunc.I.output_path: wdir / "price.csv", VisJsonWriteFunc.I.filter: "@type = 'qb:Observation' and " "sdmx-attribute:refArea.contains('Aweil (Town)') and " "sdmx-dimension:refPeriod = '2016-10-15' and " f"dcat-dimension:thing in {str(set(crop_names.keys()))}", VisJsonWriteFunc.I.main_class: "qb:Observation", VisJsonWriteFunc.I.mapped_columns: {}, - VisJsonWriteFunc.I.output_file: wdir / "visualization.json", + VisJsonWriteFunc.I.output_path: wdir / "visualization.json", } outputs = pipeline.exec(inputs) diff --git a/examples/to-be-fixed/cycles2econ/price/pipeline.py b/examples/to-be-fixed/cycles2econ/price/pipeline.py index 8bc28ad1..2a890c02 100644 --- a/examples/to-be-fixed/cycles2econ/price/pipeline.py +++ b/examples/to-be-fixed/cycles2econ/price/pipeline.py @@ -34,14 +34,14 @@ UnitTransFunc.I.unit_desired: "$/kg", CSVWriteFunc.I.main_class: "qb:Observation", CSVWriteFunc.I.mapped_columns: {}, - CSVWriteFunc.I.output_file: wdir / "price.csv", + CSVWriteFunc.I.output_path: wdir / "price.csv", VisJsonWriteFunc.I.filter: "@type = 'qb:Observation' and " "sdmx-attribute:refArea.contains('Aweil (Town)') and " "sdmx-dimension:refPeriod = '2016-10-15' and " f"dcat-dimension:thing in {str(set(crop_names.keys()))}", VisJsonWriteFunc.I.main_class: "qb:Observation", VisJsonWriteFunc.I.mapped_columns: {}, - VisJsonWriteFunc.I.output_file: wdir / "visualization.json", + VisJsonWriteFunc.I.output_path: wdir / "visualization.json", } outputs = pipeline.exec(inputs) diff --git a/examples/to-be-fixed/cycles2econ/yield/pipeline.py b/examples/to-be-fixed/cycles2econ/yield/pipeline.py index 1766b461..884d4373 100644 --- a/examples/to-be-fixed/cycles2econ/yield/pipeline.py +++ b/examples/to-be-fixed/cycles2econ/yield/pipeline.py @@ -79,7 +79,7 @@ def exec(self) -> dict: 
TransWrapperFunc.I._2.filter: "@type = 'qb:Observation' and sdmx-dimension:refPeriod = '2016-10-12'", TransWrapperFunc.I.code: wdir / "cycles-to-crop.py", CSVWriteFunc.I.main_class: "qb:Observation", - CSVWriteFunc.I.output_file: wdir / "output.csv", + CSVWriteFunc.I.output_path: wdir / "output.csv", CSVWriteFunc.I.mapped_columns: {}, } diff --git a/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py b/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py index 6f4e54e1..99ce056b 100644 --- a/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py +++ b/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py @@ -37,7 +37,7 @@ "2017-12-31 23:59:59", "%Y-%m-%d %H:%M:%S" ), # MintGeoTiffWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.tif", - MintGeoTiffWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017", + MintGeoTiffWriteFunc.I.output_path: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017", MintGeoTiffWriteFunc.I.is_multiple_files: True, } diff --git a/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py b/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py index c3272b2e..f8a6641e 100644 --- a/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py +++ b/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py @@ -30,7 +30,7 @@ PihmMonthlyAverageFloodingFunc.I.mean_space: 0.05, PihmMonthlyAverageFloodingFunc.I.start_time: datetime.strptime("2017-01-01 00:00:00", '%Y-%m-%d %H:%M:%S'), PihmMonthlyAverageFloodingFunc.I.end_time: datetime.strptime("2017-12-31 23:59:59", '%Y-%m-%d %H:%M:%S'), - MintNetCDFWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.nc", + MintNetCDFWriteFunc.I.output_path: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.nc", MintNetCDFWriteFunc.I.title: "Monthly gridded surface inundation for Pongo River in 2017", MintNetCDFWriteFunc.I.comment: "Outputs generated from the workflow", MintNetCDFWriteFunc.I.naming_authority: "edu.isi.workflow", diff --git a/funcs/cycles/gldas2cycles.py b/funcs/cycles/gldas2cycles.py index 7522577b..3af984cf 100644 --- a/funcs/cycles/gldas2cycles.py +++ b/funcs/cycles/gldas2cycles.py @@ -11,6 +11,7 @@ from dtran import IFunc, ArgType from dtran.ifunc import IFuncType from dtran.metadata import Metadata +import xarray class Gldas2CyclesFunc(IFunc): @@ -84,10 +85,10 @@ def change_metadata( return metadata -def Closest(lat, lon, path): +def closest(lat, lon, path): elevation_fp = path + "/GLDASp4_elevation_025d.nc4" - nc = Dataset(elevation_fp, "r") + nc = xr.ope(elevation_fp, "r") best_y = (np.abs(nc.variables["lat"][:] - lat)).argmin() best_x = (np.abs(nc.variables["lon"][:] - lon)).argmin() @@ -101,7 +102,7 @@ def Closest(lat, lon, path): ) -def ReadVar(y, x, nc_name): +def read_var(y, x, nc_name): with Dataset(nc_name, "r") as nc: _prcp = nc["Rainf_f_tavg"][0, y, x] _temp = nc["Tair_f_inst"][0, y, x] @@ -148,7 +149,7 @@ def process_day(t, y, x, path): for nc_name in os.listdir(nc_path): if nc_name.endswith(".nc4"): - (_prcp, _temp, _wind, _solar, _rh) = ReadVar( + (_prcp, _temp, _wind, _solar, _rh) = read_var( y, x, os.path.join(nc_path, nc_name) ) @@ -235,7 +236,7 @@ def gldas2cycles( for lat, lon, fname in coords: print("Processing data for {0}, {1}".format(lat, lon)) - (y, x, grid_lat, grid_lon, elevation) = Closest(lat, lon, data_path) + (y, x, grid_lat, grid_lon, elevation) = closest(lat, lon, data_path) if grid_lat < 0.0: lat_str = "%.2fS" % (abs(grid_lat)) @@ -256,8 +257,8 @@ def gldas2cycles( Path(output_path).mkdir(parents=True, exist_ok=True) # fname = "met" + lat_str + "x" + 
lon_str + ".weather" outfp = open(os.path.join(output_path, fname), "w") - outfp.write("LATITUDE %.2f\n" % (grid_lat)) - outfp.write("ALTITUDE %.2f\n" % (elevation)) + outfp.write("LATITUDE %.2f\n" % grid_lat) + outfp.write("ALTITUDE %.2f\n" % elevation) outfp.write("SCREENING_HEIGHT 2\n") outfp.write( "YEAR DOY PP TX TN SOLAR RHX RHN WIND\n" diff --git a/funcs/dcat_write_func.py b/funcs/dcat_write_func.py index 5ce46331..10d0a5bb 100644 --- a/funcs/dcat_write_func.py +++ b/funcs/dcat_write_func.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- import subprocess from pathlib import Path -from typing import Union +from typing import Union, Optional, Dict import os import ujson as json @@ -10,6 +10,7 @@ from dtran.argtype import ArgType from dtran.dcat.api import DCatAPI from dtran.ifunc import IFunc, IFuncType +from dtran.metadata import Metadata class DcatWriteFunc(IFunc): @@ -24,10 +25,9 @@ class DcatWriteFunc(IFunc): } outputs = {"data": ArgType.String} friendly_name: str = "Data Catalog Writer" - func_type = IFuncType.WRITER example = { "resource_path": "$.my_graph_write_func.output_file", - "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]' + "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]', } PROVENANCE_ID = "b3e79dc2-8fa1-4203-ac82-b5267925191f" @@ -59,28 +59,28 @@ def exec(self) -> dict: def validate(self) -> bool: return True + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: + pass + class DcatWriteMetadataFunc(IFunc): id = "dcat_write_metadata_func" description = """ A data catalog metadata writer adapter. """ func_type = IFuncType.WRITER - inputs = { - "metadata": ArgType.String, - "dataset_id": ArgType.String - } + inputs = {"metadata": ArgType.String, "dataset_id": ArgType.String} outputs = {"data": ArgType.String} friendly_name: str = "Data Catalog Metadata Writer" example = { "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]', - "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d" + "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", } PROVENANCE_ID = "b3e79dc2-8fa1-4203-ac82-b5267925191f" - def __init__( - self, metadata: str, dataset_id: str - ): + def __init__(self, metadata: str, dataset_id: str): self.metadata = json.loads(metadata) self.dataset_id = dataset_id self.dcat = DCatAPI.get_instance() @@ -93,3 +93,8 @@ def exec(self) -> dict: def validate(self) -> bool: return True + + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: + pass diff --git a/funcs/readers/dcat_read_func.py b/funcs/readers/dcat_read_func.py index 3adeb2a5..40accaef 100644 --- a/funcs/readers/dcat_read_func.py +++ b/funcs/readers/dcat_read_func.py @@ -10,10 +10,18 @@ from collections import OrderedDict from datetime import datetime from pathlib import Path -from typing import Union, Dict +from typing import Union, Dict, Optional from functools import partial from playhouse.kv import KeyValue -from peewee import SqliteDatabase, Model, UUIDField, IntegerField, BooleanField, BigIntegerField, DoesNotExist +from peewee import ( + SqliteDatabase, + Model, + UUIDField, + IntegerField, + BooleanField, + BigIntegerField, + DoesNotExist, +) from drepr import DRepr from drepr.outputs import ArrayBackend, GraphBackend @@ -21,9 +29,10 @@ from dtran.backend import ShardedBackend, ShardedClassID, 
LazyLoadBackend from dtran.dcat.api import DCatAPI from dtran.ifunc import IFunc, IFuncType +from dtran.metadata import Metadata DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"]) -if os.environ['NO_CHECK_CERTIFICATE'].lower().strip() == 'true': +if os.environ["NO_CHECK_CERTIFICATE"].lower().strip() == "true": DOWNLOAD_CMD = "wget --no-check-certificate" else: DOWNLOAD_CMD = "wget" @@ -31,12 +40,12 @@ Path(DATA_CATALOG_DOWNLOAD_DIR).mkdir(exist_ok=True, parents=True) UNITS_MAPPING = { - 'PB': 1 << 50, - 'TB': 1 << 40, - 'GB': 1 << 30, - 'MB': 1 << 20, - 'KB': 1 << 10, - 'B': 1 + "PB": 1 << 50, + "TB": 1 << 40, + "GB": 1 << 30, + "MB": 1 << 20, + "KB": 1 << 10, + "B": 1, } @@ -47,16 +56,20 @@ class Resource(Model): size = BigIntegerField(default=0) class Meta: - database = SqliteDatabase(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, 'dcat_read_func.db'), timeout=10) + database = SqliteDatabase( + os.path.join(DATA_CATALOG_DOWNLOAD_DIR, "dcat_read_func.db"), timeout=10 + ) class ResourceManager: instance = None def __init__(self): - self.max_capacity = 200 * UNITS_MAPPING['MB'] - self.max_clear_size = 100 * UNITS_MAPPING['MB'] - assert self.max_capacity >= self.max_clear_size, "max_capacity cannot be less than max_clear_size" + self.max_capacity = 200 * UNITS_MAPPING["MB"] + self.max_clear_size = 100 * UNITS_MAPPING["MB"] + assert ( + self.max_capacity >= self.max_clear_size + ), "max_capacity cannot be less than max_clear_size" self.poll_interval = 10 self.compressed_resource_types = {".zip", ".tar.gz", ".tar"} self.db = Resource._meta.database @@ -64,19 +77,30 @@ def __init__(self): self.db.create_tables([Resource], safe=True) self.db.close() self.kv = KeyValue(database=self.db, value_field=BigIntegerField()) - with self.db.atomic('EXCLUSIVE'): + with self.db.atomic("EXCLUSIVE"): # initializing current_size of DATA_CATALOG_DOWNLOAD_DIR in the database - if 'current_size' not in self.kv: - self.kv['current_size'] = sum(f.stat().st_size for f in Path(DATA_CATALOG_DOWNLOAD_DIR).rglob('*')) + if "current_size" not in self.kv: + self.kv["current_size"] = sum( + f.stat().st_size for f in Path(DATA_CATALOG_DOWNLOAD_DIR).rglob("*") + ) else: for resource in Resource.select(): if resource.is_downloading: continue - path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + '.dat')) + path = Path( + os.path.join( + DATA_CATALOG_DOWNLOAD_DIR, + str(resource.resource_id) + ".dat", + ) + ) if not path.exists(): - path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id))) + path = Path( + os.path.join( + DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + ) + ) if not path.exists(): - self.kv['current_size'] -= resource.size + self.kv["current_size"] -= resource.size resource.delete_instance() @staticmethod @@ -85,26 +109,37 @@ def get_instance(): ResourceManager.instance = ResourceManager() return ResourceManager.instance - def download(self, resource_id: str, resource_metadata: Dict[str, str], should_redownload: bool) -> str: - is_compressed = resource_metadata['resource_type'] in self.compressed_resource_types + def download( + self, + resource_id: str, + resource_metadata: Dict[str, str], + should_redownload: bool, + ) -> str: + is_compressed = ( + resource_metadata["resource_type"] in self.compressed_resource_types + ) if is_compressed: - if resource_metadata['resource_type'].startswith(".tar"): + if resource_metadata["resource_type"].startswith(".tar"): raise NotImplementedError() path = 
os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id) else: - path = os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id + '.dat') + path = os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id + ".dat") download = True - with self.db.atomic('EXCLUSIVE'): + with self.db.atomic("EXCLUSIVE"): try: # if resource already exists - resource = Resource.select().where(Resource.resource_id == resource_id).get() + resource = ( + Resource.select().where(Resource.resource_id == resource_id).get() + ) resource.ref_count += 1 if not should_redownload: # TODO: comparing timestamp before skipping download download = False except DoesNotExist: - resource = Resource.create(resource_id=resource_id, ref_count=1, is_downloading=True, size=0) + resource = Resource.create( + resource_id=resource_id, ref_count=1, is_downloading=True, size=0 + ) if download: required_size = 0 @@ -112,25 +147,37 @@ def download(self, resource_id: str, resource_metadata: Dict[str, str], should_r # adjust required_size when the resource is to be redownloaded required_size -= resource.size try: - resource.size = int(requests.head(resource_metadata['resource_data_url']).headers['Content-Length']) + resource.size = int( + requests.head(resource_metadata["resource_data_url"]).headers[ + "Content-Length" + ] + ) required_size += resource.size except KeyError: pass - if self.max_capacity - self.kv['current_size'] < required_size: + if self.max_capacity - self.kv["current_size"] < required_size: # clear files to make space - self.kv['current_size'] -= self.clear() - assert self.max_capacity - self.kv['current_size'] >= required_size, "Not enough disk space" - self.kv['current_size'] += required_size + self.kv["current_size"] -= self.clear() + assert ( + self.max_capacity - self.kv["current_size"] >= required_size + ), "Not enough disk space" + self.kv["current_size"] += required_size resource.save() if download: if resource.ref_count > 1: # block until all other processes accessing the resource are finished - DcatReadFunc.logger.debug(f"Waiting for some other process/thread to free resource {resource_id} ...") + DcatReadFunc.logger.debug( + f"Waiting for some other process/thread to free resource {resource_id} ..." 
+ ) while resource.ref_count > 1: time.sleep(self.poll_interval) - with self.db.atomic('EXCLUSIVE'): - resource = Resource.select().where(Resource.resource_id == resource_id).get() + with self.db.atomic("EXCLUSIVE"): + resource = ( + Resource.select() + .where(Resource.resource_id == resource_id) + .get() + ) if resource.ref_count == 1: # setting is_downloading before redownload resource.is_downloading = True @@ -142,52 +189,85 @@ def download(self, resource_id: str, resource_metadata: Dict[str, str], should_r Path(path).unlink() DcatReadFunc.logger.debug(f"Downloading resource {resource_id} ...") if is_compressed: - temp_path = path + resource_metadata['resource_type'] - subprocess.check_call(f"wget -q \"{resource_metadata['resource_data_url']}\" -O {temp_path}", shell=True, close_fds=False) - self.uncompress(resource_metadata['resource_type'], path) + temp_path = path + resource_metadata["resource_type"] + subprocess.check_call( + f"wget -q \"{resource_metadata['resource_data_url']}\" -O {temp_path}", + shell=True, + close_fds=False, + ) + self.uncompress(resource_metadata["resource_type"], path) # adjust required_size when the resource is compressed required_size = -resource.size - required_size += sum(f.stat().st_size for f in Path(path).rglob('*')) + Path(path).stat().st_size + required_size += ( + sum(f.stat().st_size for f in Path(path).rglob("*")) + + Path(path).stat().st_size + ) Path(temp_path).unlink() else: - subprocess.check_call(f"wget -q \"{resource_metadata['resource_data_url']}\" -O {path}", shell=True, close_fds=False) + subprocess.check_call( + f"wget -q \"{resource_metadata['resource_data_url']}\" -O {path}", + shell=True, + close_fds=False, + ) required_size = 0 - with self.db.atomic('EXCLUSIVE'): - self.kv['current_size'] += required_size - resource = Resource.select().where(Resource.resource_id == resource_id).get() + with self.db.atomic("EXCLUSIVE"): + self.kv["current_size"] += required_size + resource = ( + Resource.select().where(Resource.resource_id == resource_id).get() + ) resource.size += required_size resource.is_downloading = False resource.save() else: - DcatReadFunc.logger.debug(f"Skipping resource {resource_id}, found in cache") + DcatReadFunc.logger.debug( + f"Skipping resource {resource_id}, found in cache" + ) if resource.is_downloading: # block until some other process is done downloading the resource - DcatReadFunc.logger.debug(f"Waiting for other process/thread to finish downloading resource {resource_id} ...") + DcatReadFunc.logger.debug( + f"Waiting for other process/thread to finish downloading resource {resource_id} ..." 
+ ) while resource.is_downloading: time.sleep(self.poll_interval) - with self.db.atomic('EXCLUSIVE'): - resource = Resource.select().where(Resource.resource_id == resource_id).get() + with self.db.atomic("EXCLUSIVE"): + resource = ( + Resource.select() + .where(Resource.resource_id == resource_id) + .get() + ) return self.path(resource_id, path, is_compressed) def unlink(self, resource_id): - with self.db.atomic('EXCLUSIVE'): - resource = Resource.select().where(Resource.resource_id == resource_id).get() + with self.db.atomic("EXCLUSIVE"): + resource = ( + Resource.select().where(Resource.resource_id == resource_id).get() + ) resource.ref_count -= 1 resource.save() def clear(self) -> int: size = 0 - for resource in Resource.select().where(Resource.ref_count == 0).order_by(Resource.id): + for resource in ( + Resource.select().where(Resource.ref_count == 0).order_by(Resource.id) + ): DcatReadFunc.logger.debug(f"Clearing resource {resource.resource_id}") - path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + '.dat')) + path = Path( + os.path.join( + DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + ".dat" + ) + ) if path.exists(): size += path.stat().st_size path.unlink() else: - path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id))) - size += sum(f.stat().st_size for f in path.rglob('*')) + path.stat().st_size + path = Path( + os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id)) + ) + size += ( + sum(f.stat().st_size for f in path.rglob("*")) + path.stat().st_size + ) shutil.rmtree(str(path)) resource.delete_instance() if size >= self.max_clear_size: @@ -195,23 +275,30 @@ def clear(self) -> int: return size def uncompress(self, resource_type: str, path: Union[Path, str]): - subprocess.check_call(f"unzip {path + resource_type} -d {path}", shell=True, close_fds=False) + subprocess.check_call( + f"unzip {path + resource_type} -d {path}", shell=True, close_fds=False + ) # flatten the structure (max two levels) for fpath in Path(path).iterdir(): if fpath.is_dir(): for sub_file in fpath.iterdir(): new_file = os.path.join(path, sub_file.name) if os.path.exists(new_file): - raise Exception("Invalid resource. Shouldn't overwrite existing file") + raise Exception( + "Invalid resource. 
Shouldn't overwrite existing file" + ) os.rename(str(sub_file), new_file) shutil.rmtree(str(fpath)) - def path(self, resource_id: str, path: Union[Path, str], is_compressed: bool) -> str: + def path( + self, resource_id: str, path: Union[Path, str], is_compressed: bool + ) -> str: if not is_compressed: return path # we need to look in the folder and find the resource files = [ - fpath for fpath in Path(path).iterdir() + fpath + for fpath in Path(path).iterdir() if fpath.is_file() and not fpath.name.startswith(".") ] if len(files) == 0: @@ -241,95 +328,139 @@ class DcatReadFunc(IFunc): "should_redownload": ArgType.Boolean(optional=True), "override_drepr": ArgType.String(optional=True), } - outputs = {"data": ArgType.DataSet(None), "data_path": ArgType.ListString(optional=True)} + outputs = { + "data": ArgType.DataSet(None), + "data_path": ArgType.ListString(optional=True), + } example = { "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", "start_time": "2020-03-02T12:30:55", "end_time": "2020-03-02T12:30:55", "lazy_load_enabled": "False", "should_redownload": "False", - "override_drepr": "/tmp/model.yml" + "override_drepr": "/tmp/model.yml", } logger = logging.getLogger(__name__) - def __init__(self, - dataset_id: str, - start_time: datetime = None, - end_time: datetime = None, - lazy_load_enabled: bool = False, - should_redownload: bool = False, - override_drepr: str = None - ): + def __init__( + self, + dataset_id: str, + start_time: datetime = None, + end_time: datetime = None, + lazy_load_enabled: bool = False, + should_redownload: bool = False, + override_drepr: str = None, + ): self.dataset_id = dataset_id self.lazy_load_enabled = lazy_load_enabled self.should_redownload = should_redownload self.resource_manager = ResourceManager.get_instance() dataset = DCatAPI.get_instance().find_dataset_by_id(dataset_id) - assert ('resource_repr' in dataset['metadata']) or ('dataset_repr' in dataset['metadata']), \ - "Dataset is missing both 'resource_repr' and 'dataset_repr'" - assert not (('resource_repr' in dataset['metadata']) and ('dataset_repr' in dataset['metadata'])), \ - "Dataset has both 'resource_repr' and 'dataset_repr'" + assert ("resource_repr" in dataset["metadata"]) or ( + "dataset_repr" in dataset["metadata"] + ), "Dataset is missing both 'resource_repr' and 'dataset_repr'" + assert not ( + ("resource_repr" in dataset["metadata"]) + and ("dataset_repr" in dataset["metadata"]) + ), "Dataset has both 'resource_repr' and 'dataset_repr'" - resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id, start_time, end_time) + resources = DCatAPI.get_instance().find_resources_by_dataset_id( + dataset_id, start_time, end_time + ) self.resources = OrderedDict() - if 'resource_repr' in dataset['metadata']: + if "resource_repr" in dataset["metadata"]: if override_drepr is not None: self.drepr = DRepr.parse_from_file(override_drepr) else: - self.drepr = DRepr.parse(dataset['metadata']['resource_repr']) + self.drepr = DRepr.parse(dataset["metadata"]["resource_repr"]) for resource in resources: - self.resources[resource['resource_id']] = {key: resource[key] for key in - {'resource_data_url', 'resource_type'}} - self.repr_type = 'resource_repr' + self.resources[resource["resource_id"]] = { + key: resource[key] for key in {"resource_data_url", "resource_type"} + } + self.repr_type = "resource_repr" else: # TODO: fix me!! 
assert len(resources) == 1 - self.resources[resources[0]['resource_id']] = {key: resources[0][key] for key in - {'resource_data_url', 'resource_type'}} + self.resources[resources[0]["resource_id"]] = { + key: resources[0][key] for key in {"resource_data_url", "resource_type"} + } if override_drepr is not None: self.drepr = DRepr.parse_from_file(override_drepr) else: - self.drepr = DRepr.parse(dataset['metadata']['dataset_repr']) - self.repr_type = 'dataset_repr' + self.drepr = DRepr.parse(dataset["metadata"]["dataset_repr"]) + self.repr_type = "dataset_repr" self.logger.debug(f"Found key '{self.repr_type}'") def exec(self) -> dict: # TODO: fix me! incorrect way to choose backend - if self.get_preference("data") is None or self.get_preference("data") == 'array': + if ( + self.get_preference("data") is None + or self.get_preference("data") == "array" + ): backend = ArrayBackend else: backend = GraphBackend if self.lazy_load_enabled: - if self.repr_type == 'dataset_repr': + if self.repr_type == "dataset_repr": resource_id, resource_metadata = list(self.resources.items())[0] - return {"data": LazyLoadBackend(backend, self.drepr, partial(self.resource_manager.download, - resource_id, resource_metadata, - self.should_redownload), - self.resource_manager.unlink)} + return { + "data": LazyLoadBackend( + backend, + self.drepr, + partial( + self.resource_manager.download, + resource_id, + resource_metadata, + self.should_redownload, + ), + self.resource_manager.unlink, + ) + } else: dataset = ShardedBackend(len(self.resources)) for resource_id, resource_metadata in self.resources.items(): - dataset.add(LazyLoadBackend(backend, self.drepr, partial(self.resource_manager.download, - resource_id, resource_metadata, - self.should_redownload), - self.resource_manager.unlink, partial(ShardedClassID, dataset.count))) + dataset.add( + LazyLoadBackend( + backend, + self.drepr, + partial( + self.resource_manager.download, + resource_id, + resource_metadata, + self.should_redownload, + ), + self.resource_manager.unlink, + partial(ShardedClassID, dataset.count), + ) + ) return {"data": dataset} else: # data_path is location of the resources in disk, for pipeline that wants to download the file - if self.repr_type == 'dataset_repr': + if self.repr_type == "dataset_repr": resource_id, resource_metadata = list(self.resources.items())[0] - resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) - return {"data": backend.from_drepr(self.drepr, resource_file), "data_path": [resource_file]} + resource_file = self.resource_manager.download( + resource_id, resource_metadata, self.should_redownload + ) + return { + "data": backend.from_drepr(self.drepr, resource_file), + "data_path": [resource_file], + } else: dataset = ShardedBackend(len(self.resources)) data_path = [] for resource_id, resource_metadata in self.resources.items(): - resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) - dataset.add(backend.from_drepr(self.drepr, resource_file, dataset.inject_class_id)) + resource_file = self.resource_manager.download( + resource_id, resource_metadata, self.should_redownload + ) + dataset.add( + backend.from_drepr( + self.drepr, resource_file, dataset.inject_class_id + ) + ) data_path.append(data_path) return {"data": dataset, "data_path": data_path} @@ -340,3 +471,8 @@ def __del__(self): def validate(self) -> bool: return True + + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: 
+ return metadata diff --git a/funcs/readers/dcat_read_no_repr.py b/funcs/readers/dcat_read_no_repr.py index 855fbe11..d6ed0bf1 100644 --- a/funcs/readers/dcat_read_no_repr.py +++ b/funcs/readers/dcat_read_no_repr.py @@ -3,10 +3,21 @@ import subprocess from pathlib import Path +from typing import Dict, Optional from dtran.argtype import ArgType from dtran.ifunc import IFunc, IFuncType -from funcs.readers.dcat_read_func import DCatAPI +from dtran.metadata import Metadata +from funcs.readers.dcat_read_func import DCatAPI, ResourceManager +import os + +DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"]) +if os.environ["NO_CHECK_CERTIFICATE"].lower().strip() == "true": + DOWNLOAD_CMD = "wget --no-check-certificate" +else: + DOWNLOAD_CMD = "wget" + +Path(DATA_CATALOG_DOWNLOAD_DIR).mkdir(exist_ok=True, parents=True) class DcatReadNoReprFunc(IFunc): @@ -18,44 +29,33 @@ class DcatReadNoReprFunc(IFunc): friendly_name: str = " Data Catalog Reader Without repr File" inputs = {"dataset_id": ArgType.String} outputs = {"data": ArgType.String} - example = { - "dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f" - } + example = {"dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f"} def __init__(self, dataset_id: str): - # TODO: move to a diff arch (pointer to Data-Catalog URL) - DCAT_URL = "https://api.mint-data-catalog.org" - self.dataset_id = dataset_id + self.resource = [] - resource_results = DCatAPI.get_instance(DCAT_URL).find_resources_by_dataset_id( - dataset_id - ) - # TODO: fix me!! - assert len(resource_results) == 1 - resource_ids = {"default": resource_results[0]["resource_data_url"]} - Path("/tmp/dcat_read_func").mkdir(exist_ok=True, parents=True) - - self.resources = {} - for resource_id, resource_url in resource_ids.items(): - file_full_path = f"/tmp/dcat_read_func/{resource_id}.dat" - subprocess.check_call( - f"wget {resource_url} -O {file_full_path}", shell=True - ) - self.resources[resource_id] = file_full_path + resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id) + + self.resource_manager = ResourceManager.get_instance() + + assert len(resources) == 1 + + self.resource_id = resources[0]["resource_id"] + self.resource_metadata = { + key: resources[0][key] for key in {"resource_data_url", "resource_type"} + } def exec(self) -> dict: - input_dir_full_path = f"/data/{self.dataset_id}" - for resource in self.resources.values(): - if not Path(input_dir_full_path).exists(): - print("Not exists") - Path(input_dir_full_path).mkdir(parents=True) - else: - subprocess.check_output(f"rm -rf {input_dir_full_path}/*", shell=True) - subprocess.check_call( - f"tar -xvzf {resource} -C {input_dir_full_path}/", shell=True - ) - return {"data": input_dir_full_path} + data_path = self.resource_manager.download( + self.resource_id, self.resource_metadata, should_redownload=True + ) + return {"data_path": data_path} def validate(self) -> bool: return True + + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: + return metadata diff --git a/funcs/topoflow/topoflow_climate.py b/funcs/topoflow/topoflow_climate.py index 2698d231..6393e982 100644 --- a/funcs/topoflow/topoflow_climate.py +++ b/funcs/topoflow/topoflow_climate.py @@ -86,6 +86,9 @@ def exec(self) -> dict: def validate(self) -> bool: return True + def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]: + return metadata + class Topoflow4ClimateWriteWrapperFunc(IFunc): func_cls = Topoflow4ClimateWriteFunc diff --git 
a/funcs/topoflow/write_topoflow4_climate_func.py b/funcs/topoflow/write_topoflow4_climate_func.py index 9b968836..3b75e536 100644 --- a/funcs/topoflow/write_topoflow4_climate_func.py +++ b/funcs/topoflow/write_topoflow4_climate_func.py @@ -71,6 +71,9 @@ def exec(self) -> dict: def validate(self) -> bool: return True + def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]: + return metadata + class Topoflow4ClimateWritePerMonthFunc(IFunc): id = "topoflow4_climate_write_per_month_func" @@ -87,7 +90,7 @@ class Topoflow4ClimateWritePerMonthFunc(IFunc): func_type = IFuncType.MODEL_TRANS example = { "grid_dir": f"/data/mint/gpm_grid_baro", - "date_regex": '3B-HHR-E.MS.MRG.3IMERG.(?P\d{4})(?P\d{2})(?P\d{2})', + "date_regex": r'3B-HHR-E.MS.MRG.3IMERG.(?P\d{4})(?P\d{2})(?P\d{2})', "output_file": f"/data/mint/baro/climate.rts", } diff --git a/funcs/writers/geotiff_write_func.py b/funcs/writers/geotiff_write_func.py index be72241b..69966de2 100644 --- a/funcs/writers/geotiff_write_func.py +++ b/funcs/writers/geotiff_write_func.py @@ -1,6 +1,7 @@ import os from pathlib import Path from typing import Dict, Optional, Union +from zipfile import ZipFile from drepr.outputs.base_output_sm import BaseOutputSM from datetime import datetime, timezone @@ -16,32 +17,45 @@ class GeoTiffWriteFunc(IFunc): inputs = { "dataset": ArgType.DataSet(None), "variable_name": ArgType.String, - "output_dir": ArgType.String, + "output_path": ArgType.String, } outputs = { "output_files": ArgType.ListString } - def __init__(self, dataset: BaseOutputSM, variable_name: str, output_dir: Union[str, Path]): + def __init__(self, dataset: BaseOutputSM, variable_name: str, output_path: Union[str, Path]): self.dataset = dataset self.variable_name = variable_name - self.output_dir = os.path.abspath(str(output_dir)) + self.output_path = os.path.abspath(str(output_path)) - if not os.path.exists(self.output_dir): - Path(self.output_dir).mkdir(exist_ok=True, parents=True) + if self.output_path.endswith(".zip"): + self.dir_path = self.output_path[:-4] + else: + self.dir_path = self.output_path + + if not os.path.exists(self.output_path): + Path(self.output_path).mkdir(exist_ok=True, parents=True) def exec(self): rasters = CroppingTransFunc.extract_raster(self.dataset, self.variable_name) rasters = sorted(rasters, key=lambda x: x['timestamp']) + outfiles = [ - os.path.join(self.output_dir, + os.path.join(self.dir_path, datetime.fromtimestamp(raster['timestamp'], tz=timezone.utc).strftime(f"%Y%m%d%H%M%S.{i}.tif")) for i, raster in enumerate(rasters) ] for outfile, raster in zip(outfiles, rasters): raster['raster'].to_geotiff(outfile) - return {"output_files": outfiles} + if self.output_path.endswith(".zip"): + # compress the outfile + with ZipFile(self.output_path, 'w') as z: + for outfile in outfiles: + z.write(outfile, os.path.basename(outfile)) + return {"output_files": [self.output_path]} + else: + return {"output_files": outfiles} def validate(self) -> bool: return True From 0b66410e0c13df988f02f4f35a842ffc0df89d1d Mon Sep 17 00:00:00 2001 From: Binh Vu Date: Sat, 27 Jun 2020 11:51:36 -0700 Subject: [PATCH 2/4] fix bugs --- dtran/argtype.py | 2 + dtran/config_parser.py | 40 +- dtran/dcat/scripts/upload_files_in_batch.py | 12 +- dtran/main.py | 11 +- examples/topoflow4/dev/run_gldas.py | 85 ++++ examples/topoflow4/dev/run_gldas.sh | 49 ++ .../topoflow4/dev/tf_climate.gldas_remote.yml | 9 +- .../topoflow4/dev/tf_climate_make_rts.yml | 13 + .../tf_climate_writegeotiff.gldas_remote.yml | 16 + 
examples/topoflow4/dev/verify_result.py | 58 +++ funcs/readers/dcat_read_func.py | 421 ++++++++---------- funcs/readers/read_func.py | 45 ++ funcs/topoflow/topoflow/__init__.py | 2 +- .../topoflow/components/tests/__init__.py | 2 +- funcs/topoflow/topoflow/framework/emeli.py | 2 +- .../topoflow/framework/emeli_with_cfunits.py | 2 +- funcs/topoflow/topoflow_climate.py | 16 +- funcs/topoflow/topoflow_funcs.py | 6 +- funcs/writers/geotiff_write_func.py | 36 +- uploaded.json | 10 + 20 files changed, 544 insertions(+), 293 deletions(-) create mode 100644 examples/topoflow4/dev/run_gldas.py create mode 100644 examples/topoflow4/dev/run_gldas.sh create mode 100644 examples/topoflow4/dev/tf_climate_make_rts.yml create mode 100644 examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml create mode 100644 examples/topoflow4/dev/verify_result.py create mode 100644 uploaded.json diff --git a/dtran/argtype.py b/dtran/argtype.py index 3bf0f07b..2392b1b7 100644 --- a/dtran/argtype.py +++ b/dtran/argtype.py @@ -22,6 +22,7 @@ class ArgType(object): Callable[[Any, bool, str, str], 'ArgType']] = dataset OrderedDict: 'ArgType' = None ListString: 'ArgType' = None + ListOrOneString: 'ArgType' = None String: 'ArgType' = None Number: 'ArgType' = None Boolean: 'ArgType' = None @@ -75,6 +76,7 @@ def type_cast(self, val: str): from_str=lambda val: str(Path(val))) ArgType.OrderedDict = ArgType("ordered_dict", validate=lambda val: isinstance(val, dict)) ArgType.ListString = ArgType("list_string", validate=lambda val: isinstance(val, list) and all(isinstance(x, str) for x in val)) +ArgType.ListOrOneString = ArgType("list_or_one_string", validate=lambda val: isinstance(val, str) or (isinstance(val, list) and all(isinstance(x, str) for x in val))) ArgType.String = ArgType("string", validate=lambda val: isinstance(val, str)) ArgType.Number = ArgType("number", validate=lambda val: isinstance(val, (int, float)), from_str=lambda val: ('.' in val and float(val)) or int(val)) diff --git a/dtran/config_parser.py b/dtran/config_parser.py index 85fa7943..ccf586a5 100644 --- a/dtran/config_parser.py +++ b/dtran/config_parser.py @@ -36,10 +36,6 @@ class InputSchema(Schema): class PipelineSchema(Schema): - def __init__(self, cli_inputs, **kwargs): - super().__init__(**kwargs) - self.cli_inputs = cli_inputs - version = fields.Str(required=True) description = fields.Str() inputs = OrderedDictField(validate=validate.Length(min=1), @@ -53,6 +49,10 @@ def __init__(self, cli_inputs, **kwargs): class Meta: ordered = True + def __init__(self, cli_inputs, **kwargs): + super().__init__(**kwargs) + self.cli_inputs: Dict[Union[Tuple[str, str], str], str] = cli_inputs + @staticmethod def process_input(val, data): # processing for root-level pipeline inputs recursively @@ -96,17 +96,27 @@ def construct_pipeline(self, data, **kwargs): mappings[name] = (cls, adapter_count[adapter['adapter']]) # validating cli_inputs - for name, input in self.cli_inputs: - if name not in mappings: - raise ValidationError( - ['cli_inputs exception', f"invalid adapter name {name}. 
not found in config file"]) - if input not in mappings[name][0].inputs: - raise ValidationError(['cli_inputs exception', - f"invalid input {input} in {data['adapters'][name]['adapter']} for {name}"]) - # cli_inputs has higher priority and overwrites config_file data - if 'inputs' not in data['adapters'][name]: - data['adapters'][name]['inputs'] = OrderedDict() - data['adapters'][name]['inputs'][input] = self.cli_inputs[(name, input)] + for cli_input in self.cli_inputs: + if isinstance(cli_input, (tuple,list)): + name, arg = cli_input + + if name not in mappings: + raise ValidationError( + ['cli_inputs exception', f"invalid adapter name {name}. not found in config file"]) + if arg not in mappings[name][0].inputs: + raise ValidationError(['cli_inputs exception', + f"invalid input {arg} in {data['adapters'][name]['adapter']} for {name}"]) + # cli_inputs has higher priority and overwrites config_file data + if 'inputs' not in data['adapters'][name]: + data['adapters'][name]['inputs'] = OrderedDict() + data['adapters'][name]['inputs'][arg] = self.cli_inputs[(name, arg)] + else: + name = cli_input + if name in data['inputs']: + if isinstance(data['inputs'][name], (dict, OrderedDict)): + data['inputs'][name]['value'] = self.cli_inputs[name] + else: + data['inputs'][name] = self.cli_inputs[name] inputs = {} wired = [] diff --git a/dtran/dcat/scripts/upload_files_in_batch.py b/dtran/dcat/scripts/upload_files_in_batch.py index 53005406..97e8991a 100644 --- a/dtran/dcat/scripts/upload_files_in_batch.py +++ b/dtran/dcat/scripts/upload_files_in_batch.py @@ -8,14 +8,20 @@ def setup_owncloud(upload_dir): oc = owncloud.Client('https://files.mint.isi.edu/') - oc.login('datacatalog', 'sVMIryVWEx3Ec2') - oc.mkdir(upload_dir) + oc.login(os.environ['USERNAME'], os.environ['PASSWORD']) + try: + # https://github.com/owncloud/pyocclient/blob/master/owncloud/owncloud.py + # trying to look through the documentation. However, I didn't see a way to check if the directory exists + # before, so I just assume that if the operator fails, the directory is already there. + oc.mkdir(upload_dir) + except: + pass return oc def upload_to_mint_server(target_dir, target_filename, upload_url): upload_output = subprocess.check_output( - f"curl -sD - --user upload:HVmyqAPWDNuk5SmkLOK2 --upload-file {target_dir}/{target_filename} {upload_url}", + f"curl -sD - --user {os.environ['USERNAME']}:{os.environ['PASSWORD']} --upload-file {target_dir}/{target_filename} {upload_url}", shell=True, ) uploaded_url = f'https://{upload_output.decode("utf-8").split("https://")[-1]}' diff --git a/dtran/main.py b/dtran/main.py index b69bf1cb..27f90aaf 100644 --- a/dtran/main.py +++ b/dtran/main.py @@ -19,7 +19,7 @@ def exec_pipeline(ctx, config=None): Creates a pipeline and execute it based on given config and input(optional). To specify the input to pipeline, use (listed in ascending priority): 1) config file option: --config path_to_file - 2) arg params: e.g. --FuncName.Attr=value + 2) arg params: e.g. 
--FuncName.Attr=value, --InputName=value """ # Accept user-specified inputs: expect format of --key=value @@ -27,10 +27,13 @@ def exec_pipeline(ctx, config=None): for arg in ctx.args: try: key, value = arg[2:].split("=") - func_name, attr_name = key.split(".") - user_inputs[(func_name, attr_name)] = value + if key.find(".") != -1: + func_name, attr_name = key.split(".") + user_inputs[(func_name, attr_name)] = value + else: + user_inputs[key] = value except ValueError: - print(f"user input: '{arg}' should have format '--FuncName.Attr=value'") + print(f"user input: '{arg}' should have format '--FuncName.Attr=value' or --InputName=value") return parser = ConfigParser(user_inputs) diff --git a/examples/topoflow4/dev/run_gldas.py b/examples/topoflow4/dev/run_gldas.py new file mode 100644 index 00000000..057301ef --- /dev/null +++ b/examples/topoflow4/dev/run_gldas.py @@ -0,0 +1,85 @@ +import subprocess +import os +from pathlib import Path +from dateutil import parser +from datetime import timedelta, datetime +from dataclasses import dataclass +from tqdm.auto import tqdm + + +HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) +DOWNLOAD_DIR = Path(os.path.abspath(os.environ['DATA_CATALOG_DOWNLOAD_DIR'])) + +arcsecs = [30, 60] +areas = { + "awash": "37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333", + "baro": "34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999", + "shebelle": "38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333", + "ganale": "39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999", + "guder": "37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333", + "muger": "37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667", + "beko": "35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666", + "alwero": "34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +} + +run_dir = HOME_DIR / "data" / "tf_gldas" +commands = [] + +def add_geotif_command(commands, start_time, end_time, geotiff_dir): + writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gldas_remote.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {writegeotiff_config_file} \ + --dataset.start_time={start_time} \ + --dataset.end_time={end_time} \ + --geotiff_writer.output_dir={geotiff_dir} \ + --geotiff_writer.skip_on_exist=true \ + """.strip() + commands.append(cmd) + + +def add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area): + makerts_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_make_rts.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {makerts_config_file} \ + --topoflow.geotiff_files='{geotiff_dir}*/*.tif' \ + --topoflow.cropped_geotiff_dir={geotiff_dir_crop} \ + --topoflow.output_file={output_file} \ + --topoflow.skip_crop_on_exist=true \ + --topoflow.xres_arcsecs={arcsec} \ + --topoflow.yres_arcsecs={arcsec} \ + --topoflow.bounds="{area}" + """.strip() + commands.append(cmd) + + +for year in range(2016, 2020): + for month in range(1, 13): + s0 = parser.parse(f"{year}-{month:02d}-01T00:00:00") + if month == 12: + s1 = s0.replace(day=31, hour=23, minute=59, second=59) + else: + s1 = s0.replace(month=s0.month + 1) - timedelta(seconds=1) + + start_time = s0.isoformat() + end_time = s1.isoformat() + geotiff_dir = str(run_dir / str(year) / f"data_m{month:02d}") + add_geotif_command(commands, start_time, end_time, geotiff_dir) + + for area_name, area in 
areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}_m{month:02d}.rts") + add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + start_time = f"{year}-01-01T00:00:00" + end_time = f"{year}-12-31T23:59:59" + geotiff_dir = str(run_dir / str(year) / f"data_m") + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}.rts") + add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + +commands = commands[:] +for cmd in tqdm(commands, desc="run commands"): + output = subprocess.check_output(cmd, shell=True, cwd=str(HOME_DIR)) \ No newline at end of file diff --git a/examples/topoflow4/dev/run_gldas.sh b/examples/topoflow4/dev/run_gldas.sh new file mode 100644 index 00000000..d5b76507 --- /dev/null +++ b/examples/topoflow4/dev/run_gldas.sh @@ -0,0 +1,49 @@ +#!/bin/bash + +# This bash scripts run the transformation to produce all GLDAS datasets. Should run it with dotenv + +if [[ -z "${HOME_DIR}" ]]; then + echo "Home directory is not defined. Exit" + exit -1 +fi + +# change the working directory to the current project +cd ${HOME_DIR} + +# define parameters +awash="37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333" +baro="34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999" +shebelle="38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333" +ganale="39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999" +guder="37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333" +muger="37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667" +beko="35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666" +alwero="34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +AREAS=("$awash" "$baro" "$shebelle" "$ganale" "$guder" "$muger" "$beko" "$alwero") +AREA_NAMES=("awash" "baro" "shebelle" "ganale" "guder" "muger" "beko" "alwero") + +# START_TIMES +# end_year=2019 + +ARCSECS=(30 60) + +# create a basic command +CONFIG_FILE=$HOME_DIR/examples/topoflow4/dev/tf_climate.gldas_remote.yml +RUN_DIR=$HOME_DIR/data/tf_gldas +CMD="python -m dtran.main exec_pipeline --config $CONFIG_FILE" + +for ((i=0;i<${#AREAS[@]};++i)); do + area=${AREAS[i]} + area_name=${AREA_NAMES[i]} + + for arcsec in "${ARCSECS}"; do + year=2008 + echo --GLDAS.start_time $year-01-01T00:00:00 \ + --GLDAS.end_time $year-01-01T00:00:00 \ + --geotiff_writer.output_dir $RUN_DIR/$year/geotiff \ + --topoflow.cropped_geotiff_dir $RUN_DIR/$year/${area_name}_geotiff \ + --topoflow.xres_arcsecs $arcsec \ + --topoflow.xres_arcsecs $arcsec \ + --topoflow.bounds "\"$area\"" + done +done diff --git a/examples/topoflow4/dev/tf_climate.gldas_remote.yml b/examples/topoflow4/dev/tf_climate.gldas_remote.yml index e965072d..1477ca1e 100644 --- a/examples/topoflow4/dev/tf_climate.gldas_remote.yml +++ b/examples/topoflow4/dev/tf_climate.gldas_remote.yml @@ -1,23 +1,22 @@ version: "1" adapters: - weather_data: + dataset: comment: | Weather dataset - adapter: funcs.DcatReadFunc + adapter: funcs.readers.dcat_read_func.DcatReadStreamFunc inputs: # gldas dataset_id: 5babae3f-c468-4e01-862e-8b201468e3b5 start_time: 2014-08-01 00:00:00 end_time: 2014-09-01 00:00:00 - 
lazy_load_enabled: false # override_drepr: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gldas.yml geotiff_writer: adapter: funcs.GeoTiffWriteFunc inputs: - dataset: $.weather_data.data + dataset: $.dataset.data variable_name: atmosphere_water__rainfall_mass_flux output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff - tf_trans: + topoflow: adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc inputs: geotiff_files: $.geotiff_writer.output_files diff --git a/examples/topoflow4/dev/tf_climate_make_rts.yml b/examples/topoflow4/dev/tf_climate_make_rts.yml new file mode 100644 index 00000000..e63f3656 --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_make_rts.yml @@ -0,0 +1,13 @@ +version: "1" +adapters: + topoflow: + adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc + inputs: + geotiff_files: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff/*.tif + cropped_geotiff_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff_crop + output_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/output.rts + bounds: "34.221249999999, 7.353749999999, 36.45458333333234, 9.503749999999" + xres_arcsecs: 60 + yres_arcsecs: 60 + # required for GLDAS (unit transformation) + unit_multiplier: 3600 \ No newline at end of file diff --git a/examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml b/examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml new file mode 100644 index 00000000..905ed43c --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml @@ -0,0 +1,16 @@ +version: "1" +adapters: + dataset: + comment: | + Weather dataset + adapter: funcs.readers.dcat_read_func.DcatReadStreamFunc + inputs: + dataset_id: 5babae3f-c468-4e01-862e-8b201468e3b5 + start_time: 2014-08-01 00:00:00 + end_time: 2014-09-01 00:00:00 + geotiff_writer: + adapter: funcs.GeoTiffWriteFunc + inputs: + dataset: $.dataset.data + variable_name: atmosphere_water__rainfall_mass_flux + output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff \ No newline at end of file diff --git a/examples/topoflow4/dev/verify_result.py b/examples/topoflow4/dev/verify_result.py new file mode 100644 index 00000000..d5804fb8 --- /dev/null +++ b/examples/topoflow4/dev/verify_result.py @@ -0,0 +1,58 @@ +import subprocess +import os +from pathlib import Path +from dateutil import parser +from datetime import timedelta, datetime +from dataclasses import dataclass +from tqdm.auto import tqdm +import numpy as np + +HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) +DOWNLOAD_DIR = Path(os.path.abspath(os.environ['DATA_CATALOG_DOWNLOAD_DIR'])) + +run_dir = HOME_DIR / "data" / "tf_gldas" + + +def get_size(infile): + data = np.fromfile(infile, dtype=np.float32) + return data.shape[0] + +# gldas is 8 +n_files_per_day = 8 + +n_month_days = [ + (parser.parse(f'2000-{i+1:02}-01T00:00') - parser.parse(f'2000-{i:02}-01T00:00')).days + for i in range(1, 12) +] +n_month_days.append(31) + +for year in range(2019, 2020): + n_year_days = (parser.parse(f'{year+1}-01-01T00:00') - parser.parse(f'{year}-01-01T00:00')).days + n_month_days[1] = (parser.parse(f'{year}-03-01T00:00') - parser.parse(f'{year}-02-01T00:00')).days + + for area_dir in (run_dir / str(year)).iterdir(): + area_name = area_dir.name + if area_name.startswith("data_m"): + continue + print(f"check {year} area {area_name}") + + for arcsec in [30, 
60]: + with open(area_dir / f"output_r{arcsec}.rti", "r") as f: + lines = f.readlines() + n_cols = -1 + n_rows = -1 + for line in lines: + if line.startswith("Number of columns:"): + n_cols = int(line.replace("Number of columns:", "").strip()) + if line.startswith("Number of rows:"): + n_rows = int(line.replace("Number of rows:", "").strip()) + assert n_cols != -1 and n_rows != -1 + + infile = area_dir / f"output_r{arcsec}.rts" + size = get_size(infile) + assert size == n_rows * n_cols * n_files_per_day * n_year_days, f"Incorrect {infile}" + + for month in range(1, 13): + infile = area_dir / f"output_r{arcsec}_m{month:02}.rts" + size = get_size(infile) + assert size == n_rows * n_cols * n_files_per_day * n_month_days[month-1] \ No newline at end of file diff --git a/funcs/readers/dcat_read_func.py b/funcs/readers/dcat_read_func.py index 40accaef..c715c225 100644 --- a/funcs/readers/dcat_read_func.py +++ b/funcs/readers/dcat_read_func.py @@ -10,29 +10,20 @@ from collections import OrderedDict from datetime import datetime from pathlib import Path -from typing import Union, Dict, Optional +from typing import Union, Dict from functools import partial from playhouse.kv import KeyValue -from peewee import ( - SqliteDatabase, - Model, - UUIDField, - IntegerField, - BooleanField, - BigIntegerField, - DoesNotExist, -) - +from peewee import SqliteDatabase, Model, UUIDField, IntegerField, BooleanField, BigIntegerField, DoesNotExist from drepr import DRepr from drepr.outputs import ArrayBackend, GraphBackend from dtran.argtype import ArgType from dtran.backend import ShardedBackend, ShardedClassID, LazyLoadBackend from dtran.dcat.api import DCatAPI from dtran.ifunc import IFunc, IFuncType -from dtran.metadata import Metadata +from tqdm.auto import tqdm DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"]) -if os.environ["NO_CHECK_CERTIFICATE"].lower().strip() == "true": +if os.environ['NO_CHECK_CERTIFICATE'].lower().strip() == 'true': DOWNLOAD_CMD = "wget --no-check-certificate" else: DOWNLOAD_CMD = "wget" @@ -40,12 +31,12 @@ Path(DATA_CATALOG_DOWNLOAD_DIR).mkdir(exist_ok=True, parents=True) UNITS_MAPPING = { - "PB": 1 << 50, - "TB": 1 << 40, - "GB": 1 << 30, - "MB": 1 << 20, - "KB": 1 << 10, - "B": 1, + 'PB': 1 << 50, + 'TB': 1 << 40, + 'GB': 1 << 30, + 'MB': 1 << 20, + 'KB': 1 << 10, + 'B': 1 } @@ -56,20 +47,16 @@ class Resource(Model): size = BigIntegerField(default=0) class Meta: - database = SqliteDatabase( - os.path.join(DATA_CATALOG_DOWNLOAD_DIR, "dcat_read_func.db"), timeout=10 - ) + database = SqliteDatabase(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, 'dcat_read_func.db'), timeout=10) class ResourceManager: instance = None def __init__(self): - self.max_capacity = 200 * UNITS_MAPPING["MB"] - self.max_clear_size = 100 * UNITS_MAPPING["MB"] - assert ( - self.max_capacity >= self.max_clear_size - ), "max_capacity cannot be less than max_clear_size" + self.max_capacity = 32 * UNITS_MAPPING['GB'] + self.max_clear_size = 20 * UNITS_MAPPING['GB'] + assert self.max_capacity >= self.max_clear_size, "max_capacity cannot be less than max_clear_size" self.poll_interval = 10 self.compressed_resource_types = {".zip", ".tar.gz", ".tar"} self.db = Resource._meta.database @@ -77,30 +64,19 @@ def __init__(self): self.db.create_tables([Resource], safe=True) self.db.close() self.kv = KeyValue(database=self.db, value_field=BigIntegerField()) - with self.db.atomic("EXCLUSIVE"): + with self.db.atomic('EXCLUSIVE'): # initializing current_size of DATA_CATALOG_DOWNLOAD_DIR in the database - 
if "current_size" not in self.kv: - self.kv["current_size"] = sum( - f.stat().st_size for f in Path(DATA_CATALOG_DOWNLOAD_DIR).rglob("*") - ) + if 'current_size' not in self.kv: + self.kv['current_size'] = sum(f.stat().st_size for f in Path(DATA_CATALOG_DOWNLOAD_DIR).rglob('*')) else: for resource in Resource.select(): if resource.is_downloading: continue - path = Path( - os.path.join( - DATA_CATALOG_DOWNLOAD_DIR, - str(resource.resource_id) + ".dat", - ) - ) + path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + '.dat')) if not path.exists(): - path = Path( - os.path.join( - DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) - ) - ) + path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id))) if not path.exists(): - self.kv["current_size"] -= resource.size + self.kv['current_size'] -= resource.size resource.delete_instance() @staticmethod @@ -109,37 +85,26 @@ def get_instance(): ResourceManager.instance = ResourceManager() return ResourceManager.instance - def download( - self, - resource_id: str, - resource_metadata: Dict[str, str], - should_redownload: bool, - ) -> str: - is_compressed = ( - resource_metadata["resource_type"] in self.compressed_resource_types - ) + def download(self, resource_id: str, resource_metadata: Dict[str, str], should_redownload: bool) -> str: + is_compressed = resource_metadata['resource_type'] in self.compressed_resource_types if is_compressed: - if resource_metadata["resource_type"].startswith(".tar"): + if resource_metadata['resource_type'].startswith(".tar"): raise NotImplementedError() path = os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id) else: - path = os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id + ".dat") + path = os.path.join(DATA_CATALOG_DOWNLOAD_DIR, resource_id + '.dat') download = True - with self.db.atomic("EXCLUSIVE"): + with self.db.atomic('EXCLUSIVE'): try: # if resource already exists - resource = ( - Resource.select().where(Resource.resource_id == resource_id).get() - ) + resource = Resource.select().where(Resource.resource_id == resource_id).get() resource.ref_count += 1 if not should_redownload: # TODO: comparing timestamp before skipping download download = False except DoesNotExist: - resource = Resource.create( - resource_id=resource_id, ref_count=1, is_downloading=True, size=0 - ) + resource = Resource.create(resource_id=resource_id, ref_count=1, is_downloading=True, size=0) if download: required_size = 0 @@ -147,37 +112,25 @@ def download( # adjust required_size when the resource is to be redownloaded required_size -= resource.size try: - resource.size = int( - requests.head(resource_metadata["resource_data_url"]).headers[ - "Content-Length" - ] - ) + resource.size = int(requests.head(resource_metadata['resource_data_url']).headers['Content-Length']) required_size += resource.size except KeyError: pass - if self.max_capacity - self.kv["current_size"] < required_size: + if self.max_capacity - self.kv['current_size'] < required_size: # clear files to make space - self.kv["current_size"] -= self.clear() - assert ( - self.max_capacity - self.kv["current_size"] >= required_size - ), "Not enough disk space" - self.kv["current_size"] += required_size + self.kv['current_size'] -= self.clear() + assert self.max_capacity - self.kv['current_size'] >= required_size, "Not enough disk space" + self.kv['current_size'] += required_size resource.save() if download: if resource.ref_count > 1: # block until all other processes accessing the resource are finished - DcatReadFunc.logger.debug( - 
f"Waiting for some other process/thread to free resource {resource_id} ..." - ) + DcatReadFunc.logger.debug(f"Waiting for some other process/thread to free resource {resource_id} ...") while resource.ref_count > 1: time.sleep(self.poll_interval) - with self.db.atomic("EXCLUSIVE"): - resource = ( - Resource.select() - .where(Resource.resource_id == resource_id) - .get() - ) + with self.db.atomic('EXCLUSIVE'): + resource = Resource.select().where(Resource.resource_id == resource_id).get() if resource.ref_count == 1: # setting is_downloading before redownload resource.is_downloading = True @@ -189,85 +142,52 @@ def download( Path(path).unlink() DcatReadFunc.logger.debug(f"Downloading resource {resource_id} ...") if is_compressed: - temp_path = path + resource_metadata["resource_type"] - subprocess.check_call( - f"wget -q \"{resource_metadata['resource_data_url']}\" -O {temp_path}", - shell=True, - close_fds=False, - ) - self.uncompress(resource_metadata["resource_type"], path) + temp_path = path + resource_metadata['resource_type'] + subprocess.check_call(f"wget \"{resource_metadata['resource_data_url']}\" -O {temp_path}", shell=True) + self.uncompress(resource_metadata['resource_type'], path) # adjust required_size when the resource is compressed required_size = -resource.size - required_size += ( - sum(f.stat().st_size for f in Path(path).rglob("*")) - + Path(path).stat().st_size - ) + required_size += sum(f.stat().st_size for f in Path(path).rglob('*')) + Path(path).stat().st_size Path(temp_path).unlink() else: - subprocess.check_call( - f"wget -q \"{resource_metadata['resource_data_url']}\" -O {path}", - shell=True, - close_fds=False, - ) + subprocess.check_call(f"wget \"{resource_metadata['resource_data_url']}\" -O {path}", shell=True) required_size = 0 - with self.db.atomic("EXCLUSIVE"): - self.kv["current_size"] += required_size - resource = ( - Resource.select().where(Resource.resource_id == resource_id).get() - ) + with self.db.atomic('EXCLUSIVE'): + self.kv['current_size'] += required_size + resource = Resource.select().where(Resource.resource_id == resource_id).get() resource.size += required_size resource.is_downloading = False resource.save() else: - DcatReadFunc.logger.debug( - f"Skipping resource {resource_id}, found in cache" - ) + DcatReadFunc.logger.debug(f"Skipping resource {resource_id}, found in cache") if resource.is_downloading: # block until some other process is done downloading the resource - DcatReadFunc.logger.debug( - f"Waiting for other process/thread to finish downloading resource {resource_id} ..." 
- ) + DcatReadFunc.logger.debug(f"Waiting for other process/thread to finish downloading resource {resource_id} ...") while resource.is_downloading: time.sleep(self.poll_interval) - with self.db.atomic("EXCLUSIVE"): - resource = ( - Resource.select() - .where(Resource.resource_id == resource_id) - .get() - ) + with self.db.atomic('EXCLUSIVE'): + resource = Resource.select().where(Resource.resource_id == resource_id).get() return self.path(resource_id, path, is_compressed) def unlink(self, resource_id): - with self.db.atomic("EXCLUSIVE"): - resource = ( - Resource.select().where(Resource.resource_id == resource_id).get() - ) + with self.db.atomic('EXCLUSIVE'): + resource = Resource.select().where(Resource.resource_id == resource_id).get() resource.ref_count -= 1 resource.save() def clear(self) -> int: size = 0 - for resource in ( - Resource.select().where(Resource.ref_count == 0).order_by(Resource.id) - ): + for resource in Resource.select().where(Resource.ref_count == 0).order_by(Resource.id): DcatReadFunc.logger.debug(f"Clearing resource {resource.resource_id}") - path = Path( - os.path.join( - DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + ".dat" - ) - ) + path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id) + '.dat')) if path.exists(): size += path.stat().st_size path.unlink() else: - path = Path( - os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id)) - ) - size += ( - sum(f.stat().st_size for f in path.rglob("*")) + path.stat().st_size - ) + path = Path(os.path.join(DATA_CATALOG_DOWNLOAD_DIR, str(resource.resource_id))) + size += sum(f.stat().st_size for f in path.rglob('*')) + path.stat().st_size shutil.rmtree(str(path)) resource.delete_instance() if size >= self.max_clear_size: @@ -275,30 +195,23 @@ def clear(self) -> int: return size def uncompress(self, resource_type: str, path: Union[Path, str]): - subprocess.check_call( - f"unzip {path + resource_type} -d {path}", shell=True, close_fds=False - ) + subprocess.check_call(f"unzip {path + resource_type} -d {path}", shell=True) # flatten the structure (max two levels) for fpath in Path(path).iterdir(): if fpath.is_dir(): for sub_file in fpath.iterdir(): new_file = os.path.join(path, sub_file.name) if os.path.exists(new_file): - raise Exception( - "Invalid resource. Shouldn't overwrite existing file" - ) + raise Exception("Invalid resource. 
Shouldn't overwrite existing file") os.rename(str(sub_file), new_file) shutil.rmtree(str(fpath)) - def path( - self, resource_id: str, path: Union[Path, str], is_compressed: bool - ) -> str: + def path(self, resource_id: str, path: Union[Path, str], is_compressed: bool) -> str: if not is_compressed: return path # we need to look in the folder and find the resource files = [ - fpath - for fpath in Path(path).iterdir() + fpath for fpath in Path(path).iterdir() if fpath.is_file() and not fpath.name.startswith(".") ] if len(files) == 0: @@ -328,139 +241,95 @@ class DcatReadFunc(IFunc): "should_redownload": ArgType.Boolean(optional=True), "override_drepr": ArgType.String(optional=True), } - outputs = { - "data": ArgType.DataSet(None), - "data_path": ArgType.ListString(optional=True), - } + outputs = {"data": ArgType.DataSet(None), "data_path": ArgType.ListString(optional=True)} example = { "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", "start_time": "2020-03-02T12:30:55", "end_time": "2020-03-02T12:30:55", "lazy_load_enabled": "False", "should_redownload": "False", - "override_drepr": "/tmp/model.yml", + "override_drepr": "/tmp/model.yml" } logger = logging.getLogger(__name__) - def __init__( - self, - dataset_id: str, - start_time: datetime = None, - end_time: datetime = None, - lazy_load_enabled: bool = False, - should_redownload: bool = False, - override_drepr: str = None, - ): + def __init__(self, + dataset_id: str, + start_time: datetime = None, + end_time: datetime = None, + lazy_load_enabled: bool = False, + should_redownload: bool = False, + override_drepr: str = None + ): self.dataset_id = dataset_id self.lazy_load_enabled = lazy_load_enabled self.should_redownload = should_redownload self.resource_manager = ResourceManager.get_instance() dataset = DCatAPI.get_instance().find_dataset_by_id(dataset_id) - assert ("resource_repr" in dataset["metadata"]) or ( - "dataset_repr" in dataset["metadata"] - ), "Dataset is missing both 'resource_repr' and 'dataset_repr'" - assert not ( - ("resource_repr" in dataset["metadata"]) - and ("dataset_repr" in dataset["metadata"]) - ), "Dataset has both 'resource_repr' and 'dataset_repr'" + assert ('resource_repr' in dataset['metadata']) or ('dataset_repr' in dataset['metadata']), \ + "Dataset is missing both 'resource_repr' and 'dataset_repr'" + assert not (('resource_repr' in dataset['metadata']) and ('dataset_repr' in dataset['metadata'])), \ + "Dataset has both 'resource_repr' and 'dataset_repr'" - resources = DCatAPI.get_instance().find_resources_by_dataset_id( - dataset_id, start_time, end_time - ) + resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id, start_time, end_time) self.resources = OrderedDict() - if "resource_repr" in dataset["metadata"]: + if 'resource_repr' in dataset['metadata']: if override_drepr is not None: self.drepr = DRepr.parse_from_file(override_drepr) else: - self.drepr = DRepr.parse(dataset["metadata"]["resource_repr"]) + self.drepr = DRepr.parse(dataset['metadata']['resource_repr']) for resource in resources: - self.resources[resource["resource_id"]] = { - key: resource[key] for key in {"resource_data_url", "resource_type"} - } - self.repr_type = "resource_repr" + self.resources[resource['resource_id']] = {key: resource[key] for key in + {'resource_data_url', 'resource_type'}} + self.repr_type = 'resource_repr' else: # TODO: fix me!! 
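            # unlike 'resource_repr', the 'dataset_repr' branch currently assumes the
            # Data Catalog returns exactly one resource for the dataset (hence the assert below)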
assert len(resources) == 1 - self.resources[resources[0]["resource_id"]] = { - key: resources[0][key] for key in {"resource_data_url", "resource_type"} - } + self.resources[resources[0]['resource_id']] = {key: resources[0][key] for key in + {'resource_data_url', 'resource_type'}} if override_drepr is not None: self.drepr = DRepr.parse_from_file(override_drepr) else: - self.drepr = DRepr.parse(dataset["metadata"]["dataset_repr"]) - self.repr_type = "dataset_repr" + self.drepr = DRepr.parse(dataset['metadata']['dataset_repr']) + self.repr_type = 'dataset_repr' self.logger.debug(f"Found key '{self.repr_type}'") def exec(self) -> dict: # TODO: fix me! incorrect way to choose backend - if ( - self.get_preference("data") is None - or self.get_preference("data") == "array" - ): + if self.get_preference("data") is None or self.get_preference("data") == 'array': backend = ArrayBackend else: backend = GraphBackend if self.lazy_load_enabled: - if self.repr_type == "dataset_repr": + if self.repr_type == 'dataset_repr': resource_id, resource_metadata = list(self.resources.items())[0] - return { - "data": LazyLoadBackend( - backend, - self.drepr, - partial( - self.resource_manager.download, - resource_id, - resource_metadata, - self.should_redownload, - ), - self.resource_manager.unlink, - ) - } + return {"data": LazyLoadBackend(backend, self.drepr, partial(self.resource_manager.download, + resource_id, resource_metadata, + self.should_redownload), + self.resource_manager.unlink)} else: dataset = ShardedBackend(len(self.resources)) for resource_id, resource_metadata in self.resources.items(): - dataset.add( - LazyLoadBackend( - backend, - self.drepr, - partial( - self.resource_manager.download, - resource_id, - resource_metadata, - self.should_redownload, - ), - self.resource_manager.unlink, - partial(ShardedClassID, dataset.count), - ) - ) + dataset.add(LazyLoadBackend(backend, self.drepr, partial(self.resource_manager.download, + resource_id, resource_metadata, + self.should_redownload), + self.resource_manager.unlink, partial(ShardedClassID, dataset.count))) return {"data": dataset} else: # data_path is location of the resources in disk, for pipeline that wants to download the file - if self.repr_type == "dataset_repr": + if self.repr_type == 'dataset_repr': resource_id, resource_metadata = list(self.resources.items())[0] - resource_file = self.resource_manager.download( - resource_id, resource_metadata, self.should_redownload - ) - return { - "data": backend.from_drepr(self.drepr, resource_file), - "data_path": [resource_file], - } + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + return {"data": backend.from_drepr(self.drepr, resource_file), "data_path": [resource_file]} else: dataset = ShardedBackend(len(self.resources)) data_path = [] for resource_id, resource_metadata in self.resources.items(): - resource_file = self.resource_manager.download( - resource_id, resource_metadata, self.should_redownload - ) - dataset.add( - backend.from_drepr( - self.drepr, resource_file, dataset.inject_class_id - ) - ) + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + dataset.add(backend.from_drepr(self.drepr, resource_file, dataset.inject_class_id)) data_path.append(data_path) return {"data": dataset, "data_path": data_path} @@ -472,7 +341,97 @@ def __del__(self): def validate(self) -> bool: return True - def change_metadata( - self, metadata: Optional[Dict[str, Metadata]] - ) -> Dict[str, Metadata]: - 
return metadata + +# TODO: this is a temporary class, should be remove before move on with existing pipeline +class DcatReadStreamFunc(IFunc): + id = "dcat_read_stream_func" + description = """ An entry point in the pipeline. + Fetches a dataset and its metadata from the MINT Data-Catalog. + """ + func_type = IFuncType.READER + friendly_name: str = "Data Catalog Reader" + inputs = { + "dataset_id": ArgType.String, + "start_time": ArgType.DateTime(optional=True), + "end_time": ArgType.DateTime(optional=True), + "should_redownload": ArgType.Boolean(optional=True), + "override_drepr": ArgType.String(optional=True), + } + outputs = {"data": ArgType.DataSet(None), "data_path": ArgType.ListString(optional=True)} + example = { + "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", + "start_time": "2020-03-02T12:30:55", + "end_time": "2020-03-02T12:30:55", + "should_redownload": "False", + "override_drepr": "/tmp/model.yml" + } + logger = logging.getLogger(__name__) + + def __init__(self, + dataset_id: str, + start_time: datetime = None, + end_time: datetime = None, + should_redownload: bool = False, + override_drepr: str = None + ): + self.dataset_id = dataset_id + self.should_redownload = should_redownload + self.resource_manager = ResourceManager.get_instance() + dataset = DCatAPI.get_instance().find_dataset_by_id(dataset_id) + + assert ('resource_repr' in dataset['metadata']) or ('dataset_repr' in dataset['metadata']), \ + "Dataset is missing both 'resource_repr' and 'dataset_repr'" + assert not (('resource_repr' in dataset['metadata']) and ('dataset_repr' in dataset['metadata'])), \ + "Dataset has both 'resource_repr' and 'dataset_repr'" + + resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id, start_time, end_time) + + self.resources = OrderedDict() + if 'resource_repr' in dataset['metadata']: + if override_drepr is not None: + self.drepr = DRepr.parse_from_file(override_drepr) + else: + self.drepr = DRepr.parse(dataset['metadata']['resource_repr']) + for resource in resources: + self.resources[resource['resource_id']] = {key: resource[key] for key in + {'resource_data_url', 'resource_type'}} + self.repr_type = 'resource_repr' + else: + # TODO: fix me!! + assert len(resources) == 1 + self.resources[resources[0]['resource_id']] = {key: resources[0][key] for key in + {'resource_data_url', 'resource_type'}} + if override_drepr is not None: + self.drepr = DRepr.parse_from_file(override_drepr) + else: + self.drepr = DRepr.parse(dataset['metadata']['dataset_repr']) + self.repr_type = 'dataset_repr' + + self.logger.debug(f"Found key '{self.repr_type}'") + + async def exec(self) -> dict: + # TODO: fix me! 
incorrect way to choose backend + if self.get_preference("data") is None or self.get_preference("data") == 'array': + backend = ArrayBackend + else: + backend = GraphBackend + + if self.repr_type == 'dataset_repr': + resource_id, resource_metadata = list(self.resources.items())[0] + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + yield {"data": backend.from_drepr(self.drepr, resource_file), "data_path": [resource_file]} + else: + # data_path is location of the resources in disk, for pipeline that wants to download the file + for resource_id, resource_metadata in tqdm(self.resources.items(), total=len(self.resources), desc='dcat_read'): + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + yield { + "data": backend.from_drepr(self.drepr, resource_file), + "data_path": [resource_file] + } + + def __del__(self): + for resource_id, resource_metadata in self.resources.items(): + self.resource_manager.unlink(resource_id) + + def validate(self) -> bool: + return True diff --git a/funcs/readers/read_func.py b/funcs/readers/read_func.py index c96b50e8..21d6acbb 100644 --- a/funcs/readers/read_func.py +++ b/funcs/readers/read_func.py @@ -56,3 +56,48 @@ def exec(self) -> dict: def validate(self) -> bool: return True + + +class ReadStreamFunc(IFunc): + id = "read_func" + description = """ An entry point in the pipeline. + Reads an input file (or multiple files) and a yml file describing the D-REPR layout of each file. + Return a Dataset object + """ + friendly_name: str = "Local File Reader" + func_type = IFuncType.READER + inputs = {"repr_file": ArgType.FilePath, "resource_path": ArgType.FilePath} + outputs = {"data": ArgType.DataSet(None)} + example = { + "repr_file": "./wfp_food_prices_south-sudan.repr.yml", + "resources": "./wfp_food_prices_south-sudan.csv", + } + + def __init__(self, repr_file: Union[str, Path], resource_path: Union[str, Path]): + resource_path = str(resource_path) + + self.repr = DRepr.parse_from_file(str(repr_file)) + self.resources = glob.glob(resource_path) + + assert len(self.resources) > 0 + + def exec(self) -> dict: + if self.get_preference("data") is None or self.get_preference("data") == "array": + backend = ArrayBackend + else: + backend = GraphBackend + + if len(self.resources) == 1: + return { + "data": backend.from_drepr(self.repr, self.resources[0]) + } + else: + dataset = ShardedBackend(len(self.resources)) + for resource in self.resources: + dataset.add( + backend.from_drepr(self.repr, resource, dataset.inject_class_id) + ) + return {"data": dataset} + + def validate(self) -> bool: + return True diff --git a/funcs/topoflow/topoflow/__init__.py b/funcs/topoflow/topoflow/__init__.py index 85299cc8..cd326d2f 100644 --- a/funcs/topoflow/topoflow/__init__.py +++ b/funcs/topoflow/topoflow/__init__.py @@ -1,5 +1,5 @@ -SILENT = False +SILENT = True if not(SILENT): print('Importing TopoFlow 3.6 packages:') print(' topoflow.utils') diff --git a/funcs/topoflow/topoflow/components/tests/__init__.py b/funcs/topoflow/topoflow/components/tests/__init__.py index 85299cc8..cd326d2f 100644 --- a/funcs/topoflow/topoflow/components/tests/__init__.py +++ b/funcs/topoflow/topoflow/components/tests/__init__.py @@ -1,5 +1,5 @@ -SILENT = False +SILENT = True if not(SILENT): print('Importing TopoFlow 3.6 packages:') print(' topoflow.utils') diff --git a/funcs/topoflow/topoflow/framework/emeli.py b/funcs/topoflow/topoflow/framework/emeli.py index fb6f5b8d..0158a442 100644 --- 
a/funcs/topoflow/topoflow/framework/emeli.py +++ b/funcs/topoflow/topoflow/framework/emeli.py @@ -187,7 +187,7 @@ parent_dir = parent_dir + os.sep examples_dir = examples_dir + os.sep -SILENT = False +SILENT = True if not(SILENT): # print ' ' print('Paths for this package:') diff --git a/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py b/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py index ea281b4a..9be17881 100644 --- a/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py +++ b/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py @@ -183,7 +183,7 @@ parent_dir = parent_dir + os.sep examples_dir = examples_dir + os.sep -SILENT = False +SILENT = True if not(SILENT): # print ' ' print('Paths for this package:') diff --git a/funcs/topoflow/topoflow_climate.py b/funcs/topoflow/topoflow_climate.py index 6393e982..21328e74 100644 --- a/funcs/topoflow/topoflow_climate.py +++ b/funcs/topoflow/topoflow_climate.py @@ -24,13 +24,14 @@ class Topoflow4ClimateWriteFunc(IFunc): description = '''A model-specific transformation. Prepare the topoflow RTS & RTI files. ''' inputs = { - "geotiff_files": ArgType.ListString, + "geotiff_files": ArgType.ListOrOneString, "cropped_geotiff_dir": ArgType.String, "output_file": ArgType.String, "bounds": ArgType.String, "xres_arcsecs": ArgType.Number, "yres_arcsecs": ArgType.Number, - "unit_multiplier": ArgType.Number(optional=True) + "unit_multiplier": ArgType.Number(optional=True), + "skip_crop_on_exist": ArgType.Boolean } outputs = {"output_file": ArgType.String} friendly_name: str = "Topoflow Climate" @@ -45,7 +46,7 @@ class Topoflow4ClimateWriteFunc(IFunc): "unit_multiplier": 1 } - def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_file: str, bounds: str, xres_arcsecs: int, yres_arcsecs: int, unit_multiplier: float=1): + def __init__(self, geotiff_files: Union[str, List[str]], cropped_geotiff_dir: str, output_file: str, bounds: str, xres_arcsecs: int, yres_arcsecs: int, unit_multiplier: float=1, skip_crop_on_exist: bool=False): x_min, y_min, x_max, y_max = [float(x.strip()) for x in bounds.split(",")] assert x_max > x_min and y_min < y_max self.bounding_box = BoundingBox(x_min, y_min, x_max, y_max) @@ -54,9 +55,10 @@ def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_fi self.yres_arcsecs = yres_arcsecs self.output_file = os.path.abspath(output_file) if isinstance(geotiff_files, str): - geotiff_files = glob.glob(self.geotiff_files) + geotiff_files = glob.glob(geotiff_files) self.geotiff_files = geotiff_files self.cropped_geotiff_dir = os.path.abspath(cropped_geotiff_dir) + self.skip_crop_on_exist = skip_crop_on_exist if not os.path.exists(self.cropped_geotiff_dir): Path(self.cropped_geotiff_dir).mkdir(exist_ok=True, parents=True) @@ -65,7 +67,7 @@ def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_fi Path(self.cropped_geotiff_dir).mkdir(exist_ok=True, parents=True) Path(output_file).parent.mkdir(exist_ok=True, parents=True) - assert not os.path.exists(output_file) + assert not os.path.exists(output_file), output_file def exec(self) -> dict: if self.output_file.endswith(".zip"): @@ -74,13 +76,15 @@ def exec(self) -> dict: else: rts_file = self.output_file - create_rts_rti(self.geotiff_files, rts_file, self.cropped_geotiff_dir, self.bounding_box, self.xres_arcsecs, self.yres_arcsecs, self.unit_multiplier) + create_rts_rti(self.geotiff_files, rts_file, self.cropped_geotiff_dir, self.bounding_box, self.xres_arcsecs, self.yres_arcsecs, self.unit_multiplier, 
self.skip_crop_on_exist) if self.output_file.endswith(".zip"): # compress the outfile with ZipFile(self.output_file, 'w') as z: z.write(rts_file, os.path.basename(rts_file)) z.write(rti_file, os.path.basename(rti_file)) + os.remove(rts_file) + os.remove(rti_file) return {"output_file": self.output_file} def validate(self) -> bool: diff --git a/funcs/topoflow/topoflow_funcs.py b/funcs/topoflow/topoflow_funcs.py index 87dea2e3..e3f326b2 100644 --- a/funcs/topoflow/topoflow_funcs.py +++ b/funcs/topoflow/topoflow_funcs.py @@ -23,7 +23,7 @@ def crop_geotiff(args): return True -def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, out_xres_sec: int, out_yres_sec: int, unit_multiplier: float): +def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, out_xres_sec: int, out_yres_sec: int, unit_multiplier: float, skip_crop_on_exist: bool): """Create RTS file from TIF files. Names of TIF files must be sorted by time""" assert out_file.endswith(".rts") and len(out_file.split(".rts")) == 2 assert len(tif_files) > 0 @@ -40,6 +40,8 @@ def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, # ])) # print(res) for tif_file, out_crop_file in tqdm(zip(tif_files, out_crop_files)): + if skip_crop_on_exist and os.path.exists(out_crop_file): + continue args = (tif_file, out_crop_file, out_bounds, out_xres_sec, out_yres_sec) assert crop_geotiff(args) @@ -62,7 +64,7 @@ def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, def create_rti(tif_file, out_file): """Create RTI from the TIF file""" - import_grid.read_from_geotiff(tif_file, REPORT=True, rti_file=out_file) + import_grid.read_from_geotiff(tif_file, REPORT=False, rti_file=out_file) if __name__ == '__main__': diff --git a/funcs/writers/geotiff_write_func.py b/funcs/writers/geotiff_write_func.py index 69966de2..770ec0fe 100644 --- a/funcs/writers/geotiff_write_func.py +++ b/funcs/writers/geotiff_write_func.py @@ -1,7 +1,6 @@ import os from pathlib import Path from typing import Dict, Optional, Union -from zipfile import ZipFile from drepr.outputs.base_output_sm import BaseOutputSM from datetime import datetime, timezone @@ -17,48 +16,39 @@ class GeoTiffWriteFunc(IFunc): inputs = { "dataset": ArgType.DataSet(None), "variable_name": ArgType.String, - "output_path": ArgType.String, + "output_dir": ArgType.String, + "skip_on_exist": ArgType.Boolean, } outputs = { "output_files": ArgType.ListString } - def __init__(self, dataset: BaseOutputSM, variable_name: str, output_path: Union[str, Path]): + def __init__(self, dataset: BaseOutputSM, variable_name: str, output_dir: Union[str, Path], skip_on_exist: bool=False): self.dataset = dataset self.variable_name = variable_name - self.output_path = os.path.abspath(str(output_path)) + self.output_dir = os.path.abspath(str(output_dir)) + self.skip_on_exist = skip_on_exist - if self.output_path.endswith(".zip"): - self.dir_path = self.output_path[:-4] - else: - self.dir_path = self.output_path - - if not os.path.exists(self.output_path): - Path(self.output_path).mkdir(exist_ok=True, parents=True) + if not os.path.exists(self.output_dir): + Path(self.output_dir).mkdir(exist_ok=True, parents=True) def exec(self): rasters = CroppingTransFunc.extract_raster(self.dataset, self.variable_name) rasters = sorted(rasters, key=lambda x: x['timestamp']) - outfiles = [ - os.path.join(self.dir_path, - datetime.fromtimestamp(raster['timestamp'], tz=timezone.utc).strftime(f"%Y%m%d%H%M%S.{i}.tif")) + os.path.join(self.output_dir, 
datetime.fromtimestamp(raster['timestamp'], tz=timezone.utc).strftime(f"%Y%m%d%H%M%S.{i}.tif")) for i, raster in enumerate(rasters) ] + for outfile, raster in zip(outfiles, rasters): + if self.skip_on_exist and os.path.exists(outfile): + continue raster['raster'].to_geotiff(outfile) - if self.output_path.endswith(".zip"): - # compress the outfile - with ZipFile(self.output_path, 'w') as z: - for outfile in outfiles: - z.write(outfile, os.path.basename(outfile)) - return {"output_files": [self.output_path]} - else: - return {"output_files": outfiles} + return {"output_files": outfiles} def validate(self) -> bool: return True def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]: - return metadata + return metadata \ No newline at end of file diff --git a/uploaded.json b/uploaded.json new file mode 100644 index 00000000..a948e42f --- /dev/null +++ b/uploaded.json @@ -0,0 +1,10 @@ +{ + "2016.tar.gz": "http://files.mint.isi.edu/s/cIyy9bszbYFgTvU/download", + "2019.tar.gz": "http://files.mint.isi.edu/s/gHh8fOxqAix2ZIY/download", + "2017.tar.gz": "http://files.mint.isi.edu/s/Psd5W4GaQml6UPX/download", + "2012.tar.gz": "http://files.mint.isi.edu/s/kZhmDz2VXDOM2ss/download", + "2013.tar.gz": "http://files.mint.isi.edu/s/tOy0d0v09vpmG2t/download", + "2014.tar.gz": "http://files.mint.isi.edu/s/fM6w63ratIu9eCW/download", + "2018.tar.gz": "http://files.mint.isi.edu/s/4r0Np1IJTp592nX/download", + "2015.tar.gz": "http://files.mint.isi.edu/s/fkNVdWsWUKYlwaW/download" +} \ No newline at end of file From e054e193e1056ffe016c7aced1c61b990d7ade9c Mon Sep 17 00:00:00 2001 From: Binh Vu Date: Mon, 13 Jul 2020 15:23:58 -0700 Subject: [PATCH 3/4] sync change --- examples/topoflow4/dev/s00_download_gpm.py | 37 +++++++ .../dev/{run_gldas.py => s10_run_gldas.py} | 5 +- examples/topoflow4/dev/s11_run_gpm.py | 101 ++++++++++++++++++ ...{verify_result.py => s20_verify_result.py} | 9 +- examples/topoflow4/dev/s30_upload.py | 1 + .../topoflow4/dev/tf_climate_make_rts.yml | 4 +- ....yml => tf_climate_writegeotiff.gldas.yml} | 0 .../dev/tf_climate_writegeotiff.gpm.yml | 15 +++ uploaded.json | 16 +-- 9 files changed, 171 insertions(+), 17 deletions(-) create mode 100644 examples/topoflow4/dev/s00_download_gpm.py rename examples/topoflow4/dev/{run_gldas.py => s10_run_gldas.py} (96%) create mode 100644 examples/topoflow4/dev/s11_run_gpm.py rename examples/topoflow4/dev/{verify_result.py => s20_verify_result.py} (94%) create mode 100644 examples/topoflow4/dev/s30_upload.py rename examples/topoflow4/dev/{tf_climate_writegeotiff.gldas_remote.yml => tf_climate_writegeotiff.gldas.yml} (100%) create mode 100644 examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml diff --git a/examples/topoflow4/dev/s00_download_gpm.py b/examples/topoflow4/dev/s00_download_gpm.py new file mode 100644 index 00000000..94b56f04 --- /dev/null +++ b/examples/topoflow4/dev/s00_download_gpm.py @@ -0,0 +1,37 @@ +import subprocess, glob, os, shutil +from tqdm.auto import tqdm +from pathlib import Path + +files = [ + "https://files.mint.isi.edu/s/3RZwyxbi5PpcqeV/download", + "https://files.mint.isi.edu/s/cvY8E1GPC3v8Fi6/download", + "https://files.mint.isi.edu/s/RzERxVVIh2M7Qzc/download", + "https://files.mint.isi.edu/s/DswKIJadJHkwiPo/download", +] +download_dir = "/workspace/mint/MINT-Transformation/data/gpm_download" +dest_dir = "/workspace/mint/MINT-Transformation/data/GPM" + +cmds = [] + +# # download the files +# for i, file in enumerate(files): +# cmds.append(f"wget {file} -O {download_dir}/file_{i}.tar.gz") +# 
for cmd in tqdm(cmds): +# subprocess.check_call(cmd, shell=True) + +# extract files +# cwd = download_dir +# for file in glob.glob(download_dir + "/*.tar.gz"): +# cmds.append(f"tar -xzf {file}") +# print(cmds[-1]) +# for cmd in tqdm(cmds): +# subprocess.check_call(cmd, shell=True) + +# copy files +cwd = download_dir +for year in tqdm(range(2008, 2021)): + (Path(dest_dir) / str(year)).mkdir(exist_ok=True, parents=True) + download_files = glob.glob(download_dir + f"/*/*3IMERG.{year}*") + print(year, len(download_files)) + for file in download_files: + shutil.move(file, dest_dir + f'/{year}/') \ No newline at end of file diff --git a/examples/topoflow4/dev/run_gldas.py b/examples/topoflow4/dev/s10_run_gldas.py similarity index 96% rename from examples/topoflow4/dev/run_gldas.py rename to examples/topoflow4/dev/s10_run_gldas.py index 057301ef..83e92bbd 100644 --- a/examples/topoflow4/dev/run_gldas.py +++ b/examples/topoflow4/dev/s10_run_gldas.py @@ -26,7 +26,7 @@ commands = [] def add_geotif_command(commands, start_time, end_time, geotiff_dir): - writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gldas_remote.yml") + writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gldas.yml") cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {writegeotiff_config_file} \ --dataset.start_time={start_time} \ --dataset.end_time={end_time} \ @@ -45,7 +45,8 @@ def add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_cro --topoflow.skip_crop_on_exist=true \ --topoflow.xres_arcsecs={arcsec} \ --topoflow.yres_arcsecs={arcsec} \ - --topoflow.bounds="{area}" + --topoflow.bounds="{area}" \ + --topoflow.unit_multiplier=3600 """.strip() commands.append(cmd) diff --git a/examples/topoflow4/dev/s11_run_gpm.py b/examples/topoflow4/dev/s11_run_gpm.py new file mode 100644 index 00000000..54b4e982 --- /dev/null +++ b/examples/topoflow4/dev/s11_run_gpm.py @@ -0,0 +1,101 @@ +import subprocess +import os +from pathlib import Path +from dateutil import parser +from datetime import timedelta, datetime +from dataclasses import dataclass +from tqdm.auto import tqdm + + +HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) +DOWNLOAD_DIR = HOME_DIR / "data/GPM" + +arcsecs = [30, 60] +areas = { + "awash": "37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333", + "baro": "34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999", + "shebelle": "38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333", + "ganale": "39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999", + "guder": "37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333", + "muger": "37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667", + "beko": "35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666", + "alwero": "34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +} + +run_dir = HOME_DIR / "data" / "tf_gpm" + +def add_geotif_command(commands, year, month, geotiff_dir): + writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gpm.yml") + resource_path = str(DOWNLOAD_DIR / str(year) / f"*3IMERG.{year}{month:02d}*") + gpm_file = str(HOME_DIR / "examples/topoflow4/dev/gpm.yml") + + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {writegeotiff_config_file} \ + --dataset.resource_path={resource_path} \ + --dataset.repr_file={gpm_file} \ + --geotiff_writer.output_dir={geotiff_dir} \ + 
--geotiff_writer.skip_on_exist=true \ + """.strip() + commands.append(cmd) + + +def add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area): + makerts_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_make_rts.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {makerts_config_file} \ + --topoflow.geotiff_files='{geotiff_dir}*/*.tif' \ + --topoflow.cropped_geotiff_dir={geotiff_dir_crop} \ + --topoflow.output_file={output_file} \ + --topoflow.skip_crop_on_exist=true \ + --topoflow.xres_arcsecs={arcsec} \ + --topoflow.yres_arcsecs={arcsec} \ + --topoflow.bounds="{area}" \ + --topoflow.unit_multiplier=1 + """.strip() + commands.append(cmd) + + +for year in range(2010, 2020): + commands = [] + (run_dir / str(year)).mkdir(exist_ok=True, parents=True) + for month in range(1, 13): + s0 = parser.parse(f"{year}-{month:02d}-01T00:00:00") + if month == 12: + s1 = s0.replace(day=31, hour=23, minute=59, second=59) + else: + s1 = s0.replace(month=s0.month + 1) - timedelta(seconds=1) + + start_time = s0.isoformat() + end_time = s1.isoformat() + geotiff_dir = str(run_dir / str(year) / f"data_m{month:02d}") + add_geotif_command(commands, year, month, geotiff_dir) + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}_m{month:02d}.rts") + add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + start_time = f"{year}-01-01T00:00:00" + end_time = f"{year}-12-31T23:59:59" + geotiff_dir = str(run_dir / str(year) / f"data_m") + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}.rts") + add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + # compress the data + for area_name in areas.keys(): + input_dir = run_dir / str(year) / area_name + commands.append(f"cd {input_dir.parent} && tar -czf {input_dir.name}.tar.gz {input_dir.name}") + + # upload the file + commands.append(f""" + dotenv run python dtran/dcat/scripts/upload_files_in_batch.py upload_files \ + --server=OWNCLOUD --dir={run_dir / str(year)} \ + --ext=tar.gz --upload_dir=Topoflow/GPM_version_2/{year} + """) + commands.append(f"rm -rf {run_dir / str(year)}") + + for cmd in tqdm(commands, desc=f"run commands for year {year}"): + output = subprocess.check_output(cmd, shell=True, cwd=str(HOME_DIR)) \ No newline at end of file diff --git a/examples/topoflow4/dev/verify_result.py b/examples/topoflow4/dev/s20_verify_result.py similarity index 94% rename from examples/topoflow4/dev/verify_result.py rename to examples/topoflow4/dev/s20_verify_result.py index d5804fb8..c4da8bf3 100644 --- a/examples/topoflow4/dev/verify_result.py +++ b/examples/topoflow4/dev/s20_verify_result.py @@ -10,23 +10,22 @@ HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) DOWNLOAD_DIR = Path(os.path.abspath(os.environ['DATA_CATALOG_DOWNLOAD_DIR'])) -run_dir = HOME_DIR / "data" / "tf_gldas" +# gldas is 8, gpm is 48 +n_files_per_day = 48 +run_dir = HOME_DIR / "data" / "tf_gpm" def get_size(infile): data = np.fromfile(infile, dtype=np.float32) return data.shape[0] -# gldas is 8 -n_files_per_day = 8 - n_month_days = [ (parser.parse(f'2000-{i+1:02}-01T00:00') - parser.parse(f'2000-{i:02}-01T00:00')).days for i in 
range(1, 12) ] n_month_days.append(31) -for year in range(2019, 2020): +for year in range(2008, 2009): n_year_days = (parser.parse(f'{year+1}-01-01T00:00') - parser.parse(f'{year}-01-01T00:00')).days n_month_days[1] = (parser.parse(f'{year}-03-01T00:00') - parser.parse(f'{year}-02-01T00:00')).days diff --git a/examples/topoflow4/dev/s30_upload.py b/examples/topoflow4/dev/s30_upload.py new file mode 100644 index 00000000..294ad7b0 --- /dev/null +++ b/examples/topoflow4/dev/s30_upload.py @@ -0,0 +1 @@ +dotenv run python dtran/dcat/scripts/upload_files_in_batch.py upload_files --server=OWNCLOUD --dir=data/tf_gldas --ext=tar.gz --upload_dir=Topoflow/GLDAS_version_2 \ No newline at end of file diff --git a/examples/topoflow4/dev/tf_climate_make_rts.yml b/examples/topoflow4/dev/tf_climate_make_rts.yml index e63f3656..0289d12d 100644 --- a/examples/topoflow4/dev/tf_climate_make_rts.yml +++ b/examples/topoflow4/dev/tf_climate_make_rts.yml @@ -9,5 +9,5 @@ adapters: bounds: "34.221249999999, 7.353749999999, 36.45458333333234, 9.503749999999" xres_arcsecs: 60 yres_arcsecs: 60 - # required for GLDAS (unit transformation) - unit_multiplier: 3600 \ No newline at end of file + # required for GLDAS (unit transformation = 3600) + unit_multiplier: 1 \ No newline at end of file diff --git a/examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml b/examples/topoflow4/dev/tf_climate_writegeotiff.gldas.yml similarity index 100% rename from examples/topoflow4/dev/tf_climate_writegeotiff.gldas_remote.yml rename to examples/topoflow4/dev/tf_climate_writegeotiff.gldas.yml diff --git a/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml b/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml new file mode 100644 index 00000000..cf3b03e4 --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml @@ -0,0 +1,15 @@ +version: "1" +adapters: + dataset: + comment: | + Weather dataset + adapter: funcs.ReadFunc + inputs: + resource_path: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/download/*.nc4 + repr_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm.yml + geotiff_writer: + adapter: funcs.GeoTiffWriteFunc + inputs: + dataset: $.dataset.data + variable_name: atmosphere_water__precipitation_mass_flux + output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff diff --git a/uploaded.json b/uploaded.json index a948e42f..527b4d72 100644 --- a/uploaded.json +++ b/uploaded.json @@ -1,10 +1,10 @@ { - "2016.tar.gz": "http://files.mint.isi.edu/s/cIyy9bszbYFgTvU/download", - "2019.tar.gz": "http://files.mint.isi.edu/s/gHh8fOxqAix2ZIY/download", - "2017.tar.gz": "http://files.mint.isi.edu/s/Psd5W4GaQml6UPX/download", - "2012.tar.gz": "http://files.mint.isi.edu/s/kZhmDz2VXDOM2ss/download", - "2013.tar.gz": "http://files.mint.isi.edu/s/tOy0d0v09vpmG2t/download", - "2014.tar.gz": "http://files.mint.isi.edu/s/fM6w63ratIu9eCW/download", - "2018.tar.gz": "http://files.mint.isi.edu/s/4r0Np1IJTp592nX/download", - "2015.tar.gz": "http://files.mint.isi.edu/s/fkNVdWsWUKYlwaW/download" + "shebelle.tar.gz": "http://files.mint.isi.edu/s/KTDn3YQ347yNqBd/download", + "awash.tar.gz": "http://files.mint.isi.edu/s/abr6goUK5Gxfysu/download", + "baro.tar.gz": "http://files.mint.isi.edu/s/ZD9ssfwt57yikFW/download", + "beko.tar.gz": "http://files.mint.isi.edu/s/9ToC9VlSqM8SdHs/download", + "muger.tar.gz": "http://files.mint.isi.edu/s/pBHNGXtYeA5ZLcE/download", + "alwero.tar.gz": "http://files.mint.isi.edu/s/38r6gMtnE6yKYkr/download", + 
"ganale.tar.gz": "http://files.mint.isi.edu/s/3jOjvSpVHaito5M/download", + "guder.tar.gz": "http://files.mint.isi.edu/s/xfOnnl66mPPFVWC/download" } \ No newline at end of file From 69685298414090f919731d69b034552f61fd0c63 Mon Sep 17 00:00:00 2001 From: Minh Pham Date: Tue, 14 Jul 2020 00:49:45 -0700 Subject: [PATCH 4/4] Update DEM cropping --- .gitignore | 3 +- dtran/dcat/api.py | 1 - dtran/dcat/scripts/delete.json | 2 +- dtran/dcat/scripts/register_datasets.py | 10 +- dtran/dcat/scripts/resource.json | 3 + dtran/dcat/scripts/variables.json | 8 +- environment.yml | 207 +++++++++++++++--- examples/dame/filled/dem_cropping.yml | 46 ++-- examples/dame/filled/topoflow_gldas.yml | 26 +++ ...{topoflow_climate.yml => topoflow_gpm.yml} | 0 examples/dame/scripts/dem_cropping.sh | 3 + .../dame/templates/dem_cropping.yml.template | 37 ++++ funcs/cycles/gldas2cycles.py | 3 +- funcs/dem/__init__.py | 1 + funcs/dem/crop_func.py | 51 +++++ funcs/readers/dcat_read_no_repr.py | 4 +- run | 2 +- 17 files changed, 329 insertions(+), 78 deletions(-) create mode 100644 dtran/dcat/scripts/resource.json create mode 100644 examples/dame/filled/topoflow_gldas.yml rename examples/dame/filled/{topoflow_climate.yml => topoflow_gpm.yml} (100%) create mode 100755 examples/dame/scripts/dem_cropping.sh create mode 100644 examples/dame/templates/dem_cropping.yml.template create mode 100644 funcs/dem/__init__.py create mode 100644 funcs/dem/crop_func.py diff --git a/.gitignore b/.gitignore index e91b8b1e..98cb8de4 100644 --- a/.gitignore +++ b/.gitignore @@ -114,4 +114,5 @@ examples/demo/data *node_modules* data -tmp \ No newline at end of file +tmp +env.env \ No newline at end of file diff --git a/dtran/dcat/api.py b/dtran/dcat/api.py index 3a5bba82..0de324aa 100644 --- a/dtran/dcat/api.py +++ b/dtran/dcat/api.py @@ -280,7 +280,6 @@ def handle_api_response(response: requests.Response): """ This is a convenience method to handle api responses :param response: - :param print_response: :return: """ parsed_response = response.json() diff --git a/dtran/dcat/scripts/delete.json b/dtran/dcat/scripts/delete.json index 0637a088..e171d1f6 100644 --- a/dtran/dcat/scripts/delete.json +++ b/dtran/dcat/scripts/delete.json @@ -1 +1 @@ -[] \ No newline at end of file +["b3e79dc2-8fa1-4203-ac82-b5267925191f"] \ No newline at end of file diff --git a/dtran/dcat/scripts/register_datasets.py b/dtran/dcat/scripts/register_datasets.py index bbe80447..9ec9c7ef 100644 --- a/dtran/dcat/scripts/register_datasets.py +++ b/dtran/dcat/scripts/register_datasets.py @@ -14,8 +14,8 @@ def cli(): ignore_unknown_options=True, allow_extra_args=False, )) -@click.option("--name", help="DCAT dataset name", default="test-dataset") -@click.option("--description", help="DCAT dataset description", default="test-description") +@click.option("--name", help="DCAT dataset name", prompt="Dataset name") +@click.option("--description", help="DCAT dataset description", prompt="Dataset description") @click.option("--metadata_path", help="DCAT dataset metadata file path", default=None) @click.option("--resource_path", help="DCAT dataset resources json path, should be a file name-url dict", default=None) @click.option("--resource_type", help="DCAT dataset resource type", default="zip") @@ -23,11 +23,11 @@ def cli(): def register_dataset(name, description, metadata_path, resource_path, variable_path, resource_type): """ Registers DCAT dataset with multiple resources. 
- Example: PYTHONPATH=$(pwd):$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py register_dataset + Example: PYTHONPATH=$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py register_dataset --resource_path=./uploaded.json --variable_path=variables.json """ - dcat = DCatAPI.get_instance("https://api.mint-data-catalog.org") + dcat = DCatAPI.get_instance() if metadata_path is None: metadata = {} @@ -85,7 +85,7 @@ def delete_dataset(dcatid, json_path): Delete specified datasets. Example: PYTHONPATH=$(pwd):$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py delete_dataset --dcatid=c4fedf48-f888-4de1-b60f-c6ac5cb1615b """ - dcat = DCatAPI.get_instance("https://api.mint-data-catalog.org") + dcat = DCatAPI.get_instance() if dcatid is None and json_path is None: raise ValueError("Please enter dataset ids to delete!") diff --git a/dtran/dcat/scripts/resource.json b/dtran/dcat/scripts/resource.json new file mode 100644 index 00000000..1905ddc7 --- /dev/null +++ b/dtran/dcat/scripts/resource.json @@ -0,0 +1,3 @@ +{ + "awash.tif": "https://data.mint.isi.edu/files/hand-dem/GIS-Oromia/Awash/Awash-border_DEM_buffer.tif" +} \ No newline at end of file diff --git a/dtran/dcat/scripts/variables.json b/dtran/dcat/scripts/variables.json index 9896ab96..dedcf17d 100644 --- a/dtran/dcat/scripts/variables.json +++ b/dtran/dcat/scripts/variables.json @@ -1,10 +1,10 @@ { "standard_variables": [ { - "ontology": "ScientificVariablesOntology", - "name": "precipitation_leg_volume_flux", - "uri": "http://www.geoscienceontology.org/svo/svl/variable/1.0.0/#atmosphere%40role%7Esource_water%40role%7Emain_precipitation__precipitation_leq_volume_flux" + "ontology": "MINT Ontology", + "name": "var_0", + "uri": "http://mint.isi.edu/var_0" } ], - "variable_names": ["test-var"] + "variable_names": ["var_0"] } \ No newline at end of file diff --git a/environment.yml b/environment.yml index f41dc5b4..53e51a34 100644 --- a/environment.yml +++ b/environment.yml @@ -1,34 +1,185 @@ -# https://docs.conda.io/projects/conda-build/en/latest/resources/package-spec.html#package-match-specifications name: mintdt channels: - conda-forge - defaults dependencies: - - python=3.8 - - pip - - gdal=3.0.4 - - scipy - - pydap=3.2.2 - - numpy - - shapely=1.7 - - dask=2.11.0 + - _libgcc_mutex=0.1=conda_forge + - _openmp_mutex=4.5=1_llvm + - affine=2.3.0=py_0 + - attrs=19.3.0=py_0 + - beautifulsoup4=4.9.1=py38h32f6830_0 + - bokeh=2.1.1=py38h32f6830_0 + - boost-cpp=1.72.0=h8e57a91_0 + - bottleneck=1.3.2=py38h8790de6_1 + - brotlipy=0.7.0=py38h1e0a361_1000 + - bzip2=1.0.8=h516909a_2 + - ca-certificates=2020.6.20=hecda079_0 + - cairo=1.16.0=hcf35c78_1003 + - certifi=2020.6.20=py38h32f6830_0 + - cffi=1.14.0=py38hd463f26_0 + - cfitsio=3.470=h3eac812_5 + - cftime=1.2.0=py38h8790de6_1 + - chardet=3.0.4=py38h32f6830_1006 + - click-plugins=1.1.1=py_0 + - cloudpickle=1.5.0=py_0 + - cryptography=2.9.2=py38h766eaa4_0 + - curl=7.71.1=he644dc0_1 + - cytoolz=0.10.1=py38h516909a_0 + - dask=2.11.0=py_0 + - dask-core=2.11.0=py_0 + - distributed=2.20.0=py38h32f6830_0 + - docopt=0.6.2=py_1 + - expat=2.2.9=he1b5a44_2 + - fontconfig=2.13.1=h86ecdb6_1001 + - freetype=2.10.2=he06d7ca_0 + - freexl=1.0.5=h14c3975_1002 + - fsspec=0.7.4=py_0 + - gdal=3.0.4=py38h172510d_10 + - geos=3.8.1=he1b5a44_0 + - geotiff=1.6.0=h05acad5_0 + - gettext=0.19.8.1=hc5be6a0_1002 + - giflib=5.2.1=h516909a_2 + - glib=2.65.0=h6f030ca_0 + - hdf4=4.2.13=hf30be14_1003 + - hdf5=1.10.6=nompi_h3c11f04_100 + - heapdict=1.0.1=py_0 + - icu=64.2=he1b5a44_1 + - 
idna=2.10=pyh9f0ad1d_0 + - jinja2=2.11.2=pyh9f0ad1d_0 + - jpeg=9d=h516909a_0 + - json-c=0.13.1=hbfbb72e_1002 + - kealib=1.4.13=h33137a7_1 + - krb5=1.17.1=hfafb76e_1 + - lcms2=2.11=hbd6801e_0 + - ld_impl_linux-64=2.34=h53a641e_7 + - libblas=3.8.0=17_openblas + - libcblas=3.8.0=17_openblas + - libcurl=7.71.1=hcdd3856_1 + - libdap4=3.20.6=h1d1bd15_0 + - libedit=3.1.20191231=h46ee950_1 + - libffi=3.2.1=he1b5a44_1007 + - libgcc-ng=9.2.0=h24d8f2e_2 + - libgdal=3.0.4=he6a97d6_10 + - libgfortran-ng=7.5.0=hdf63c60_6 + - libiconv=1.15=h516909a_1006 + - libkml=1.3.0=hb574062_1011 + - liblapack=3.8.0=17_openblas + - libnetcdf=4.7.4=nompi_h84807e1_104 + - libopenblas=0.3.10=h5ec1e0e_0 + - libpng=1.6.37=hed695b0_1 + - libpq=12.2=h5513abc_1 + - libspatialite=4.3.0a=h2482549_1038 + - libssh2=1.9.0=hab1572f_3 + - libstdcxx-ng=9.2.0=hdf63c60_2 + - libtiff=4.1.0=hc7e4089_6 + - libuuid=2.32.1=h14c3975_1000 + - libwebp-base=1.1.0=h516909a_3 + - libxcb=1.13=h14c3975_1002 + - libxml2=2.9.10=hee79883_0 + - libxslt=1.1.33=h31b3aaa_0 + - llvm-openmp=10.0.0=hc9558a2_0 + - locket=0.2.0=py_2 + - lxml=4.5.2=py38hbb43d70_0 + - lz4-c=1.9.2=he1b5a44_1 + - markupsafe=1.1.1=py38h1e0a361_1 + - mechanicalsoup=0.12.0=py_0 + - msgpack-python=1.0.0=py38hbf85e49_1 + - ncurses=6.1=hf484d3e_1002 + - netcdf4=1.5.3=nompi_py38hfd55d45_105 + - numpy=1.18.5=py38h8854b6b_0 + - olefile=0.46=py_0 + - openjpeg=2.3.1=h981e76c_3 + - openssl=1.1.1g=h516909a_0 + - packaging=20.4=pyh9f0ad1d_0 + - pandas=1.0.5=py38hcb8c335_0 + - partd=1.1.0=py_0 + - pcre=8.44=he1b5a44_0 + - pillow=7.2.0=py38h9776b28_1 + - pip=20.1.1=py_1 + - pixman=0.38.0=h516909a_1003 + - poppler=0.87.0=h4190859_1 + - poppler-data=0.4.9=1 + - postgresql=12.2=h8573dbc_1 + - proj=7.0.0=h966b41f_4 + - psutil=5.7.0=py38h1e0a361_1 + - pthread-stubs=0.4=h14c3975_1001 + - pycparser=2.20=pyh9f0ad1d_2 + - pydap=3.2.2=py38_1000 + - pyopenssl=19.1.0=py_1 + - pyparsing=2.4.7=pyh9f0ad1d_0 + - pyproj=2.6.1.post1=py38h7521cb9_0 + - pysocks=1.7.1=py38h32f6830_1 + - python=3.8.3=cpython_he5300dc_0 + - python_abi=3.8=1_cp38 + - pytz=2020.1=pyh9f0ad1d_0 + - pyyaml=5.3.1=py38h1e0a361_0 + - rasterio=1.1.5=py38h033e0f6_0 + - readline=8.0=h46ee950_1 + - requests=2.24.0=pyh9f0ad1d_0 + - rioxarray=0.0.31=py_0 + - scipy=1.5.0=py38h18bccfc_0 + - setuptools=49.2.0=py38h32f6830_0 + - shapely=1.7.0=py38hd168ffb_3 + - six=1.15.0=pyh9f0ad1d_0 + - snuggs=1.4.7=py_0 + - sortedcontainers=2.2.2=pyh9f0ad1d_0 + - soupsieve=2.0.1=py38h32f6830_0 + - sqlite=3.32.3=hcee41ef_1 + - tbb=2020.1=hc9558a2_0 + - tblib=1.6.0=py_0 + - tiledb=1.7.7=h8efa9f0_3 + - tk=8.6.10=hed695b0_0 + - toolz=0.10.0=py_0 + - tornado=6.0.4=py38h1e0a361_1 + - typing_extensions=3.7.4.2=py_0 + - tzcode=2020a=h516909a_0 + - urllib3=1.25.9=py_0 + - webob=1.8.6=py_0 + - wheel=0.34.2=py_1 + - xerces-c=3.2.2=h8412b87_1004 + - xorg-kbproto=1.0.7=h14c3975_1002 + - xorg-libice=1.0.10=h516909a_0 + - xorg-libsm=1.2.3=h84519dc_1000 + - xorg-libx11=1.6.9=h516909a_0 + - xorg-libxau=1.0.9=h14c3975_0 + - xorg-libxdmcp=1.1.3=h516909a_0 + - xorg-libxext=1.3.4=h516909a_0 + - xorg-libxrender=0.9.10=h516909a_1002 + - xorg-renderproto=0.11.1=h14c3975_1002 + - xorg-xextproto=7.3.0=h14c3975_1002 + - xorg-xproto=7.0.31=h14c3975_1007 + - xz=5.2.5=h516909a_1 + - yaml=0.2.5=h516909a_0 + - zict=2.0.0=py_0 + - zlib=1.2.11=h516909a_1006 + - zstd=1.4.4=h6597ccf_3 - pip: - - drepr>=2.9.2 - - ujson - - netCDF4>=1.5.3 - - arpeggio - - rdflib - - flask - - flask_cors - - requests - - tqdm - - marshmallow==3.5 - - xarray==0.15 - - networkx==2.4 - - python-dateutil==2.8 - - click==7.0 
- - ccut - - matplotlib - - python-dotenv==0.12 - - peewee==3.13 - - pyocclient + - arpeggio==1.9.2 + - ccut==1.0.0 + - click==7.0 + - cligj==0.5.0 + - cycler==0.10.0 + - decorator==4.4.2 + - drepr==2.9.10 + - fiona==1.8.13.post1 + - flask==1.1.2 + - flask-cors==3.0.8 + - isodate==0.6.0 + - itsdangerous==1.1.0 + - kiwisolver==1.2.0 + - marshmallow==3.5.0 + - matplotlib==3.2.2 + - munch==2.5.0 + - networkx==2.4 + - peewee==3.13.0 + - pyocclient==0.4 + - python-dateutil==2.8.0 + - python-dotenv==0.12.0 + - rdflib==5.0.0 + - ruamel-yaml==0.16.10 + - ruamel-yaml-clib==0.2.0 + - tqdm==4.47.0 + - ujson==3.0.0 + - werkzeug==1.0.1 + - xarray==0.15.0 + - xmltodict==0.12.0 diff --git a/examples/dame/filled/dem_cropping.yml b/examples/dame/filled/dem_cropping.yml index fc42a013..af5d10b9 100644 --- a/examples/dame/filled/dem_cropping.yml +++ b/examples/dame/filled/dem_cropping.yml @@ -1,27 +1,15 @@ version: "1.0" description: Data transformation to generate daily average data from original GLDAS data sources inputs: - gldas_dataset_id: - comment: DataCatalog Dataset ID for GLDAS - value: 5babae3f-c468-4e01-862e-8b201468e3b5 - agg_function: - comment: Operation to be used for aggregation. Values can be ("sum", "average", "count") - value: average - agg_time_period: - comment: Time period for aggregation. Values can be ("minute", "hour", "day", "month", "year") - value: day - start_time: - comment: Start time to filter Resources for DataCatalog GLDAS Dataset (can also be "null" to leave this end open) - value: '2011-01-01 00:00:00' - end_time: - comment: End time to filter Resources for DataCatalog GLDAS Dataset (can also be "null" to leave this end open) - value: '2011-01-02 00:00:00' + dataset_id: + comment: DataCatalog Dataset ID for raster file + value: 3c5aa587-2e3d-49ef-ad60-532370941e87 crop_region_xmin: comment: Target region bounding box xmin coordinate value: 32.75418 crop_region_ymin: comment: Target region bounding box ymin coordinate - value: 3.22206 + value: 8.22206 crop_region_xmax: comment: Target region bounding box xmax coordinate value: 47.98942 @@ -29,29 +17,21 @@ inputs: comment: Target region bounding box ymax coordinate value: 15.15943 output_file: - comment: Path to output compressed file - value: output.zip + comment: Path to output file + value: output.tiff adapters: - gldas_read_func: - comment: My gldas read func adapter - adapter: funcs.DcatReadFunc + read_func: + comment: My geotiff read func adapter + adapter: funcs.DcatReadNoReprFunc inputs: - dataset_id: $$.gldas_dataset_id - start_time: $$.start_time - end_time: $$.end_time + dataset_id: $$.dataset_id my_crop_wrapper: comment: My cropping func wrapper adapter - adapter: funcs.CroppingTransFunc + adapter: funcs.dem.DEMCropFunc inputs: - variable_name: $$.variable_name - dataset: $.gldas_read_func.data + input_file: $.read_func.data_path + output_file: $$.output_file xmin: $$.crop_region_xmin ymin: $$.crop_region_ymin xmax: $$.crop_region_xmax ymax: $$.crop_region_ymax - geotiff_writer: - adapter: funcs.GeoTiffWriteFunc - inputs: - dataset: $.weather_data.data - variable_name: $$.variable_name - output_dir: $$.tmp_dir_geotiff diff --git a/examples/dame/filled/topoflow_gldas.yml b/examples/dame/filled/topoflow_gldas.yml new file mode 100644 index 00000000..25c4989c --- /dev/null +++ b/examples/dame/filled/topoflow_gldas.yml @@ -0,0 +1,26 @@ +adapters: + weather_data: + comment: | + Weather dataset + adapter: funcs.ReadFunc + inputs: + resource_path: 
+      repr_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm.yml
+  geotiff_writer:
+    adapter: funcs.GeoTiffWriteFunc
+    inputs:
+      dataset: $.weather_data.data
+      variable_name: atmosphere_water__precipitation_mass_flux
+      output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff
+  tf_trans:
+    adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc
+    inputs:
+      geotiff_files: $.geotiff_writer.output_files
+      cropped_geotiff_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff_crop
+      output_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/output.rts
+      bounds: "34.221249999999, 7.353749999999, 36.45458333333234, 9.503749999999"
+      xres_arcsecs: 60
+      yres_arcsecs: 60
+      # unit multiplier 1 for GPM, 3600 for GLDAS
+      unit_multiplier: 1
+# unit_multiplier: 3600
\ No newline at end of file
diff --git a/examples/dame/filled/topoflow_climate.yml b/examples/dame/filled/topoflow_gpm.yml
similarity index 100%
rename from examples/dame/filled/topoflow_climate.yml
rename to examples/dame/filled/topoflow_gpm.yml
diff --git a/examples/dame/scripts/dem_cropping.sh b/examples/dame/scripts/dem_cropping.sh
new file mode 100755
index 00000000..e06baaf5
--- /dev/null
+++ b/examples/dame/scripts/dem_cropping.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+./run --config examples/dame/templates/dem_cropping.yml.template -i1 3c5aa587-2e3d-49ef-ad60-532370941e87 -p1 "32.75418" -p2 "8.22206" -p3 "47.98942" -p4 "15.15943" -o1 "output.tiff"
diff --git a/examples/dame/templates/dem_cropping.yml.template b/examples/dame/templates/dem_cropping.yml.template
new file mode 100644
index 00000000..a62a5289
--- /dev/null
+++ b/examples/dame/templates/dem_cropping.yml.template
@@ -0,0 +1,37 @@
+version: "1.0"
+description: Data transformation to generate daily average data from original GLDAS data sources
+inputs:
+  dataset_id:
+    comment: DataCatalog Dataset ID for raster file
+    value: {INPUTS1}
+  crop_region_xmin:
+    comment: Target region bounding box xmin coordinate
+    value: {PARAMS1}
+  crop_region_ymin:
+    comment: Target region bounding box ymin coordinate
+    value: {PARAMS2}
+  crop_region_xmax:
+    comment: Target region bounding box xmax coordinate
+    value: {PARAMS3}
+  crop_region_ymax:
+    comment: Target region bounding box ymax coordinate
+    value: {PARAMS4}
+  output_file:
+    comment: Path to output file
+    value: {OUTPUTS1}
+adapters:
+  read_func:
+    comment: My geotiff read func adapter
+    adapter: funcs.DcatReadNoReprFunc
+    inputs:
+      dataset_id: $$.dataset_id
+  my_crop_wrapper:
+    comment: My cropping func wrapper adapter
+    adapter: funcs.dem.DEMCropFunc
+    inputs:
+      input_file: $.read_func.data_path
+      output_file: $$.output_file
+      xmin: $$.crop_region_xmin
+      ymin: $$.crop_region_ymin
+      xmax: $$.crop_region_xmax
+      ymax: $$.crop_region_ymax
diff --git a/funcs/cycles/gldas2cycles.py b/funcs/cycles/gldas2cycles.py
index 3af984cf..be8cc6ad 100644
--- a/funcs/cycles/gldas2cycles.py
+++ b/funcs/cycles/gldas2cycles.py
@@ -1,4 +1,3 @@
-import argparse
 import math
 import os
 import shutil
@@ -8,10 +7,10 @@
 
 import numpy as np
 from netCDF4 import Dataset
+
 from dtran import IFunc, ArgType
 from dtran.ifunc import IFuncType
 from dtran.metadata import Metadata
-import xarray
 
 
 class Gldas2CyclesFunc(IFunc):
diff --git a/funcs/dem/__init__.py b/funcs/dem/__init__.py
new file mode 100644
index 00000000..45569351
--- /dev/null
+++ b/funcs/dem/__init__.py
@@ -0,0 +1 @@
+from .crop_func import DEMCropFunc
\ No newline at end of file
diff --git a/funcs/dem/crop_func.py b/funcs/dem/crop_func.py
new file mode 100644
index 00000000..fec93dc5
--- /dev/null
+++ b/funcs/dem/crop_func.py
@@ -0,0 +1,51 @@
+from typing import Optional, Dict
+
+import rioxarray
+
+from dtran import IFunc, ArgType
+from dtran.ifunc import IFuncType
+from dtran.metadata import Metadata
+
+
+class DEMCropFunc(IFunc):
+    id = "dem_crop_func"
+    description = """ A reader-transformation-writer multi-adapter.
+    Crop a raster file by bounding box.
+    """
+    inputs = {
+        "input_file": ArgType.String,
+        "output_file": ArgType.String,
+        "xmin": ArgType.Number,
+        "ymin": ArgType.Number,
+        "xmax": ArgType.Number,
+        "ymax": ArgType.Number,
+    }
+    outputs = {"output_file": ArgType.String}
+    friendly_name: str = "DEMCrop"
+    func_type = IFuncType.MODEL_TRANS
+
+    def __init__(self, input_file, output_file, xmin, ymin, xmax, ymax):
+        self.input_file = input_file
+        self.output_file = output_file
+        self.xmin = xmin
+        self.ymin = ymin
+        self.xmax = xmax
+        self.ymax = ymax
+
+    def validate(self) -> bool:
+        return True
+
+    def exec(self) -> dict:
+        ds = rioxarray.open_rasterio(self.input_file)
+        mask_lon = (ds.x >= self.xmin) & (ds.x <= self.xmax)
+        mask_lat = (ds.y >= self.ymin) & (ds.y <= self.ymax)
+
+        cropped_ds = ds.where(mask_lon & mask_lat, drop=True)
+        cropped_ds.rio.to_raster(self.output_file)
+
+        return {"output_file": self.output_file}
+
+    def change_metadata(
+        self, metadata: Optional[Dict[str, Metadata]]
+    ) -> Dict[str, Metadata]:
+        return metadata
diff --git a/funcs/readers/dcat_read_no_repr.py b/funcs/readers/dcat_read_no_repr.py
index d6ed0bf1..60373a72 100644
--- a/funcs/readers/dcat_read_no_repr.py
+++ b/funcs/readers/dcat_read_no_repr.py
@@ -28,7 +28,7 @@ class DcatReadNoReprFunc(IFunc):
     func_type = IFuncType.READER
     friendly_name: str = " Data Catalog Reader Without repr File"
     inputs = {"dataset_id": ArgType.String}
-    outputs = {"data": ArgType.String}
+    outputs = {"data_path": ArgType.String}
     example = {"dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f"}
 
     def __init__(self, dataset_id: str):
@@ -48,7 +48,7 @@ def __init__(self, dataset_id: str):
 
     def exec(self) -> dict:
         data_path = self.resource_manager.download(
-            self.resource_id, self.resource_metadata, should_redownload=True
+            self.resource_id, self.resource_metadata, should_redownload=False
         )
 
         return {"data_path": data_path}
diff --git a/run b/run
index 0c9b76b9..ec9f5bd9 100755
--- a/run
+++ b/run
@@ -4,7 +4,7 @@ env_file=$(readlink -f ./env.env)
 BASEDIR=`dirname $0`
 pushd /ws
 echo "Running DAME execution"
-dotenv -f env.env run python -m dtran.dame.exec "$@"
+dotenv -f $env_file run python -m dtran.dame.exec "$@"
 while [[ "$#" -gt 0 ]]
 do