diff --git a/AODN/AODN-WAVE-NRT/ARDC_API_NRT/ardc_nrt/ardc_sofar_nrt.py b/AODN/AODN-WAVE-NRT/ARDC_API_NRT/ardc_nrt/ardc_sofar_nrt.py
index 17f7c889..0610622f 100755
--- a/AODN/AODN-WAVE-NRT/ARDC_API_NRT/ardc_nrt/ardc_sofar_nrt.py
+++ b/AODN/AODN-WAVE-NRT/ARDC_API_NRT/ardc_nrt/ardc_sofar_nrt.py
@@ -23,6 +23,9 @@
 from dateutil.relativedelta import relativedelta
 from dateutil.rrule import rrule, MONTHLY
 
+limit_outlier = 0.02  # threshold (in degrees) beyond which a data point is considered an outlier
+limit_error = 0.1  # threshold (in degrees) beyond which a data point is considered erroneous
+
 
 def process_wave_source_id(source_id, incoming_path=None):
     """
@@ -40,6 +43,22 @@
     LOGGER.info(f'site_name: {site_name}')
 
     api_sofar = sofarApi()
+
+    # check for a missing token:
+    if api_sofar.lookup_get_source_id_token(source_id) is None:
+        message_add_error(source_id, "API error: missing token. Please note that institution names are case sensitive.")
+        return
+
+    # warn about a missing deployment start date (removed buoys are kept in the metadata, but without a deployment date)
+    if sources_id_metadata[source_id]['deployment_start_date'] == "" or \
+            ardc_lookup.get_source_id_deployment_start_date(source_id) is None:
+        message_add_warning(source_id, "Deployment start date missing. Download of data skipped.")
+        return
+
     latest_timestamp_available_source_id = api_sofar.get_source_id_latest_timestamp(source_id)
 
     ardc_pickle = ardcPickle(OUTPUT_PATH)
@@ -48,7 +67,7 @@
     timestamp_start_end = get_timestamp_start_end_to_download(config.conf_dirpath, source_id,
                                                               latest_timestamp_available_source_id,
                                                               latest_timestamp_processed_source_id)
-    if not timestamp_start_end:  # already up to date
+    if not timestamp_start_end:  # downloaded data already up to date
         return
 
     timestamp_start, timestamp_end = timestamp_start_end
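
As a rough sanity check on the two thresholds introduced above: one degree of latitude is about 111 km, so `limit_outlier` and `limit_error` correspond to the ~2 km and ~10 km figures quoted in the log messages below. A minimal standalone sketch, not part of the patch (the constant is a spherical-Earth approximation and ignores the convergence of meridians, so a longitude difference covers less distance away from the equator):

```python
# Approximate conversion of the degree thresholds to kilometres.
KM_PER_DEGREE = 111.32  # km per degree of latitude (spherical-Earth approximation)

limit_outlier = 0.02  # degrees
limit_error = 0.1     # degrees

print(f"outlier threshold ~ {limit_outlier * KM_PER_DEGREE:.1f} km")  # ~2.2 km
print(f"error threshold   ~ {limit_error * KM_PER_DEGREE:.1f} km")    # ~11.1 km
```
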
@@ -56,21 +75,250 @@
     # api call to download one month at a time
     start_date = timestamp_start.replace(tzinfo=datetime.timezone.utc)
     end_date = timestamp_end.to_pydatetime().replace(tzinfo=datetime.timezone.utc)
-    months_to_download = [dt for dt in rrule(MONTHLY, dtstart=start_date, until=end_date + relativedelta(months=1))][0:-1]
+    months_to_download = [dt for dt in rrule(MONTHLY, dtstart=start_date, until=end_date + relativedelta(months=1))][
+                         0:-1]
+
+    # check the API coordinates against the buoy metadata; they are compared again below to the monthly API data
+    data_full = api_sofar.get_source_id_wave_data_time_range(source_id, start_date, end_date)
+    lat_meta = sources_id_metadata[source_id]['latitude_nominal']
+    lon_meta = sources_id_metadata[source_id]['longitude_nominal']
+    lat_api_last = data_full.iloc[-1]['latitude']
+    lon_api_last = data_full.iloc[-1]['longitude']
+    diff_lat_last_api = abs(lat_api_last - lat_meta)
+    diff_lon_last_api = abs(lon_api_last - lon_meta)
 
     for month in months_to_download:
+
         data = api_sofar.get_source_id_wave_data_time_range(source_id, month, month + relativedelta(months=1))
 
         if data is None:
-
-            LOGGER.error(f"Processing {source_id} aborted. No data available BETWEEN {month} AND {month + relativedelta(months=1)}")
-            return
+            message_add_warning(source_id, f"No data available BETWEEN {month} AND {month + relativedelta(months=1)}.")
+            continue
 
         if data is not None:
+            # Compare the coordinates of the last data point in the API to the metadata, and find the
+            # outliers (2-10km) and errors (>10km).
+            # Log the first and last outliers/errors, check how long they persist, and issue an error or a warning.
+
+            lat_month_last = data.iloc[-1]['latitude']
+            lon_month_last = data.iloc[-1]['longitude']
+            diff_lat_last = abs(lat_month_last - lat_meta)
+            diff_lon_last = abs(lon_month_last - lon_meta)
+
+            outliers_coordinates = data[
+                ((abs(data['latitude'] - lat_meta) > limit_outlier) & (
+                    abs(data['latitude'] - lat_meta) < limit_error)) |
+                ((abs(data['longitude'] - lon_meta) > limit_outlier) & (
+                    abs(data['longitude'] - lon_meta) < limit_error))]
+            error_coordinates = data[
+                (abs(data['latitude'] - lat_meta) >= limit_error) | (abs(data['longitude'] - lon_meta) >= limit_error)]
+
+            if not outliers_coordinates.empty:
+                first_outlier = outliers_coordinates.iloc[0]['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
+                last_outlier = outliers_coordinates.iloc[-1]['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
+
+                # if there is data from before the coordinates issue that has not been downloaded yet, retrieve it
+                partial_data_outlier = data[data['timestamp'] < outliers_coordinates.iloc[0]['timestamp']]
+
+            if not error_coordinates.empty:
+                first_error_coordinates = error_coordinates.iloc[0]['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
+                last_error_coordinates = error_coordinates.iloc[-1]['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
+                time_error = (datetime.datetime.strptime(last_error_coordinates, "%Y-%m-%d %H:%M:%S") -
+                              datetime.datetime.strptime(first_error_coordinates, "%Y-%m-%d %H:%M:%S"))
+
+                # if there is data from before the coordinates issue that has not been downloaded yet, retrieve it
+                partial_data_error = data[data['timestamp'] < error_coordinates.iloc[0]['timestamp']]
+
+            if outliers_coordinates.empty and error_coordinates.empty:
+                # no outlier, no error, no problem
+                pass
+            elif error_coordinates.empty:
+                # some outliers, but generally no major issue.
+                # if the last data point of the month is OK, consider it a glitch or temporary move and
+                # just log a warning saying that there were outliers between X and Y.
+                # if the last point of the month is not OK but the last point of the full dataset is, it is also
+                # a temporary move (this is necessary in cases where the last data of a month is incorrect,
+                # otherwise that month would be skipped forever).
+                # if the last point is not OK, skip with an error saying that the data download will not resume
+                # at that site until the buoy is back where it should be (in general this lasts only a day or two)
+                if diff_lat_last < limit_outlier and diff_lon_last < limit_outlier:
+                    message_add_warning(source_id,
+                                        f"Outliers (<10km) between {first_outlier} and {last_outlier}, but buoy back at location.")
+                elif diff_lat_last_api < limit_outlier and diff_lon_last_api < limit_outlier:
+                    message_add_warning(source_id,
+                                        f"Outliers (<10km) between {first_outlier} and {last_outlier}, but buoy back at location during following month(s).")
+                else:
+                    if not partial_data_outlier.empty:
+                        template_dirpath = config.conf_dirpath
+                        process_wave_dataframe(partial_data_outlier, source_id, template_dirpath, OUTPUT_PATH,
+                                               incoming_path)
+
+                    message_add_error(source_id,
+                                      f"Last data point ({last_outlier}) is an outlier (<10km). "
+                                      f"Data until {first_outlier} has been downloaded, but download of later data will only "
+                                      f"resume when the buoy is back at location.")
+                    return
+            else:
+                # erroneous coordinates (>limit_error, i.e. ~10km) generally mean there is an issue with the location.
+                # if the last coordinates are good and the difference between the first and last error is less than
+                # 7 days, it indicates that the buoy was moved and placed back (possibly to clean it or do punctual
+                # repairs). A warning is logged asking to contact the facility to check, and the data download
+                # proceeds as usual.
+                # if the last coordinates are not good but the difference is less than 7 days, do not download and
+                # give the error "check again after one week".
+                # if the coordinates are erroneous for more than a week, fully stop the download; resuming will
+                # require manual input
+                if time_error.days < 7 and ((diff_lat_last < limit_outlier and diff_lon_last < limit_outlier) or
+                                            (diff_lat_last_api < limit_outlier and diff_lon_last_api < limit_outlier)):
+                    message_add_warning(source_id,
+                                        f"Coordinates errors (>10km) between {first_error_coordinates} and {last_error_coordinates}, "
+                                        f"but buoy back at location (within a week of first incidence). "
+                                        f"This usually indicates the buoy was moved to land for cleaning or repairs. "
+                                        f"The data has been downloaded nonetheless; monitor carefully for future errors.")
+                else:
+                    if not partial_data_error.empty:
+                        template_dirpath = config.conf_dirpath
+                        process_wave_dataframe(partial_data_error, source_id, template_dirpath, OUTPUT_PATH,
+                                               incoming_path)
+
+                        message_add_error(source_id,
+                                          f"Coordinates errors (>10km) since {first_error_coordinates}. "
+                                          f"Data until {first_error_coordinates} has been downloaded, but download of later data "
+                                          f"will not occur until the buoy is back at location (within a week) or the coordinates are rectified "
+                                          f"(current coordinates of API are: lat: {error_coordinates.iloc[-1]['latitude']}"
+                                          f", lon: {error_coordinates.iloc[-1]['longitude']}).")
+                        return
+
+                    if time_error.days < 7:
+                        message_add_error(source_id,
+                                          f"Coordinates errors (>10km) since {first_error_coordinates}. "
+                                          f"The download has been stopped until the buoy is back at location (within a week) or the coordinates are rectified "
+                                          f"(current coordinates of API are: lat: {error_coordinates.iloc[-1]['latitude']}"
+                                          f", lon: {error_coordinates.iloc[-1]['longitude']}).")
+                        return
+                    else:
+                        message_add_error(source_id,
+                                          f"Coordinates errors (>10km) from {first_error_coordinates} to {last_error_coordinates}. "
+                                          f"The download has been stopped since {first_error_coordinates}. "
+                                          f"Please contact {sources_id_metadata[source_id]['institution']} to check the buoy, or "
+                                          f"manually change the deployment start date "
+                                          f"(current coordinates of API are: lat: {error_coordinates.iloc[-1]['latitude']}"
+                                          f", lon: {error_coordinates.iloc[-1]['longitude']}).")
+                        return
 
         template_dirpath = config.conf_dirpath
         process_wave_dataframe(data, source_id, template_dirpath, OUTPUT_PATH, incoming_path)
+
+
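To see how the outlier/error masks above behave, here is a condensed, runnable sketch on a toy DataFrame (the column names and thresholds match the patch; the coordinates are made up):

```python
import pandas as pd

limit_outlier, limit_error = 0.02, 0.1
lat_meta, lon_meta = -32.0, 115.5  # hypothetical nominal position

data = pd.DataFrame({
    "timestamp": pd.date_range("2023-01-01", periods=4, freq="h", tz="UTC"),
    "latitude": [-32.001, -32.05, -32.2, -32.0],  # in range, outlier, error, in range
    "longitude": [115.5, 115.5, 115.5, 115.5],
})

diff_lat = (data["latitude"] - lat_meta).abs()
diff_lon = (data["longitude"] - lon_meta).abs()

# outliers: between the two thresholds on either axis; errors: beyond the larger one
outliers_coordinates = data[((diff_lat > limit_outlier) & (diff_lat < limit_error)) |
                            ((diff_lon > limit_outlier) & (diff_lon < limit_error))]
error_coordinates = data[(diff_lat >= limit_error) | (diff_lon >= limit_error)]
print(len(outliers_coordinates), len(error_coordinates))  # 1 1
```
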
+def message_add_warning(source_id, message: str):
+    sources_id_metadata = ardc_lookup.get_sources_id_metadata()
+    site_name = sources_id_metadata[source_id]['site_name']
+    list_warning_sites[f"{source_id} ( {site_name} )"] = message
+    LOGGER.info(message)
+
+
+def message_add_error(source_id, message: str):
+    sources_id_metadata = ardc_lookup.get_sources_id_metadata()
+    site_name = sources_id_metadata[source_id]['site_name']
+    list_errors_sites[f"{source_id} ( {site_name} )"] = message
+    LOGGER.error(message)
+
+
+def message_final_logger(message_type: str):
+    # create message(s) for successes, errors and/or warnings, to be displayed at the end of the log and/or
+    # sent as notifications (email)
+    message_final = '\n \n ******************** \n \n' + str(datetime.datetime.now()) + ' : \n'
+    message_error = ''
+    message_warning = ''
+
+    if not list_errors_sites:
+        message_final = message_final + 'The data download was completed successfully'
+    else:
+        message_error = message_error + 'Partial data download. The following buoys were skipped due to errors:\n' + ",\n".join(
+            "{!r}: {!r}".format(k, v) for k, v in list_errors_sites.items())
+        message_final = message_final + message_error
+
+    if list_warning_sites:
+        message_warning = message_warning + '\n \nThe following warnings were raised: \n' + ",\n".join(
+            "{!r}: {!r}".format(k, v) for k, v in list_warning_sites.items()) + (
+            "\n * Note that 'deployment start date missing' usually denotes a buoy IMOS no longer "
+            "handles, but it could also indicate missing metadata.")
+        message_final = message_final + message_warning
+
+    message_final = message_final + '\n \n ******************** \n \n'
+
+    if message_type == "error":
+        return message_error
+    elif message_type == "warning":
+        return message_warning
+    else:
+        return message_final
+
+
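The two registries are plain module-level dicts keyed by `"<source_id> ( <site_name> )"`, and `message_final_logger` just joins them into a report. A condensed, standalone rendering of that assembly with toy entries (the real functions also resolve `site_name` from the metadata, which is stubbed out here):

```python
import datetime

list_errors_sites = {"SPOT-0002 ( Torbay )": "API error: missing token."}
list_warning_sites = {"SPOT-0001 ( Cottesloe )": "Deployment start date missing. Download of data skipped."}

message_final = '\n \n ******************** \n \n' + str(datetime.datetime.now()) + ' : \n'
if list_errors_sites:
    message_final += ('Partial data download. The following buoys were skipped due to errors:\n' +
                      ",\n".join("{!r}: {!r}".format(k, v) for k, v in list_errors_sites.items()))
if list_warning_sites:
    message_final += ('\n \nThe following warnings were raised: \n' +
                      ",\n".join("{!r}: {!r}".format(k, v) for k, v in list_warning_sites.items()))
print(message_final)
```
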
" + f"The download has been stopped until buoy is back at location (within a week) or coordinates rectified " + f"(current coordinates of API are: lat: {error_coordinates.iloc[-1]['latitude']}" + f" lon: {error_coordinates.iloc[-1]['longitude']})." + ) + return + else: + message_add_error(source_id, + f"Coordinates errors (>10km) from {first_error_coordinates} to {last_error_coordinates}. " + f" The download has been stopped since {first_error_coordinates}. " + f"Please contact {sources_id_metadata[source_id]['institution']} to check buoy, or " + f"manually change the deployment start date." + f"(current coordinates of API are: lat: {error_coordinates.iloc[-1]['latitude']}" + f", lon: {error_coordinates.iloc[-1]['longitude']})." + ) + return + template_dirpath = config.conf_dirpath process_wave_dataframe(data, source_id, template_dirpath, OUTPUT_PATH, incoming_path) +def message_add_warning(source_id, message: str): + sources_id_metadata = ardc_lookup.get_sources_id_metadata() + site_name = sources_id_metadata[source_id]['site_name'] + list_warning_sites[f"{source_id} ( {site_name} )"] = message + LOGGER.info(message) + + +def message_add_error(source_id, message: str): + sources_id_metadata = ardc_lookup.get_sources_id_metadata() + site_name = sources_id_metadata[source_id]['site_name'] + list_errors_sites[f"{source_id} ( {site_name} )"] = message + LOGGER.error(message) + + +def message_final_logger(type: str): + # creating a message(s) for successful, errors and/or warnings, to be displayed at end of log and/or notified (email) + message_final = '\n \n ******************** \n \n' + str(datetime.datetime.now()) + ' : \n' + message_error = '' + message_warning = '' + + if not list_errors_sites: + message_final = message_final + 'The data download was completed successfully' + else: + message_error = message_error + 'Partial data download. The following buoys were skipped due to errors:\n' + ",\n".join( + "{!r}: {!r}".format(k, v) for k, v in list_errors_sites.items()) + message_final = message_final + message_error + + if not list_warning_sites: + pass + else: + message_warning = message_warning + '\n \nThe following warnings were raised: \n' + ",\n".join( + "{!r}: {!r}".format(k, v) for k, v in list_warning_sites.items()) + ( + "\n * Note that 'deployment start date missing' usually denotes a buoy IMOS does not " + "handle anymore but could indicate missing metadata.") + message_final = message_final + message_warning + + message_final = message_final + '\n \n ******************** \n \n' + + if type == "error": + return message_error + elif type == "warning": + return message_warning + else: + return message_final + +def check_unique_ids(sources_id_metadata): + # check for duplicates + dupli_id = [] + dupli_name = [] + unique_source_id = [] + unique_site_name = [] + + for source_id in sources_id_metadata.keys(): + if source_id not in unique_source_id: + unique_source_id.append(source_id) + else: + dupli_id.append(source_id) + continue + site_name = sources_id_metadata[source_id]['site_name'] + + if site_name not in unique_site_name: + unique_site_name.append(site_name) + else: + dupli_name.append(source_id) + + if not dupli_id: + pass + else: + for source_id in dupli_id: + unique_source_id.remove(source_id) + message_add_error(source_id, "The spotter id is duplicated in metadata and data was not downloaded.") + + if not dupli_name: + pass + else: + for source_id in dupli_name: + # # this bit considers that a duplicated name is nota problem, but sends a warning. 
@@ -93,7 +341,20 @@
     ardc_lookup = lookup(api_config)
     sources_id_metadata = ardc_lookup.get_sources_id_metadata()
 
-    for source_id in sources_id_metadata.keys():
-        process_wave_source_id(source_id, incoming_path=vargs.incoming_path)
+    list_errors_sites = {}
+    list_warning_sites = {}
+
+    # check for duplicates:
+    unique_source_id = check_unique_ids(sources_id_metadata)
+
+    for source_id in unique_source_id:
+        try:
+            process_wave_source_id(source_id, incoming_path=vargs.incoming_path)
+        except Exception as e:
+            message_add_error(source_id, str(e))
+
+    LOGGER.info(message_final_logger(message_type="final"))
+    # LOGGER.info(message_final_logger(message_type="error"))
+    # The final message can be used in either Sumo Logic or notifications.
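The new template added below declares a `_dimensions`/`_variables` structure plus global attributes. A quick way to inspect it is a plain `json.load`; this is only a sketch, and the relative path assumes the working directory is `ardc_nrt/` (the production code applies the template via `process_wave_dataframe`, whose internals are not part of this patch):

```python
import json

with open("config/sofar/template_aqualink.json") as f:
    template = json.load(f)

print(sorted(template["_variables"]))                   # declared variable names
print(template["_variables"]["WSSH"]["standard_name"])  # sea_surface_wave_significant_height
print(template["_dimensions"])                          # {'TIME': None, 'timeSeries': 1}
```
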
"method": "Spectral analysis method", + "coordinates": "TIME LATITUDE LONGITUDE", + "ancillary_variable": "WAVE_quality_control", + "comment": "Period of the peak of the energy spectrum" + }, + "SSWMD": { + "_dimensions": ["TIME"], + "_datatype": "float64", + "standard_name": "sea_surface_wave_from_direction", + "long_name": "spectral sea surface wave mean direction", + "units": "Degrees", + "magnetic_declination": 12.86, + "compass_correction_applied": 13, + "positive": "clockwise", + "reference_datum": "true north", + "method": "Spectral analysis method", + "_FillValue": -9999.0, + "valid_min": 0.0, + "valid_max": 360.0, + "comment": "Direction (related to the magnetic north) from which the mean period waves are coming from", + "ancillary_variable": "WAVE_quality_control", + "coordinates": "TIME LATITUDE LONGITUDE" + }, + "WPDI": { + "_dimensions": ["TIME"], + "_datatype": "float64", + "standard_name": "sea_surface_wave_from_direction_at_variance_spectral_density_maximum", + "long_name": "spectral peak wave direction", + "units": "Degrees", + "magnetic_declination": 12.86, + "compass_correction_applied": 13, + "reference_datum": "true north", + "positive": "clockwise", + "_FillValue": -9999.0, + "valid_min": 0.0, + "valid_max": 360.0, + "method": "Spectral analysis method", + "ancillary_variable": "WAVE_quality_control", + "comment": "Direction (related to the magnetic north) from which the peak period waves are coming from", + "coordinates": "TIME LATITUDE LONGITUDE" + }, + "WMDS": { + "_dimensions": ["TIME"], + "_datatype": "float64", + "standard_name": "sea_surface_wave_directional_spread", + "long_name": "spectral sea surface wave mean directional spread", + "units": "Degrees", + "positive": "clockwise", + "_FillValue": -9999.0, + "valid_min": 0.0, + "valid_max": 360.0, + "method": "Spectral analysis method", + "ancillary_variable": "WAVE_quality_control", + "coordinates": "TIME LATITUDE LONGITUDE" + }, + "WPDS": { + "_dimensions": ["TIME"], + "_datatype": "float64", + "standard_name": "sea_surface_wave_directional_spread_at_variance_spectral_density_maximum", + "long_name": "spectral sea surface wave peak directional spread", + "units": "Degrees", + "_FillValue": -9999.0, + "valid_min": 0.0, + "valid_max": 360.0, + "reference_datum": "true north", + "positive": "clockwise", + "method": "Spectral analysis method", + "ancillary_variable": "WAVE_quality_control", + "coordinates": "TIME LATITUDE LONGITUDE" + }, + "WSSH": { + "_dimensions": ["TIME"], + "_datatype": "float64", + "standard_name": "sea_surface_wave_significant_height", + "long_name": "sea surface wave spectral significant height", + "units": "m", + "_FillValue": -9999.0, + "valid_min": 0.0, + "valid_max": 100.0, + "method": "Spectral analysis method", + "ancillary_variable": "WAVE_quality_control", + "coordinates": "TIME LATITUDE LONGITUDE" + }, + "WAVE_quality_control": { + "_dimensions": [ + "TIME" + ], + "_datatype": "b", + "long_name": "primary Quality Control flag for wave variables", + "_FillValue": -127, + "valid_min": 1, + "valid_max": 9, + "flag_values": [1,2,3,4,9], + "flag_meanings": "good not_evaluated questionable bad missing", + "quality_control_convention": "Ocean Data Standards, UNESCO 2013 - IOC Manuals and Guides, 54, Volume 3 Version 1" + } + }, + "Conventions": "CF-1.6", + "abstract": "Near real-time in-situ wave data data collected from a range of sites along Western Australia by a combination of Sofar Spotter and Datawell Mark IV wave buoys operated by UWA", + "acknowledgement": "Any users of IMOS data 
diff --git a/AODN/AODN-WAVE-NRT/ARDC_API_NRT/test_ardc_nrt/test_lib_common_lookup.py b/AODN/AODN-WAVE-NRT/ARDC_API_NRT/test_ardc_nrt/test_lib_common_lookup.py
index 0a4d0d93..18395a6f 100644
--- a/AODN/AODN-WAVE-NRT/ARDC_API_NRT/test_ardc_nrt/test_lib_common_lookup.py
+++ b/AODN/AODN-WAVE-NRT/ARDC_API_NRT/test_ardc_nrt/test_lib_common_lookup.py
@@ -49,6 +49,23 @@ def test_lookup_get_source_id_deployment_start_date(self):
 
         self.assertEqual(pandas.Timestamp('2020-01-01 00:00:00+0000', tz='UTC'), val_function)
 
+    def test_lookup_unique_spotters_id(self):
+        # Check that the spotter ids are unique. Requires "source_id_metadata.json" in the test folder to be current.
+        val_function = self.ardc_lookup.get_sources_id_metadata()
+        spotter_ids = val_function.keys()
+        unique_spotter_ids = set(spotter_ids)
+        self.assertEqual(len(spotter_ids), len(unique_spotter_ids))
+
+    # def test_lookup_unique_spotters_locations(self):
+    #     # Check that the spotter locations are unique. Requires "source_id_metadata.json" in the test folder
+    #     # to be current. Will fail if old deployments are kept in the metadata.
+    #     val_function = self.ardc_lookup.get_sources_id_metadata()
+    #     spotter_names = val_function.loc["site_name"]
+    #     unique_spotter_names = set(spotter_names)
+    #     self.assertEqual(len(spotter_names), len(unique_spotter_names))
+
 
 if __name__ == '__main__':
     unittest.main()
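
For reference, the enabled uniqueness test can be reproduced standalone with the lookup stubbed out (an assumption here is that get_sources_id_metadata returns a mapping keyed by spotter id; the stub class and sample ids are hypothetical):

```python
import unittest


class StubLookup:
    # hypothetical stand-in for lib.common.lookup with two well-formed entries
    def get_sources_id_metadata(self):
        return {"SPOT-0001": {"site_name": "Cottesloe"},
                "SPOT-0002": {"site_name": "Torbay"}}


class TestUniqueIds(unittest.TestCase):
    def test_lookup_unique_spotters_id(self):
        metadata = StubLookup().get_sources_id_metadata()
        spotter_ids = list(metadata.keys())
        self.assertEqual(len(spotter_ids), len(set(spotter_ids)))


if __name__ == "__main__":
    unittest.main()
```
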