Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch import metadata #35

Closed
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.0.2] - 2024-08-21
### Changed
- Use batch import when restoring metadata tables, committing records in configurable batches

## [2.0.1] - 2024-08-21
### Changed
- Updates to the `README` and `PYPIDOC`
Expand Down
20 changes: 18 additions & 2 deletions assets/dags/mwaa_dr/framework/model/base_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,27 @@ def restore(self, **context):
print(f"Restore SQL: {restore_sql}")

conn = settings.engine.raw_connection()
cursor = None
try:
cursor = conn.cursor()
cursor.copy_expert(restore_sql, backup_file)
conn.commit()
insert_counter = 0
with backup_file as file:
batch = []
for line in file:
mathiasflorin marked this conversation as resolved.
Show resolved Hide resolved
batch.append(line)
if len(batch) == self.batch_size:
cursor.copy_expert(restore_sql, StringIO("".join(batch)))
conn.commit()
insert_counter += self.batch_size
batch = []
if batch:
insert_counter += len(batch)
cursor.copy_expert(restore_sql, StringIO("".join(batch)))
conn.commit()
print(f"Inserted {insert_counter} records")
finally:
if cursor:
cursor.close()
conn.close()
backup_file.close()

Expand Down
5 changes: 4 additions & 1 deletion assets/dags/mwaa_dr/v_2_4/dr_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def dag_run(self, model: DependencyModel[BaseTable]) -> BaseTable:
"start_date",
"state",
],
export_filter="dag_id != 'backup_metadata'",
mathiasflorin marked this conversation as resolved.
Show resolved Hide resolved
export_mappings={"conf": "'\\x' || encode(conf,'hex') as conf"},
storage_type=self.storage_type,
path_prefix=self.path_prefix,
Expand Down Expand Up @@ -234,7 +235,7 @@ def task_instance(self, model: DependencyModel[BaseTable]) -> BaseTable:
export_mappings={
"executor_config": "'\\x' || encode(executor_config,'hex') as executor_config"
},
export_filter="state NOT IN ('running','restarting','queued','scheduled', 'up_for_retry','up_for_reschedule')",
export_filter="state NOT IN ('running','restarting','queued','scheduled', 'up_for_retry','up_for_reschedule') AND dag_id != 'backup_metadata'",
storage_type=self.storage_type,
path_prefix=self.path_prefix,
batch_size=self.batch_size,
Expand Down Expand Up @@ -311,6 +312,7 @@ def task_fail(self, model: DependencyModel[BaseTable]) -> BaseTable:
"start_date",
"task_id",
],
export_filter="dag_id != 'backup_metadata'",
storage_type=self.storage_type,
path_prefix=self.path_prefix,
batch_size=self.batch_size,
Expand Down Expand Up @@ -369,6 +371,7 @@ def job(self, model: DependencyModel[BaseTable]) -> BaseTable:
"state",
"unixname",
],
export_filter="dag_id != 'backup_metadata'",
storage_type=self.storage_type,
path_prefix=self.path_prefix,
batch_size=self.batch_size,
Expand Down
Loading