From ee1e0090754811c33b33aac3053583ea93220dd2 Mon Sep 17 00:00:00 2001
From: pkdash
Date: Tue, 7 Mar 2023 15:14:51 -0500
Subject: [PATCH] [#44] initial work for editing data for geo-feature aggregation

---
 hsclient/hydroshare.py | 315 ++++++++++++++++++++++++++++++-----------
 1 file changed, 231 insertions(+), 84 deletions(-)

diff --git a/hsclient/hydroshare.py b/hsclient/hydroshare.py
index 28939b4..18a490e 100644
--- a/hsclient/hydroshare.py
+++ b/hsclient/hydroshare.py
@@ -2,6 +2,7 @@
 import os
 import pathlib
 import pickle
+import shutil
 import sqlite3
 import tempfile
 import time
@@ -11,7 +12,7 @@
 from functools import wraps
 from posixpath import basename, dirname, join as urljoin, splitext
 from pprint import pformat
-from typing import Dict, List, Union, TYPE_CHECKING
+from typing import Dict, List, Union, TYPE_CHECKING, Callable
 from urllib.parse import quote, unquote, urlparse
 from zipfile import ZipFile
 
@@ -231,7 +232,7 @@ def _download(self, save_path: str = "", unzip_to: str = None) -> str:
             return unzip_to
         return downloaded_zip
 
-    def _validate_aggregation_path(self, agg_path: str):
+    def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False) -> str:
         main_file_ext = pathlib.Path(self.main_file_path).suffix
         file_name = self.file(extension=main_file_ext).name
         file_path = urljoin(agg_path, file_name)
@@ -239,9 +240,26 @@
             file_path = urljoin(file_path, file_name)
             if not os.path.exists(file_path):
                 raise Exception(f"Aggregation was not found at: {agg_path}")
+
+        if for_save_data:
+            if self.metadata.type == AggregationType.GeographicFeatureAggregation:
+                if file_path == self._data_object.path:
+                    raise Exception(f"Aggregation path '{agg_path}' is not a valid path. This should be a path "
+                                    f"containing the updated shapefiles.")
+                else:
+                    for aggr_file in self.files():
+                        aggr_file = basename(aggr_file)
+                        if aggr_file.endswith(".shp.xml") or aggr_file.endswith(".sbn") or aggr_file.endswith(".sbx"):
+                            # these files are optional for a geo feature aggregation
+                            continue
+                        if not os.path.exists(os.path.join(agg_path, aggr_file)):
+                            raise Exception(f"Aggregation path '{agg_path}' is not a valid path. "
+                                            f"Missing file '{aggr_file}'")
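+
+        # Illustrative note (file names are hypothetical): for a main file 'watersheds.shp',
+        # a valid 'agg_path' for saving geo-feature data must contain 'watersheds.shp',
+        # 'watersheds.shx', 'watersheds.dbf' and 'watersheds.prj'; the '.shp.xml',
+        # '.sbn' and '.sbx' companion files may be omitted.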
         return file_path
 
-    def _get_data_object(self, agg_path, func):
+    def _get_data_object(self, agg_path: str, func: Callable) -> \
+            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset']:
+
         if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation:
             return self._data_object
 
@@ -251,75 +269,21 @@ def _get_data_object(self, agg_path, func):
         data_object.load()
         data_object.close()
 
-        # cache the object for the aggregation
+        # cache the data object for the aggregation
         self._data_object = data_object
         return data_object
 
-    def _save_data_object(self, resource, agg_path: str, as_new_aggr=False, destination_path=""):
+    def _validate_aggregation_for_update(self, resource: 'Resource', agg_type: AggregationType) -> None:
+        if self.metadata.type != agg_type:
+            raise Exception(f"Not a {agg_type.value} aggregation")
+
         if self._data_object is None:
             raise Exception("No data object exists for this aggregation.")
 
-        file_path = self._validate_aggregation_path(agg_path)
-        if self.metadata.type == AggregationType.MultidimensionalAggregation:
-            self._data_object.to_netcdf(file_path, format="NETCDF4")
-
-        if self.metadata.type == AggregationType.TimeSeriesAggregation:
-            with closing(sqlite3.connect(file_path)) as conn:
-                # write the dataframe to a temp table
-                self._data_object.to_sql('temp', conn, if_exists='replace', index=False)
-                # delete the matching records from the TimeSeriesResultValues table
-                conn.execute("DELETE FROM TimeSeriesResultValues WHERE ResultID IN (SELECT ResultID FROM temp)")
-                conn.execute("INSERT INTO TimeSeriesResultValues SELECT * FROM temp")
-                # delete the temp table
-                conn.execute("DROP TABLE temp")
-                conn.commit()
-
-        aggr_file_path = self.main_file_path
-        data_object = self._data_object
-        aggr_type = self.metadata.type
-        if not as_new_aggr:
-            # cache some of the metadata fields of the original aggregation to update the metadata of the
-            # updated aggregation
-            # TODO: There may be additional metadata fields that we need to consider to use for the updated aggregation
-            keywords = self.metadata.subjects
-            additional_meta = self.metadata.additional_metadata
-            if aggr_type == AggregationType.TimeSeriesAggregation:
-                title = self.metadata.title
-                abstract = self.metadata.abstract
-
-            # delete this aggregation from Hydroshare
-            # TODO: If the creation of the replacement aggregation fails for some reason, then with the following
-            #  delete action we will lose this aggregation from HydroShare. Need to keep a copy of the
-            #  original aggregation locally so that we can upload that to HydroShare.
-            self.delete()
-
-            # upload the updated data file to the same location as the aggregation it's replacing - this should
-            # create a new aggregation of the same type
-            resource.file_upload(file_path)
-
-            # retrieve the updated aggregation
-            aggr = resource.aggregation(file__path=aggr_file_path)
-
-            # update metadata
-            for kw in keywords:
-                if kw not in aggr.metadata.subjects:
-                    aggr.metadata.subjects.append(kw)
-            aggr.metadata.additional_metadata = additional_meta
-            if aggr_type == AggregationType.TimeSeriesAggregation:
-                aggr.metadata.title = title
-                aggr.metadata.abstract = abstract
-            aggr.save()
-        else:
-            # upload the data file to the path as specified by 'destination_path' to create a
-            # new aggregation of the same type
-            resource.file_upload(file_path, destination_path=destination_path)
-
-            # retrieve the new aggregation
-            aggr_path = urljoin(destination_path, os.path.basename(aggr_file_path))
-            aggr = resource.aggregation(file__path=aggr_path)
-
-        aggr._data_object = data_object
-        return aggr
+        # check that this aggregation is part of the specified resource
+        aggr = resource.aggregation(file__path=self.main_file_path)
+        if aggr is None:
+            raise Exception("This aggregation is not part of the specified resource.")
 
     @property
     def metadata_file(self):
@@ -484,7 +448,7 @@ def as_multi_dimensional_dataset(self, agg_path: str) -> 'xarray.Dataset':
         if self.metadata.type != AggregationType.MultidimensionalAggregation:
             raise Exception("Aggregation is not of type NetCDF")
         if xarray is None:
-            raise Exception("xarray package not found")
+            raise Exception("xarray package was not found")
 
         return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)
 
@@ -502,7 +466,7 @@ def as_feature_collection(self, agg_path: str) -> 'fiona.Collection':
         if self.metadata.type != AggregationType.GeographicFeatureAggregation:
             raise Exception("Aggregation is not of type GeoFeature")
         if fiona is None:
-            raise Exception("fiona package not found")
+            raise Exception("fiona package was not found")
         return self._get_data_object(agg_path=agg_path, func=fiona.open)
 
     def as_raster_dataset(self, agg_path: str) -> 'rasterio.DatasetReader':
@@ -519,17 +483,25 @@
         if self.metadata.type != AggregationType.GeographicRasterAggregation:
             raise Exception("Aggregation is not of type GeoRaster")
         if rasterio is None:
-            raise Exception("rasterio package not found")
+            raise Exception("rasterio package was not found")
 
         return self._get_data_object(agg_path=agg_path, func=rasterio.open)
 
     def as_data_object(self, agg_path: str, series_id: str = "") -> \
-            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset', None]:
-        """Load aggregation data to a relevant data object type"""
+            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset']:
+        """
+        Loads aggregation data to a relevant data object type. Data for a timeseries aggregation is loaded as a
+        pandas DataFrame, data for a geo feature aggregation is loaded as a fiona Collection object, data for a
+        raster aggregation is loaded as a rasterio DatasetReader object, and data for a netcdf aggregation is
+        loaded as an xarray Dataset object.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
+        :param series_id: The series_id of the timeseries result to be converted to a DataFrame object. A value
+         for this parameter is required only for a timeseries aggregation.
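+
+        Example (resource and paths are hypothetical):
+            agg = res.aggregation(file__path="watersheds/watersheds.shp")
+            fc = agg.as_data_object(agg_path="downloads/watersheds")  # fiona.Collection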
+        """
         if self.metadata.type == AggregationType.TimeSeriesAggregation:
             if not series_id:
-                raise Exception("Please specify series_id for which the timeseries data object is needed.")
+                raise Exception("Provide the series_id for which the timeseries data object is needed.")
             return self.as_series(series_id=series_id, agg_path=agg_path)
         if self.metadata.type == AggregationType.MultidimensionalAggregation:
             return self.as_multi_dimensional_dataset(agg_path=agg_path)
@@ -540,7 +512,8 @@ def as_data_object(self, agg_path: str, series_id: str = "") -> \
 
         raise Exception(f"Data object is not supported for '{self.metadata.type}' aggregation type")
 
-    def update_netcdf_data(self, resource, agg_path: str, as_new_aggr=False, destination_path="") -> 'Aggregation':
+    def update_netcdf_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
+                           destination_path: str = "") -> 'Aggregation':
         """
         Updates the netcdf file associated with this aggregation. Then uploads the updated netcdf file
         to create a new aggregation that replaces the original aggregation.
@@ -556,16 +529,50 @@ def update_netcdf_data(self, resource, agg_path: str, as_new_aggr=False, destina
         # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then
         #  make this method as a private method.
 
-        if self.metadata.type != AggregationType.MultidimensionalAggregation:
-            raise Exception("Not a NetCDF aggregation")
+        self._validate_aggregation_for_update(resource, AggregationType.MultidimensionalAggregation)
+        file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
+        self._data_object.to_netcdf(file_path, format="NETCDF4")
+        aggr_main_file_path = self.main_file_path
+        data_object = self._data_object
+        if not as_new_aggr:
+            destination_path = dirname(self.main_file_path)
+
+            # cache some of the metadata fields of the original aggregation to update the metadata of the
+            # updated aggregation
+            keywords = self.metadata.subjects
+            additional_meta = self.metadata.additional_metadata
+
+            # TODO: keep a local backup copy of the aggregation before deleting it
+            self.delete()
+            resource.file_upload(file_path, destination_path=destination_path)
+
+            # retrieve the updated aggregation
+            aggr = resource.aggregation(file__path=aggr_main_file_path)
 
-        return self._save_data_object(resource, agg_path, as_new_aggr, destination_path)
+            # update metadata
+            for kw in keywords:
+                if kw not in aggr.metadata.subjects:
+                    aggr.metadata.subjects.append(kw)
+            aggr.metadata.additional_metadata = additional_meta
+            aggr.save()
+        else:
+            # creating a new aggregation by uploading the updated data files
+            resource.file_upload(file_path, destination_path=destination_path)
 
-    def update_timeseries_data(self, resource, agg_path: str, as_new_aggr=False,
-                               destination_path="") -> 'Aggregation':
+            # retrieve the new aggregation
+            agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
+            aggr = resource.aggregation(file__path=agg_path)
+            data_object = None
+
+        aggr._data_object = data_object
+        return aggr
+
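+    # Typical use of update_netcdf_data (object names are illustrative): edit the
+    # xarray.Dataset returned by as_data_object(), then write it back:
+    #     ds = agg.as_data_object(agg_path=download_dir)
+    #     ds.attrs["title"] = "updated title"
+    #     agg = agg.update_netcdf_data(res, agg_path=download_dir)
+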
+    def update_timeseries_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
+                               destination_path: str = "") -> 'Aggregation':
         """
         Updates the sqlite file associated with this aggregation. Then uploads the updated sqlite file
-        to create a new aggregation that replaces the original aggregation.
+        to create a new aggregation that either replaces the original aggregation or is added as a new
+        aggregation.
         :param resource: The resource object to which this aggregation belongs.
         :param agg_path: The local path where this aggregation has been downloaded previously.
         :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be
          updated/replaced.
         :param destination_path: The destination folder path where the new aggregation will be created. This folder
          path must already exist in resource. This parameter is used only when 'as_new_aggr' is True.
@@ -578,12 +585,149 @@ def update_timeseries_data(self, resource, agg_path: str, as_new_aggr=False,
         # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then
         #  make this method as a private method.
 
-        if self.metadata.type != AggregationType.TimeSeriesAggregation:
-            raise Exception("Not a timeseries aggregation")
+        self._validate_aggregation_for_update(resource, AggregationType.TimeSeriesAggregation)
+        file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
+        with closing(sqlite3.connect(file_path)) as conn:
+            # write the dataframe to a temp table
+            self._data_object.to_sql('temp', conn, if_exists='replace', index=False)
+            # delete the matching records from the TimeSeriesResultValues table
+            conn.execute("DELETE FROM TimeSeriesResultValues WHERE ResultID IN (SELECT ResultID FROM temp)")
+            conn.execute("INSERT INTO TimeSeriesResultValues SELECT * FROM temp")
+            # delete the temp table
+            conn.execute("DROP TABLE temp")
+            conn.commit()
+
+        aggr_main_file_path = self.main_file_path
+        data_object = self._data_object
+        if not as_new_aggr:
+            destination_path = dirname(self.main_file_path)
 
-        return self._save_data_object(resource, agg_path, as_new_aggr, destination_path)
+            # cache some of the metadata fields of the original aggregation to update the metadata of the
+            # updated aggregation
+            keywords = self.metadata.subjects
+            additional_meta = self.metadata.additional_metadata
+            title = self.metadata.title
+            abstract = self.metadata.abstract
+
+            # TODO: If the creation of the replacement aggregation fails for some reason, then with the following
+            #  delete action we will lose this aggregation from HydroShare. Need to keep a copy of the
+            #  original aggregation locally so that we can upload that to HydroShare if needed.
+            self.delete()
+            resource.file_upload(file_path, destination_path=destination_path)
+            # retrieve the updated aggregation
+            aggr = resource.aggregation(file__path=aggr_main_file_path)
+
+            # update metadata
+            for kw in keywords:
+                if kw not in aggr.metadata.subjects:
+                    aggr.metadata.subjects.append(kw)
+            aggr.metadata.additional_metadata = additional_meta
+            aggr.metadata.title = title
+            aggr.metadata.abstract = abstract
+            aggr.save()
+        else:
+            # creating a new aggregation by uploading the updated data files
+            resource.file_upload(file_path, destination_path=destination_path)
+
+            # retrieve the new aggregation
+            agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
+            aggr = resource.aggregation(file__path=agg_path)
+            data_object = None
 
-    def save_data_object(self, resource, agg_path: str, as_new_aggr=False, destination_path="") -> 'Aggregation':
+        aggr._data_object = data_object
+        return aggr
+
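+    # Typical use of update_timeseries_data (names and the series id are illustrative;
+    # the 'DataValue' column assumes the ODM2 TimeSeriesResultValues schema):
+    #     df = agg.as_data_object(agg_path=download_dir, series_id="<series_id>")
+    #     df["DataValue"] = df["DataValue"] * 0.3048  # e.g. feet to meters
+    #     agg = agg.update_timeseries_data(res, agg_path=download_dir)
+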
+    def update_geo_feature_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
+                                destination_path: str = "") -> 'Aggregation':
+        """
+        Updates the shapefiles associated with this aggregation. Then uploads all files associated with this
+        aggregation to create a new aggregation that either replaces the original aggregation or is added as a new
+        aggregation.
+        :param resource: The resource object to which this aggregation belongs.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
+        :param as_new_aggr: If True, a new aggregation will be created; otherwise, this aggregation will be
+         updated/replaced.
+        :param destination_path: The destination folder path where the new aggregation will be created. This folder
+         path must already exist in the resource. This parameter is used only when 'as_new_aggr' is True.
+        :return: The updated geo-feature aggregation or a new geo-feature aggregation (an instance of Aggregation)
+        """
+
+        # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then
+        #  make this a private method.
+
+        def upload_shape_files(main_file_path, dst_path=""):
+            shp_file_dir_path = os.path.dirname(main_file_path)
+            filename_starts_with = f"{pathlib.Path(main_file_path).stem}."
+            shape_files = []
+            for item in os.listdir(shp_file_dir_path):
+                if item.startswith(filename_starts_with):
+                    file_full_path = os.path.join(shp_file_dir_path, item)
+                    shape_files.append(file_full_path)
+            resource.file_upload(*shape_files, destination_path=dst_path)
+
+        self._validate_aggregation_for_update(resource, AggregationType.GeographicFeatureAggregation)
+        file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
+        aggr_main_file_path = self.main_file_path
+        data_object = self._data_object
+        if not as_new_aggr:
+            destination_path = dirname(self.main_file_path)
+
+            # cache some of the metadata fields of the original aggregation to update the metadata of the
+            # updated aggregation
+            keywords = self.metadata.subjects
+            additional_meta = self.metadata.additional_metadata
+
+            # TODO: keep a local backup copy of the aggregation before deleting it
+            self.delete()
+            # copy the updated shapefiles to the original shapefile location where the user previously
+            # downloaded the aggregation
+            src_shp_file_dir_path = os.path.dirname(file_path)
+            tgt_shp_file_dir_path = os.path.dirname(data_object.path)
+            agg_path = tgt_shp_file_dir_path
+            filename_starts_with = f"{pathlib.Path(file_path).stem}."
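+            # e.g. a stem of 'watersheds' (hypothetical) matches 'watersheds.shp',
+            # 'watersheds.shx', 'watersheds.dbf', 'watersheds.prj', etc.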
+
+            # need to close the fiona.Collection object to free up access to all the original shapefiles
+            data_object.close()
+
+            for item in os.listdir(src_shp_file_dir_path):
+                if item.startswith(filename_starts_with):
+                    src_file_full_path = os.path.join(src_shp_file_dir_path, item)
+                    tgt_file_full_path = os.path.join(tgt_shp_file_dir_path, item)
+                    shutil.copyfile(src_file_full_path, tgt_file_full_path)
+
+            # upload the updated shapefiles to replace this aggregation
+            upload_shape_files(main_file_path=data_object.path, dst_path=destination_path)
+
+            # retrieve the updated aggregation
+            aggr = resource.aggregation(file__path=aggr_main_file_path)
+
+            # update aggregation metadata
+            for kw in keywords:
+                if kw not in aggr.metadata.subjects:
+                    aggr.metadata.subjects.append(kw)
+            aggr.metadata.additional_metadata = additional_meta
+            aggr.save()
+
+            # load the aggregation data to a fiona Collection object
+            data_object = aggr.as_data_object(agg_path=agg_path)
+        else:
+            # creating a new aggregation
+            # close the original fiona Collection object
+            data_object.close()
+
+            # upload the updated shapefiles to create a new geo feature aggregation
+            upload_shape_files(main_file_path=file_path, dst_path=destination_path)
+
+            # retrieve the new aggregation
+            agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
+            aggr = resource.aggregation(file__path=agg_path)
+            data_object = None
+
+        aggr._data_object = data_object
+        return aggr
+
+    def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
+                         destination_path: str = "") -> 'Aggregation':
         """
         Updates the data file(s) of this aggregation using the associated data processing object and either updates
         this aggregation or creates a new aggregation using the updated data files.
@@ -594,7 +738,10 @@ def save_data_object(self, resource, agg_path: str, as_new_aggr=False, destinati
 
         if self.metadata.type == AggregationType.TimeSeriesAggregation:
             return self.update_timeseries_data(resource, agg_path, as_new_aggr, destination_path)
 
-        # TODO: Implement this functionality for Raster and GeoFeature aggregations
+        if self.metadata.type == AggregationType.GeographicFeatureAggregation:
+            return self.update_geo_feature_data(resource, agg_path, as_new_aggr, destination_path)
+
+        # TODO: Implement this functionality for Raster aggregation
         raise Exception("Saving of data object is not supported for this aggregation type")
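
Reviewer note: an end-to-end sketch of the geo-feature editing flow this patch enables.
The credentials, resource id, and folder names below are hypothetical, and the download
step is elided; only `as_data_object` and `save_data_object` behavior comes from this patch.

    from hsclient import HydroShare

    hs = HydroShare(username="my_user", password="my_pass")
    res = hs.resource("<resource_id>")
    agg = res.aggregation(file__path="watersheds/watersheds.shp")

    # ... download/extract the aggregation to 'downloads/watersheds' beforehand ...

    # open the shapefile as a fiona Collection (cached on the aggregation object)
    fc = agg.as_data_object(agg_path="downloads/watersheds")

    # edit the shapefile components with a GIS tool of your choice, writing the
    # results to a *different* folder ('edited/watersheds') under the same base name

    # upload the edited files, replacing this aggregation in place
    agg = agg.save_data_object(res, agg_path="edited/watersheds")

    # or, to create a new aggregation in an existing resource folder instead:
    # agg = agg.save_data_object(res, agg_path="edited/watersheds",
    #                            as_new_aggr=True, destination_path="new_folder")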