diff --git a/materializationengine/workflows/complete_workflow.py b/materializationengine/workflows/complete_workflow.py
index 847334ec..2083fb3f 100644
--- a/materializationengine/workflows/complete_workflow.py
+++ b/materializationengine/workflows/complete_workflow.py
@@ -17,6 +17,7 @@
     format_materialization_database_workflow,
     rebuild_reference_tables,
     set_version_status,
+    clean_split_table_workflow,
 )
 from materializationengine.workflows.ingest_new_annotations import (
     ingest_new_annotations_workflow,
@@ -99,8 +100,10 @@ def run_complete_workflow(
             check_tables.si(mat_info, new_version_number),
         )
     else:
+        clean_split_tables = clean_split_table_workflow(mat_info=mat_info)
         analysis_database_workflow = chain(
-            check_tables.si(mat_info, new_version_number)
+            chord(clean_split_tables, fin.si()),
+            check_tables.si(mat_info, new_version_number),
         )
 
     # combine all workflows into final workflow and run
diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py
index b7a13e8e..2063dec6 100644
--- a/materializationengine/workflows/create_frozen_database.py
+++ b/materializationengine/workflows/create_frozen_database.py
@@ -183,6 +183,26 @@ def format_materialization_database_workflow(mat_info: List[dict]):
     return create_frozen_database_tasks
 
 
+def clean_split_table_workflow(mat_info: List[dict]):
+    """Remove rows from all tables that are past the materialization
+    timestamp, are deleted, or are not valid.
+
+    Workflow:
+        - Iterate through all tables and drop non-valid rows
+
+    Args:
+        mat_info (List[dict]): materialization metadata for each table
+
+    Returns:
+        list: list of celery task chains
+    """
+    clean_table_tasks = []
+    for mat_metadata in mat_info:
+        clean_table_workflow = chain(clean_table.si(mat_metadata))
+        clean_table_tasks.append(clean_table_workflow)
+    return clean_table_tasks
+
+
 @celery.task(
     name="workflow:rebuild_reference_tables",
     bind=True,
@@ -399,7 +419,6 @@ def create_materialized_metadata(
         celery_logger.error(f"Materialized Metadata table creation failed {e}")
     try:
         for mat_metadata in mat_info:
-            # only create table if marked as valid in the metadata table
             annotation_table_name = mat_metadata["annotation_table_name"]
             schema_type = mat_metadata["schema"]
@@ -717,6 +736,53 @@ def merge_tables(self, mat_metadata: dict):
         raise e
 
 
+@celery.task(
+    name="workflow:clean_table",
+    bind=True,
+    acks_late=True,
+    autoretry_for=(Exception,),
+    max_retries=3,
+)
+def clean_table(self, mat_metadata: dict):
+    """Remove non-valid rows from a table.
+
+    Args:
+        mat_metadata (dict): materialization metadata for the target table
+
+
+    Raises:
+        e: error during dropping rows
+
+    Returns:
+        str: number of rows deleted
+    """
+    analysis_version = mat_metadata["analysis_version"]
+    datastack = mat_metadata["datastack"]
+    mat_time_stamp = mat_metadata["materialization_time_stamp"]
+    SQL_URI_CONFIG = get_config_param("SQLALCHEMY_DATABASE_URI")
+    analysis_sql_uri = create_analysis_sql_uri(
+        SQL_URI_CONFIG, datastack, analysis_version
+    )
+    mat_session, mat_engine = create_session(analysis_sql_uri)
+
+    AnnotationModel = create_annotation_model(mat_metadata, with_crud_columns=True)
+    non_valid_rows = mat_session.query(AnnotationModel).filter(
+        (AnnotationModel.created > mat_time_stamp) | (AnnotationModel.valid != True)
+    )
+
+    try:
+        num_rows_to_delete = non_valid_rows.delete(synchronize_session=False)
+        mat_session.commit()
+
+        mat_session.close()
+        mat_engine.dispose()
+        return f"Number of rows deleted: {num_rows_to_delete}"
+    except Exception as e:
+        mat_session.rollback()
+        celery_logger.error(e)
+        raise e
+
+
 def insert_chunked_data(
     annotation_table_name: str,
     sql_statement: str,
diff --git a/migrate.py b/migrate.py
index 14bbfb2d..5097b2bf 100644
--- a/migrate.py
+++ b/migrate.py
@@ -68,5 +68,28 @@ def migrate_annotation_schemas(sql_url: str, aligned_volume: str, dry_run: bool
     click.echo(migrations)
 
 
+@migrator.command(help="Alter constraint on DELETE")
+@click.option(
+    "--sql_url",
+    prompt=True,
+    default=lambda: application.config["SQLALCHEMY_DATABASE_URI"],
+    show_default="SQL URL from config",
+)
+@click.option(
+    "-a",
+    "--aligned_volume",
+    prompt="Target Aligned Volume",
+    help="Aligned Volume database to migrate",
+    type=click.Choice(get_allowed_aligned_volumes()),
+)
+@click.option(
+    "--dry_run", prompt="Dry Run", help="Test migration before running", default=True
+)
+def migrate_foreign_key_constraints(sql_url: str, aligned_volume: str, dry_run: bool = True):
+    migrator = DynamicMigration(sql_url, aligned_volume)
+    fkey_constraint_mapping = migrator.apply_cascade_option_to_tables(dry_run=dry_run)
+    click.echo(fkey_constraint_mapping)
+
+
 if __name__ == "__main__":
     migrator()
diff --git a/requirements.in b/requirements.in
index b0035bfd..c542df50 100644
--- a/requirements.in
+++ b/requirements.in
@@ -26,7 +26,7 @@ gcsfs>=0.8.0
 pyarrow==3.0.0
 flask_cors
 numpy>=1.20
-emannotationschemas>=5.4.0
+emannotationschemas>=5.8.0
+dynamicannotationdb>=5.7.2
+nglui>=2.10.1
 Flask-Limiter[redis]
-dynamicannotationdb>=5.7.1
-nglui>=2.10.1
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 0f34e199..28fc9a2a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -119,9 +119,9 @@ dill==0.3.4
     #   pathos
 dracopy==1.0.1
     # via cloud-volume
-dynamicannotationdb==5.7.1
+dynamicannotationdb==5.7.2
     # via -r requirements.in
-emannotationschemas==5.7.1
+emannotationschemas==5.8.0
     # via
     #   -r requirements.in
     #   dynamicannotationdb
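
A minimal runnable sketch of the chord-into-chain wiring that run_complete_workflow adopts above: the per-table cleanup signatures form the chord header, fin.si() is the barrier callback, and check_tables fires only after every cleanup task has finished. The broker URL and task bodies here are illustrative assumptions, not code from this repository.

from celery import Celery, chain, chord

# assumed local broker/backend, for this sketch only
app = Celery("sketch", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

@app.task
def clean_table(mat_metadata):
    # stand-in for the real workflow:clean_table task
    return f"cleaned {mat_metadata['annotation_table_name']}"

@app.task
def fin():
    # barrier callback: runs once every header task has succeeded
    return "fin"

@app.task
def check_tables(mat_info, version):
    return f"checked {len(mat_info)} tables for version {version}"

mat_info = [{"annotation_table_name": "synapses"},
            {"annotation_table_name": "nuclei"}]
workflow = chain(
    chord([clean_table.si(m) for m in mat_info], fin.si()),  # parallel cleanup
    check_tables.si(mat_info, 2),  # runs only after the chord resolves
)
# workflow.apply_async()  # requires a running broker and worker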
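
The clean_table task relies on SQLAlchemy's bulk Query.delete(). A self-contained sketch of the same pattern against an in-memory SQLite database, using a stand-in model rather than the real AnnotationModel:

from datetime import datetime, timedelta
from sqlalchemy import Boolean, Column, DateTime, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Annotation(Base):
    # illustrative model with only the columns the filter needs
    __tablename__ = "annotation"
    id = Column(Integer, primary_key=True)
    created = Column(DateTime)
    valid = Column(Boolean)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

mat_time_stamp = datetime.utcnow()
session.add_all([
    Annotation(created=mat_time_stamp - timedelta(days=1), valid=True),   # kept
    Annotation(created=mat_time_stamp + timedelta(days=1), valid=True),   # dropped: too new
    Annotation(created=mat_time_stamp - timedelta(days=1), valid=False),  # dropped: not valid
])
session.commit()

non_valid_rows = session.query(Annotation).filter(
    (Annotation.created > mat_time_stamp) | (Annotation.valid != True)
)
# synchronize_session=False skips reconciling in-memory objects, which is
# safe here because the session is committed and closed immediately after.
num_deleted = non_valid_rows.delete(synchronize_session=False)
session.commit()
print(f"Number of rows deleted: {num_deleted}")  # -> 2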
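
The new migrate_foreign_key_constraints command can be smoke-tested with click's CliRunner; the connection URL and volume name below are placeholders, and supplying every option on the command line keeps the prompt=True options from prompting:

from click.testing import CliRunner
from migrate import migrate_foreign_key_constraints

runner = CliRunner()
result = runner.invoke(
    migrate_foreign_key_constraints,
    ["--sql_url", "postgresql://user:pass@localhost/db",
     "-a", "my_aligned_volume",
     "--dry_run", "True"],  # dry run: echoes the constraint mapping without applying it
)
print(result.output)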