diff --git a/scripts/data_importers/cmems_drifter.py b/scripts/data_importers/cmems_drifter.py
index c455e762..d6f5f2d5 100644
--- a/scripts/data_importers/cmems_drifter.py
+++ b/scripts/data_importers/cmems_drifter.py
@@ -76,7 +76,8 @@ def main(uri: str, filename: str):
         print(fname)
 
         with xr.open_dataset(fname) as ds:
-            ds = reformat_coordinates(ds)
+            if ds.LATITUDE.size > 1:
+                ds = reformat_coordinates(ds)
 
             df = ds.to_dataframe().reset_index().dropna(axis=1, how="all").dropna()
 
diff --git a/scripts/data_importers/cmems_glider.py b/scripts/data_importers/cmems_glider.py
index 983699aa..e6fdbfe2 100644
--- a/scripts/data_importers/cmems_glider.py
+++ b/scripts/data_importers/cmems_glider.py
@@ -46,8 +46,8 @@ def reformat_coordinates(ds: xr.Dataset) -> xr.Dataset:
 
     return ds
 
-
 def main(uri: str, filename: str):
+
     """Import Glider NetCDF
 
     :param str uri: Database URI
@@ -73,121 +73,114 @@ def main(uri: str, filename: str):
     for fname in filenames:
         print(fname)
 
         with xr.open_dataset(fname).drop_duplicates("TIME") as ds:
-            time_diff = np.diff(ds.TIME.data).astype("timedelta64[D]").astype(int)
-            breaks = np.argwhere(time_diff > 5).flatten()
-            deployment_times = np.split(ds.TIME, breaks + 1)
-
+            ds = reformat_coordinates(ds)
             variables = [v for v in VARIABLES if v in ds.variables]
-
-            for deployment in deployment_times:
-                subset = ds.sel(TIME=deployment)
-                subset = reformat_coordinates(subset)
-
-                dep_date = np.datetime_as_string(deployment, unit="D")[0]
-                df = (
-                    subset[["TIME", "LATITUDE", "LONGITUDE", *variables]]
-                    .to_dataframe()
-                    .reset_index()
-                    .dropna(axis=1, how="all")
-                    .dropna()
-                )
-
-                # remove missing variables from variables list
-                variables = [v for v in VARIABLES if v in df.columns]
-
-                for variable in variables:
-                    if variable not in datatype_map:
-                        statement = select(DataType).where(
-                            DataType.key == subset[variable].standard_name
+            df = (
+                ds[["TIME", "LATITUDE", "LONGITUDE", *variables]]
+                .to_dataframe()
+                .reset_index()
+                .dropna(axis=1, how="all")
+                .dropna()
+            )
+
+            # remove missing variables from variables list
+            variables = [v for v in VARIABLES if v in df.columns]
+
+            for variable in variables:
+                if variable not in datatype_map:
+                    statement = select(DataType).where(
+                        DataType.key == ds[variable].standard_name
+                    )
+                    dt = session.execute(statement).all()
+                    if not dt:
+                        dt = DataType(
+                            key=ds[variable].standard_name,
+                            name=ds[variable].long_name,
+                            unit=ds[variable].units,
                         )
-                        dt = session.execute(statement).all()
-                        if not dt:
-                            dt = DataType(
-                                key=subset[variable].standard_name,
-                                name=subset[variable].long_name,
-                                unit=subset[variable].units,
-                            )
-                            session.add(dt)
-                        else:
-                            dt = dt[0][0]
-
-                        datatype_map[variable] = dt
-
+                        session.add(dt)
+                    else:
+                        dt = dt[0][0]
+
+                    datatype_map[variable] = dt
+
+            session.commit()
+
+            p = Platform(
+                type=Platform.Type.glider, unique_id=f"{ds.attrs['platform_code']}"
+            )
+            attrs = {
+                "Glider Platform": ds.attrs["platform_code"],
+                "WMO": ds.attrs["wmo_platform_code"],
+                "Institution": ds.attrs["institution"],
+            }
+            p.attrs = attrs
+
+            try:
+                session.add(p)
                 session.commit()
+            except IntegrityError:
+                print("Error committing platform.")
+                session.rollback()
+                stmt = select(Platform.id).where(Platform.unique_id == ds.attrs["platform_code"])
+                p.id = session.execute(stmt).first()[0]
+
+            n_chunks = np.ceil(len(df) / 1e4)
+
+            if n_chunks < 1:
+                continue
+
+            for chunk in np.array_split(df, n_chunks):
+                chunk["STATION_ID"] = 0
+
+                stations = [
+                    Station(
+                        platform_id=p.id,
+                        time=row.TIME,
+                        latitude=row.LATITUDE,
+                        longitude=row.LONGITUDE,
+                    )
+                    for idx, row in chunk[["TIME", "LATITUDE", "LONGITUDE"]].drop_duplicates().iterrows()
+                ]
 
-                platform_id = subset.attrs["platform_code"]
-                p = Platform(
-                    type=Platform.Type.glider, unique_id=f"{platform_id}-{dep_date}"
-                )
-                attrs = {
-                    "Glider Platform": platform_id,
-                    "WMO": subset.attrs["wmo_platform_code"],
-                    "Institution": subset.attrs["institution"],
-                }
-                p.attrs = attrs
-
+                # Using return_defaults=True here so that the stations will get
+                # updated with id's. It's slower, but it means that we can just
+                # put all the station ids into a pandas series to use when
+                # constructing the samples.
                 try:
-                    session.add(p)
+                    session.bulk_save_objects(stations, return_defaults=True)
                     session.commit()
                 except IntegrityError:
-                    print("Error committing platform.")
+                    print("Error committing station.")
                     session.rollback()
-                    stmt = select(Platform.id).where(
-                        Platform.unique_id == f"{platform_id}-{dep_date}"
-                    )
-                    p.id = session.execute(stmt).first()[0]
 
-                n_chunks = np.ceil(len(df) / 1e4)
+                stmt = select(Station).where(Station.platform_id == p.id)
+                station_data = session.execute(stmt).all()
 
-                if n_chunks < 1:
-                    continue
+                for station in station_data:
+                    chunk.loc[chunk["TIME"] == station[0].time, "STATION_ID"] = station[0].id
 
-                for chunk in np.array_split(df, n_chunks):
-                    stations = [
-                        Station(
-                            platform_id=p.id,
-                            time=row.TIME,
-                            latitude=row.LATITUDE,
-                            longitude=row.LONGITUDE,
+                samples = [
+                    [
+                        Sample(
+                            station_id=row.STATION_ID,
+                            depth=row.DEPTH,
+                            value=row[variable],
+                            datatype_key=datatype_map[variable].key,
                         )
-                        for idx, row in chunk.iterrows()
+                        for variable in variables
                     ]
+                    for idx, row in chunk.iterrows()
+                ]
+                try:
+                    session.bulk_save_objects(
+                        [item for sublist in samples for item in sublist]
+                    )
+                except IntegrityError:
+                    print("Error committing samples.")
+                    session.rollback()
 
-                    # Using return_defaults=True here so that the stations will get
-                    # updated with id's. It's slower, but it means that we can just
-                    # put all the station ids into a pandas series to use when
-                    # constructing the samples.
-                    try:
-                        session.bulk_save_objects(stations, return_defaults=True)
-                    except IntegrityError:
-                        print("Error committing station.")
-                        session.rollback()
-                        stmt = select(Station).where(Station.platform_id == p.id)
-                        chunk["STATION_ID"] = session.execute(stmt).all()
-
-                    chunk["STATION_ID"] = [s.id for s in stations]
-
-                    samples = [
-                        [
-                            Sample(
-                                station_id=row.STATION_ID,
-                                depth=row.DEPTH,
-                                value=row[variable],
-                                datatype_key=datatype_map[variable].key,
-                            )
-                            for variable in variables
-                        ]
-                        for idx, row in chunk.iterrows()
-                    ]
-                    try:
-                        session.bulk_save_objects(
-                            [item for sublist in samples for item in sublist]
-                        )
-                    except IntegrityError:
-                        print("Error committing samples.")
-                        session.rollback()
-
-                session.commit()
+            session.commit()
 
 
 if __name__ == "__main__":
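The platform handling above leans on an insert-first upsert: try to add the Platform, and on IntegrityError roll back and look up the id of the row that already exists. A minimal, runnable sketch of that pattern, with a hypothetical stand-in model and an in-memory SQLite engine rather than the project's real Platform class and database:

# Hypothetical stand-ins: Platform here is a minimal model, not the
# project's ORM class, and the engine is an in-memory SQLite database.
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Platform(Base):
    __tablename__ = "platforms"

    id = Column(Integer, primary_key=True)
    unique_id = Column(String, unique=True)  # duplicate inserts raise IntegrityError


def get_or_create_platform(session: Session, unique_id: str) -> Platform:
    """Insert first; on a unique-constraint collision, recover the existing id."""
    p = Platform(unique_id=unique_id)
    try:
        session.add(p)
        session.commit()  # succeeds only when unique_id is new
    except IntegrityError:
        session.rollback()
        stmt = select(Platform.id).where(Platform.unique_id == unique_id)
        p.id = session.execute(stmt).first()[0]
    return p


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    first = get_or_create_platform(session, "glider-001")  # plain insert
    second = get_or_create_platform(session, "glider-001")  # IntegrityError path
    assert first.id == second.id

Trying the insert first keeps the common case (a new platform) to a single round trip and lets the database's unique constraint arbitrate duplicates, at the cost of a rollback when the row already exists.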
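The station and sample writes rely on chunked bulk inserts: the DataFrame is split into roughly 10,000-row pieces, Station objects are bulk-saved with return_defaults=True so their generated primary keys come back, and the ids are then mapped onto the frame by matching on time. A self-contained sketch of that pattern, again with a stand-in model, engine, and data rather than the project's Station model:

# Stand-in model, engine, and data; the chunking, bulk_save_objects with
# return_defaults=True, and the id backfill mirror the patch.
import numpy as np
import pandas as pd
from sqlalchemy import Column, DateTime, Float, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Station(Base):
    __tablename__ = "stations"

    id = Column(Integer, primary_key=True)
    platform_id = Column(Integer)
    time = Column(DateTime)
    latitude = Column(Float)
    longitude = Column(Float)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

df = pd.DataFrame(
    {
        "TIME": pd.date_range("2024-01-01", periods=2_500, freq="min"),
        "LATITUDE": 44.0,
        "LONGITUDE": -63.0,
    }
)

with Session(engine) as session:
    # The patch uses ~1e4 rows per chunk; a smaller size keeps this demo quick.
    n_chunks = np.ceil(len(df) / 1e3)
    for chunk in np.array_split(df, int(n_chunks)):
        chunk["STATION_ID"] = 0
        stations = [
            Station(
                platform_id=1,
                time=row.TIME,
                latitude=row.LATITUDE,
                longitude=row.LONGITUDE,
            )
            for _, row in chunk[["TIME", "LATITUDE", "LONGITUDE"]]
            .drop_duplicates()
            .iterrows()
        ]
        # return_defaults=True is slower but populates the generated
        # primary keys on the Python objects as they are saved.
        session.bulk_save_objects(stations, return_defaults=True)
        session.commit()
        # Like the patch, read the stations back and map their ids onto
        # the chunk by matching on time.
        stmt = select(Station).where(Station.platform_id == 1)
        for (station,) in session.execute(stmt).all():
            chunk.loc[chunk["TIME"] == station.time, "STATION_ID"] = station.id
    assert (chunk["STATION_ID"] > 0).all()

In the patch the bulk save sits inside the same try/except IntegrityError used for the platform, and the id backfill runs after the try/except either way, so STATION_ID is populated whether or not the insert collided.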