Filter split tables (#107)
* feat: add migration to support DELETE CASCADE

* feat: add clean split table workflow

* bumping dadb reqs

* bumping ema reqs

* fix: change method name

* trying experimental DADB

* trying alternative docker git config

* trying different git option

* trying different DADB

* bumping dadb version

---------

Co-authored-by: Forrest Collman <[email protected]>
dlbrittain and fcollman authored Apr 23, 2023
1 parent 309711b commit 9eb28f3
Showing 5 changed files with 99 additions and 7 deletions.
5 changes: 4 additions & 1 deletion materializationengine/workflows/complete_workflow.py
@@ -17,6 +17,7 @@
format_materialization_database_workflow,
rebuild_reference_tables,
set_version_status,
clean_split_table_workflow,
)
from materializationengine.workflows.ingest_new_annotations import (
ingest_new_annotations_workflow,
@@ -99,8 +100,10 @@ def run_complete_workflow(
check_tables.si(mat_info, new_version_number),
)
    else:
+        clean_split_tables = clean_split_table_workflow(mat_info=mat_info)
         analysis_database_workflow = chain(
-            check_tables.si(mat_info, new_version_number)
+            chord(clean_split_tables, fin.si()),
+            check_tables.si(mat_info, new_version_number),
         )

# combine all workflows into final workflow and run
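For readers scanning the diff: the chord above gates check_tables on every per-table cleanup. A minimal composition sketch with toy task bodies standing in for the real tasks in this repo (the mat_info entries here are hypothetical; building signatures needs no broker, while .apply_async() would):

from celery import Celery, chain, chord

app = Celery("sketch")

@app.task
def clean_table(mat_metadata): ...        # stand-in for workflow:clean_table

@app.task
def fin(): ...                            # barrier fired when all header tasks finish

@app.task
def check_tables(mat_info, version): ...  # stand-in for the real table check

mat_info = [{"annotation_table_name": "t1"}, {"annotation_table_name": "t2"}]
clean_split_tables = [chain(clean_table.si(m)) for m in mat_info]
workflow = chain(
    chord(clean_split_tables, fin.si()),  # run all cleanups in parallel, then fin
    check_tables.si(mat_info, 1),         # runs only after every cleanup has ended
)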
68 changes: 67 additions & 1 deletion materializationengine/workflows/create_frozen_database.py
@@ -183,6 +183,26 @@ def format_materialization_database_workflow(mat_info: List[dict]):
return create_frozen_database_tasks


def clean_split_table_workflow(mat_info: List[dict]):
    """Remove rows from all tables that were created after the
    materialization timestamp or that are not marked valid.
    Workflow:
        - Iterate through all tables and drop non-valid rows
    Args:
        mat_info (List[dict]): materialization metadata for each table
    Returns:
        list: list of celery task chains, one per table
    """
clean_table_tasks = []
for mat_metadata in mat_info:
clean_table_workflow = chain(clean_table.si(mat_metadata))
clean_table_tasks.append(clean_table_workflow)
return clean_table_tasks
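A quick usage sketch (hypothetical mat_info; chord and fin as used in complete_workflow.py): the builder returns one single-task chain per table, ready to drop into a chord header.

tasks = clean_split_table_workflow(mat_info=mat_info)  # one chain per table
assert len(tasks) == len(mat_info)
chord(tasks, fin.si()).apply_async()  # requires a configured broker/result backend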


@celery.task(
name="workflow:rebuild_reference_tables",
bind=True,
@@ -399,7 +419,6 @@ def create_materialized_metadata(
celery_logger.error(f"Materialized Metadata table creation failed {e}")
try:
for mat_metadata in mat_info:

# only create table if marked as valid in the metadata table
annotation_table_name = mat_metadata["annotation_table_name"]
schema_type = mat_metadata["schema"]
@@ -717,6 +736,53 @@ def merge_tables(self, mat_metadata: dict):
raise e


@celery.task(
name="workflow:clean_table",
bind=True,
acks_late=True,
autoretry_for=(Exception,),
max_retries=3,
)
def clean_table(self, mat_metadata: dict):
    """Remove non-valid rows from a table.
    Args:
        mat_metadata (dict): materialization metadata for the annotation table
    Raises:
        e: error raised while dropping rows
    Returns:
        str: number of rows deleted
    """
analysis_version = mat_metadata["analysis_version"]
datastack = mat_metadata["datastack"]
mat_time_stamp = mat_metadata["materialization_time_stamp"]
SQL_URI_CONFIG = get_config_param("SQLALCHEMY_DATABASE_URI")
analysis_sql_uri = create_analysis_sql_uri(
SQL_URI_CONFIG, datastack, analysis_version
)
mat_session, mat_engine = create_session(analysis_sql_uri)

AnnotationModel = create_annotation_model(mat_metadata, with_crud_columns=True)
non_valid_rows = mat_session.query(AnnotationModel).filter(
(AnnotationModel.created > mat_time_stamp) | (AnnotationModel.valid != True)
)

try:
num_rows_to_delete = non_valid_rows.delete(synchronize_session=False)
mat_session.commit()

mat_session.close()
mat_engine.dispose()
return f"Number of rows deleted: {num_rows_to_delete}"
except Exception as e:
mat_session.rollback()
celery_logger.error(e)
raise e
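For reference, a sketch of roughly the SQL the bulk delete above emits on PostgreSQL (the table and column names come from the per-table annotation model; the exact rendering depends on the SQLAlchemy dialect):

# Approximate statement behind non_valid_rows.delete(synchronize_session=False):
#
#   DELETE FROM <annotation_table>
#   WHERE created > %(mat_time_stamp)s OR valid != true;
#
# synchronize_session=False skips reconciling in-memory objects with the bulk
# delete, which is safe here because the session is closed immediately after.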


def insert_chunked_data(
annotation_table_name: str,
sql_statement: str,
23 changes: 23 additions & 0 deletions migrate.py
@@ -68,5 +68,28 @@ def migrate_annotation_schemas(sql_url: str, aligned_volume: str, dry_run: bool
click.echo(migrations)


@migrator.command(help="Alter foreign key constraints to cascade on DELETE")
@click.option(
"--sql_url",
prompt=True,
default=lambda: application.config["SQLALCHEMY_DATABASE_URI"],
show_default="SQL URL from config",
)
@click.option(
"-a",
"--aligned_volume",
prompt="Target Aligned Volume",
help="Aligned Volume database to migrate",
type=click.Choice(get_allowed_aligned_volumes()),
)
@click.option(
"--dry_run", prompt="Dry Run", help="Test migration before running", default=True
)
def migrate_foreign_key_constraints(sql_url: str, aligned_volume: str, dry_run: bool = True):
migrator = DynamicMigration(sql_url, aligned_volume)
fkey_constraint_mapping = migrator.apply_cascade_option_to_tables(dry_run=dry_run)
click.echo(fkey_constraint_mapping)


if __name__ == "__main__":
migrator()
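A hedged usage sketch for the new command via click's test runner (the URI and volume name are placeholders; aligned_volume must be one of the choices from get_allowed_aligned_volumes(), and passing every option on the command line skips the prompts):

from click.testing import CliRunner
from migrate import migrate_foreign_key_constraints

runner = CliRunner()
result = runner.invoke(
    migrate_foreign_key_constraints,
    [
        "--sql_url", "postgresql://user:pass@localhost/db",  # placeholder URI
        "--aligned_volume", "my_volume",                     # placeholder name
        "--dry_run", "true",                                 # report only, no ALTERs
    ],
)
print(result.output)  # echoes the foreign-key constraint mapping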
6 changes: 3 additions & 3 deletions requirements.in
@@ -26,7 +26,7 @@ gcsfs>=0.8.0
pyarrow==3.0.0
flask_cors
numpy>=1.20
-emannotationschemas>=5.4.0
-dynamicannotationdb>=5.7.1
+emannotationschemas>=5.8.0
+dynamicannotationdb>=5.7.2
 nglui>=2.10.1
 Flask-Limiter[redis]
4 changes: 2 additions & 2 deletions requirements.txt
@@ -119,9 +119,9 @@ dill==0.3.4
# pathos
dracopy==1.0.1
# via cloud-volume
-dynamicannotationdb==5.7.1
+dynamicannotationdb==5.7.2
 # via -r requirements.in
-emannotationschemas==5.7.1
+emannotationschemas==5.8.0
# via
# -r requirements.in
# dynamicannotationdb
