From f709de5e49dfc6222471f1f063a0708a5aa18e91 Mon Sep 17 00:00:00 2001 From: Philip Date: Wed, 9 Oct 2024 08:11:49 -0700 Subject: [PATCH] RME scrape tweaking and logging --- lib/riverscapes/.vscode/launch.json | 4 +- lib/riverscapes/riverscapes/scrape-rme.py | 357 +++++++--------------- 2 files changed, 119 insertions(+), 242 deletions(-) diff --git a/lib/riverscapes/.vscode/launch.json b/lib/riverscapes/.vscode/launch.json index d199d16e..640abeb4 100644 --- a/lib/riverscapes/.vscode/launch.json +++ b/lib/riverscapes/.vscode/launch.json @@ -117,7 +117,9 @@ "args": [ "production", "{env:DATA_ROOT}/rme-scrape", - "colorado_rme_scrape", + "2024CONUS,colorado_rme_scrape", + // "--huc_filter", + // "14", "--delete", ] }, diff --git a/lib/riverscapes/riverscapes/scrape-rme.py b/lib/riverscapes/riverscapes/scrape-rme.py index 643c9139..7907b727 100644 --- a/lib/riverscapes/riverscapes/scrape-rme.py +++ b/lib/riverscapes/riverscapes/scrape-rme.py @@ -1,22 +1,27 @@ """ -Demo script to download files from Data Exchange +RME scrape. + +This script unpivots the DGO metrics from the RME output GeoPackages and stores them in a single output +feature class using the IGO points as geometries. + +1) Searches Data Exchange for RME projects with the specified tags (and optional HUC filter) +2) Downloads the RME output GeoPackages +3) Scrapes the metrics from the RME output GeoPackages into a single output GeoPackage +4) Optionally deletes the downloaded GeoPackages """ -import csv +from typing import Dict import shutil -import glob import re import os import sqlite3 import logging -import subprocess import argparse -from osgeo import ogr +from osgeo import ogr, osr from rsxml import dotenv, Logger, safe_makedirs from riverscapes import RiverscapesAPI, RiverscapesSearchParams # RegEx for finding the RME output GeoPackages RME_OUTPUT_GPKG_REGEX = r'.*riverscapes_metrics\.gpkg' -# RCAT_OUTPUT_GPKG_REGEX = r'.*rcat\.gpkg' def scrape_rme(rs_api: RiverscapesAPI, search_params: RiverscapesSearchParams, download_dir: str, output_gpkg: str, delete_downloads: bool) -> None: @@ -25,11 +30,15 @@ def scrape_rme(rs_api: RiverscapesAPI, search_params: RiverscapesSearchParams, d """ log = Logger('Scrape RME') + gpkg_driver = ogr.GetDriverByName("GPKG") + target_ds = None + target_layer = None + metric_col_types = {} + metric_ids = {} # Loop over all projects yielded by the search for project, _stats, _searchtotal in rs_api.search(search_params, progress_bar=True, page_size=100): try: - # Attempt to retrieve the huc10 from the project metadata if it exists huc10 = None for key in ['HUC10', 'huc10', 'HUC', 'huc']: @@ -48,7 +57,25 @@ def scrape_rme(rs_api: RiverscapesAPI, search_params: RiverscapesSearchParams, d huc_dir = os.path.join(download_dir, huc10) rme_gpkg = download_file(rs_api, project.id, os.path.join(huc_dir, 'rme'), RME_OUTPUT_GPKG_REGEX) - scrape_huc(huc10, rme_gpkg, project.id, output_gpkg) + if not os.path.isfile(output_gpkg): + # Get the metric columns from the RME GeoPackage + with sqlite3.connect(rme_gpkg) as rme_conn: + rme_curs = rme_conn.cursor() + rme_curs.execute('SELECT metric_id, field_name, data_type FROM metrics') + for row in rme_curs.fetchall(): + metric_col_types[row[1]] = row[2] + metric_ids[row[0]] = row[1] + + # Create the output GeoPackage with the IGOs feature class + create_gpkg(output_gpkg, metric_col_types) + + if target_ds is None: + target_ds = gpkg_driver.Open(output_gpkg, 1) # 1 means read/write mode + target_layer = target_ds.GetLayer('igos') + + scrape_huc(huc10, rme_gpkg, metric_ids, gpkg_driver, target_layer) + + target_ds.ExecuteSQL(f"INSERT INTO hucs (huc, rme_project_id) VALUES ('{huc10}', '{project.id}')") except Exception as e: log.error(f'Error scraping HUC {huc10}: {e}') @@ -60,6 +87,9 @@ def scrape_rme(rs_api: RiverscapesAPI, search_params: RiverscapesSearchParams, d except Exception as e: log.error(f'Error deleting download directory {huc_dir}: {e}') + target_layer = None + target_ds = None + def download_file(rs_api: RiverscapesAPI, project_id: str, download_dir: str, regex: str) -> str: ''' @@ -112,273 +142,117 @@ def continue_with_huc(huc10: str, output_gpkg: str) -> bool: return True curs.execute('SELECT huc FROM hucs WHERE huc = ? LIMIT 1', [huc10]) - return curs.fetchone() is None + if curs.fetchone() is None: + return True + else: + log = Logger('Scrape RME') + log.info(f'HUC {huc10} already scraped. Skipping...') return False -def scrape_huc(huc10: str, rme_gpkg: str, rme_guid: str, output_gpkg: str) -> None: +def create_gpkg(output_gpkg: str, metric_cols: Dict[str, str]) -> None: + ''' + Creates the output GeoPackage with the IGOs feature class + uses DGO metrics as fields + ''' - log = Logger('Scrape HUC') + driver = ogr.GetDriverByName("GPKG") + data_source = driver.CreateDataSource(output_gpkg) + spatial_ref = osr.SpatialReference() + spatial_ref.ImportFromEPSG(4326) + layer = data_source.CreateLayer("igos", spatial_ref, ogr.wkbPoint) - create_side_tables = not os.path.isfile(output_gpkg) + field_huc = ogr.FieldDefn("huc", ogr.OFTString) + field_huc.SetWidth(10) + layer.CreateField(field_huc) - # Perform OG2OGR to append the IGO features to the output GeoPackage - # First time through, this will create the output GeoPackage - cmd = f'ogr2ogr -f GPKG -makevalid -append "{output_gpkg}" "{rme_gpkg}" igos' - log.debug(f'EXECUTING: {cmd}') - subprocess.call([cmd], shell=True, cwd=os.path.dirname(output_gpkg)) + field_level_path = ogr.FieldDefn("level_path", ogr.OFTString) + field_level_path.SetWidth(50) + layer.CreateField(field_level_path) - driver = ogr.GetDriverByName('GPKG') - data_source = driver.Open(output_gpkg, update=1) # update=0 for read-only mode - layer = data_source.GetLayerByName('igos') + field_seg_distance = ogr.FieldDefn("seg_distance", ogr.OFTReal) + layer.CreateField(field_seg_distance) - if create_side_tables is True: - # Add the HUC column to the IGOs feature class - field_name = 'huc' - field_definition = ogr.FieldDefn(field_name, ogr.OFTString) - if layer.CreateField(field_definition) == 0: - log.info(f"Field '{field_name}' added successfully to the feature class igos.") - else: - log.error(f"Failed to add field '{field_name}'.") - raise Exception(f"Failed to add field '{field_name}'.") + for field_name, field_type in metric_cols.items(): + oft_type = ogr.OFTString + if field_type.lower() == 'integer': + oft_type = ogr.OFTInteger + elif field_type.lower() == 'real': + oft_type = ogr.OFTReal - # Create the non-spatial tables now the output GeoPackage exists - create_output_tables(output_gpkg) + field = ogr.FieldDefn(field_name, oft_type) + layer.CreateField(field) - # The HUC code is not on the RME igos feature class. We need to update it. - # Store the HUC code with the IGOs that were just inserted. - # Its a feature class so we need to use OGR to do this! - data_source.ExecuteSQL(f'UPDATE igos SET huc = {huc10} WHERE huc IS NULL') layer = None data_source = None - # now copy the remaining data from the source database to the output database - with sqlite3.connect(rme_gpkg) as in_conn: - in_cursor = in_conn.cursor() - - with sqlite3.connect(output_gpkg) as out_conn: - out_cursor = out_conn.cursor() - - # DGOs are done manually because we don't need the geometry - process_rme_dgos(in_cursor, out_cursor, huc10) - # process_rcat_dgos(rcat_pgkg, out_cursor, huc) - - for prefix in ['dgo']: - process_rme_metric_values(in_cursor, out_cursor, prefix, huc10) - - out_cursor.execute('INSERT INTO hucs (huc, rme_project_id) VALUES (?, ?)', [huc10, rme_guid]) - out_conn.commit() - - -def process_rme_dgos(in_cursor, out_cursor, huc: str) -> None: - """ - Use SQL to copy the DGOs from the RME to the output GeoPackage with their geometries - """ - - in_cursor.execute(''' - SELECT - ? huc, - level_path, - seg_distance, - FCode, - low_lying_floodplain_area, - low_lying_floodplain_prop, - active_channel_area, - active_channel_prop, - elevated_floodplain_area, - elevated_floodplain_prop, - floodplain_area, - floodplain_prop, - centerline_length, - segment_area, - integrated_width - FROM dgos - WHERE level_path IS NOT NULL - AND seg_distance IS NOT NULL - ''', [huc]) - - out_cursor.executemany(''' - INSERT INTO dgos ( - huc, - level_path, - seg_distance, - FCode, - low_lying_floodplain_area, - low_lying_floodplain_prop, - active_channel_area, - active_channel_prop, - elevated_floodplain_area, - elevated_floodplain_prop, - floodplain_area, - floodplain_prop, - centerline_length, - segment_area, - integrated_width) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ''', in_cursor.fetchall()) - - -# def process_rcat_dgos(rcat_gpkg: str, out_cursor, huc: str) -> None: - -# with sqlite3.connect(rcat_gpkg) as conn: -# in_cursor = conn.cursor() -# in_cursor.execute(''' -# SELECT -# ? huc, -# level_path, -# seg_distance, -# FCode, -# centerline_length, -# segment_area, -# LUI, -# FloodplainAccess, -# FromConifer, -# FromDevegetated, -# FromGrassShrubland, -# NoChange, -# GrassShrubland, -# Devegetation, -# Conifer, -# Invasive, -# Development, -# Agriculture, -# NonRiparian, -# ExistingRiparianMean, -# HistoricRiparianMean, -# RiparianDeparture, -# Condition -# FROM DGOAttributes -# WHERE level_path IS NOT NULL -# AND seg_distance IS NOT NULL -# ''', [huc]) - -# out_cursor.executemany(''' -# INSERT INTO rcat_dgos ( -# huc, -# level_path, -# seg_distance, -# FCode, -# centerline_length, -# segment_area, -# LUI, -# FloodplainAccess, -# FromConifer, -# FromDevegetated, -# FromGrassShrubland, -# NoChange, -# GrassShrubland, -# Devegetation, -# Conifer, -# Invasive, -# Development, -# Agriculture, -# NonRiparian, -# ExistingRiparianMean, -# HistoricRiparianMean, -# RiparianDeparture, -# Condition) -# VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) -# ''', in_cursor.fetchall()) - - -def process_rme_metric_values(in_cursor, out_cursor, prefix: str, huc: str) -> None: - """ - Use SQL to copy the metric values from the RME to the output GeoPackage - """ - - in_cursor.execute(f''' - SELECT ? huc, level_path, seg_distance, metric_id, metric_value - FROM {prefix}_metric_values v INNER JOIN {prefix}s i on v.{prefix}_id = i.fid - ''', [huc]) - - out_cursor.executemany(f''' - INSERT INTO {prefix}_metric_values (huc, level_path, seg_distance, metric_id, metric_value) - VALUES (?, ?, ?, ?, ?) - ''', in_cursor.fetchall()) - - -def get_table_columns(cursor, table_name): - """Retrieve column names for a given table.""" + # Create the hucs table to keep track of progress + with sqlite3.connect(output_gpkg) as conn: + curs = conn.cursor() + curs.execute(''' + CREATE TABLE hucs ( + huc TEXT PRIMARY KEY NOT NULL, + rme_project_id TEXT, + scraped_on DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + ''') - cursor.execute(f"PRAGMA table_info({table_name})") - columns = cursor.fetchall() - return [col[1] for col in columns] # Column names are in the second field of each row + print(f'GeoPackage created with the "igos" point layer: {output_gpkg}') -def create_output_tables(outputs_gpkg: str) -> None: +def scrape_huc(huc10: str, rme_gpkg: str, metric_ids: Dict[int, str], driver: ogr.Driver, target_layer: ogr.Layer) -> None: ''' - Create the schema of the output RME scrape GeoPackage and load lookup data. - Assumes that the GeoPackage has already been created. + Perform the actual scrape on a single HUC + Creates new IGO features with DGO metrics as fields ''' - schema_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'packages', 'rme', 'rme', 'database') - if not os.path.isdir(schema_dir): - raise FileNotFoundError(f'Could not find database directory {schema_dir}') - - # GeoPackage should already be created when creating feature classes with OGR - if not os.path.isfile(outputs_gpkg): - raise FileNotFoundError(f'Could not find output GeoPackage {outputs_gpkg}') - - # Create non-spatial tables - with sqlite3.connect(outputs_gpkg) as conn: - cursor = conn.cursor() - with open(os.path.join(schema_dir, 'rme_scrape_schema.sql'), encoding='utf-8') as sqlfile: - sql_commands = sqlfile.read() - cursor.executescript(sql_commands) - conn.commit() - - # Load Measurements and Metrics data table records - load_lookup_data(outputs_gpkg, os.path.join(schema_dir, 'data_metrics')) - + with sqlite3.connect(rme_gpkg) as rme_conn: + rme_curs = rme_conn.cursor() -def load_lookup_data(db_path, csv_dir): - """Load the database lookup data from CSV files. - This gets called both during database creation during BRAT build, - but also during refresh of lookup data at the start of BRAT Run so that - the database has the latest hydrologic equations and other BRAT parameters + source_ds = driver.Open(rme_gpkg, 0) + source_layer = source_ds.GetLayer('igos') - Args: - db_path (str): Full path to SQLite database - csv_dir (str): Full path to the root folder containing CSV lookup files - """ + # Get the feature definition from the target layer + target_layer_defn = target_layer.GetLayerDefn() - conn = sqlite3.connect(db_path) - conn.row_factory = dict_factory - curs = conn.cursor() + # Loop over features in the source layer + for source_feature in source_layer: - log = Logger('Database') + level_path = source_feature.GetField('level_path') + seg_distance = source_feature.GetField('seg_distance') - if not os.path.isdir(csv_dir): - raise Exception(f'csv_dir path was not a valid directory: {csv_dir}') + # Create a new feature for the target layer using the target layer's definition + target_feature = ogr.Feature(target_layer_defn) - # Load lookup table data into the database - dir_search = os.path.join(csv_dir, '**', '*.csv') - for file_name in glob.glob(dir_search, recursive=True): - table_name = os.path.splitext(os.path.basename(file_name))[0] - with open(file_name, mode='r', encoding='utf8') as csvfile: - d = csv.DictReader(csvfile) - sql = 'INSERT OR REPLACE INTO {0} ({1}) VALUES ({2})'.format(table_name, ','.join(d.fieldnames), ','.join('?' * len(d.fieldnames))) + # Copy the geometry from the source feature to the target feature + geom = source_feature.GetGeometryRef() + target_feature.SetGeometry(geom.Clone()) - to_db = [[i[col] for col in d.fieldnames] for i in d] - curs.executemany(sql, to_db) - log.info('{:,} records loaded into {} lookup data table'.format(curs.rowcount, table_name)) + target_feature.SetField('huc', huc10) + target_feature.SetField('level_path', level_path) + target_feature.SetField('seg_distance', seg_distance) - conn.commit() + # Retrieve the DGO metric values and store them on the IGO feature + rme_curs.execute(''' + SELECT dmv.metric_id, dmv.metric_value + FROM dgos d INNER JOIN dgo_metric_values dmv ON d.fid = dmv.dgo_id + WHERE (dmv.metric_value IS NOT NULL) + AND (d.level_path = ?) + AND (d.seg_distance = ?)''', [level_path, seg_distance]) + for row in rme_curs.fetchall(): + target_feature.SetField(metric_ids[row[0]], row[1]) -def dict_factory(cursor, row): - """Convert the database row into a dictionary.""" - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d + # Add the feature to the target layer + target_layer.CreateFeature(target_feature) + target_feature = None def main(): - """ + ''' Scrape RME projects. Combine IGOs with their geometries. Include DGO metrics only. - """ + ''' parser = argparse.ArgumentParser() parser.add_argument('stage', help='Environment: staging or production', type=str) @@ -403,6 +277,7 @@ def main(): 'projectTypeId': 'rs_metric_engine', }) + # Optional HUC filter if args.huc_filter != '' and args.huc_filter != '.': search_params.meta = { "HUC": args.huc_filter