Support for v2.9.2, v2.10.1, and v2.10.3 and paging support for large metadata #38

Merged 21 commits on Feb 14, 2025
3 changes: 3 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yml
@@ -20,6 +20,9 @@ body:
- 2.6.3
- 2.7.2
- 2.8.1
- 2.9.2
- 2.10.1
- 2.10.3
- type: textarea
attributes:
label: Current Behavior
3 changes: 3 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yml
@@ -20,6 +20,9 @@ body:
- 2.6.3
- 2.7.2
- 2.8.1
- 2.9.2
- 2.10.1
- 2.10.3
- type: textarea
attributes:
label: Is your feature request related to a problem? Please describe.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -33,7 +33,7 @@ jobs:
run: python3 -m build

- name: Store the distribution packages
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: python-package-distributions
path: dist/
13 changes: 12 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.1.0] - 2025-02-13
### Added
- Support for MWAA v2.9.2, thanks to the gracious contribution of [holly-evans](https://github.com/holly-evans) 🙏 🚀
- Support for MWAA v2.10.1 and v2.10.3, thanks to the gracious contributions of [pyrr](https://github.com/pyrr) and [holly-evans](https://github.com/holly-evans) 🙏 🚀
- Performance enhancement for large metadata export to S3 using a [server-side cursor](https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#orm-queryguide-yield-per)
- Performance enhancement for large metadata import using batched copy into meta-database, thanks to the gracious contribution of [mathiasflorin](https://github.com/mathiasflorin) 🙏 🚀

### Changed
- Upgraded the upload and download artifact GitHub Actions from v3 to v4

## [2.0.1] - 2024-08-21
### Changed
- Updates to the `README` and `PYPIDOC`
@@ -120,7 +130,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Initial commit with sample readme, code of conduct, and license


[unreleased]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v2.0.1...HEAD
[unreleased]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v2.1.0...HEAD
[2.1.0]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v2.0.1...v2.1.0
[2.0.1]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v2.0.0...v2.0.1
[2.0.0]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v1.0.1...v2.0.0
[1.0.1]: https://github.com/aws-samples/mwaa-disaster-recovery/compare/v1.0.0...v1.0.1
16 changes: 8 additions & 8 deletions PYPIDOC.md
@@ -1,6 +1,6 @@
# MWAA Disaster Recovery

![MWAA](https://img.shields.io/badge/MWAA-2.8.1_|_2.7.2_|_2.6.3_|_2.5.1_|_2.4.3-blue)
![MWAA](https://img.shields.io/badge/MWAA-2.10.3_|_2.10.1_|_2.9.2_|_2.8.1_|_2.7.2_|_2.6.3_|_2.5.1_|_2.4.3-blue)
![Python](https://img.shields.io/badge/Python-3.7+-blue)
[![Black](https://img.shields.io/badge/Code%20Style-Black-000000.svg)](https://github.com/psf/black)
[![CodeCoverage](https://raw.githubusercontent.com/aws-samples/mwaa-disaster-recovery/python-coverage-comment-action-data/badge.svg)](https://htmlpreview.github.io/?https://github.com/aws-samples/mwaa-disaster-recovery/blob/python-coverage-comment-action-data/htmlcov/index.html)
@@ -18,15 +18,15 @@ Let's look at creating a metadata backup and restore dags, respectively, as foll

### Metadata Backup DAG

Let's assume your environment version is `2.8.1`. You can create a metadata backup dag by creating a python file in your MWAA `dags` folder as follows:
Let's assume your environment version is `2.10.3`. You can create a metadata backup dag by creating a python file in your MWAA `dags` folder as follows:

**backup_metadata.py**:
```python
# Importing DAG is necessary for DAG detection
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

factory = DRFactory_2_8(
factory = DRFactory_2_10(
dag_id='backup',
path_prefix='data',
storage_type='S3'
@@ -56,9 +56,9 @@ You can create a metadata restore dag by creating a python file in your MWAA `da
**restore_metadata.py**:
```python
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

factory = DRFactory_2_8(
factory = DRFactory_2_10(
dag_id='restore',
path_prefix='data',
storage_type='S3'
@@ -90,9 +90,9 @@ You can create a metadata cleanup dag by creating a python file in your MWAA `da
**cleanup_metadata.py**:
```python
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

factory = DRFactory_2_8(
factory = DRFactory_2_10(
dag_id='cleanup',
path_prefix='data',
storage_type='S3'
7 changes: 5 additions & 2 deletions README.md
@@ -2,7 +2,7 @@
<!-- TOC ignore:true -->
# MWAA Disaster Recovery

![MWAA](https://img.shields.io/badge/MWAA-2.8.1_|_2.7.2_|_2.6.3_|_2.5.1_|_2.4.3-blue)
![MWAA](https://img.shields.io/badge/MWAA-2.10.3_|_2.10.1_|_2.9.2_|_2.8.1_|_2.7.2_|_2.6.3_|_2.5.1_|_2.4.3-blue)
![CDK](https://img.shields.io/badge/CDK-Python-orange)
![Python](https://img.shields.io/badge/Python-3.7+-blue)
[![Black](https://img.shields.io/badge/Code%20Style-Black-000000.svg)](https://github.com/psf/black)
@@ -77,6 +77,9 @@ This solution is a part of an AWS blog series on MWAA Disaster Recovery. Please

> [!NOTE]
> The project currently supports the following versions of MWAA:
> - **2.10.3**
> - **2.10.1**
> - **2.9.2**
> - **2.8.1**
> - **2.7.2**
> - **2.6.3**
@@ -324,7 +327,7 @@ Here are the required parameters that applies to both primary and secondary regi
| `AWS_ACCOUNT_ID` | `111222333444` | Your AWS account id. |
| `DR_TYPE` | `BACKUP_RESTORE`, `WARM_STANDBY` | The disaster recovery strategy to be deployed. |
| `MWAA_UPDATE_EXECUTION_ROLE` | `YES` or `NO` | Flag to denote whether to update the existing MWAA execution role with new policies for allowing task token return calls from the StepFunctions workflow in the secondary stack. See the [Automated Updates to the Execution Role](#automated-updates-to-the-execution-role) for details. |
| `MWAA_VERSION` | `2.4.3`, `2.5.1`, `2.6.3`, `2.7.2`, `2.8.1` | The deployed version of MWAA. |
| `MWAA_VERSION` | `2.4.3`, `2.5.1`, `2.6.3`, `2.7.2`, `2.8.1`, `2.9.2`, `2.10.1`, `2.10.3` | The deployed version of MWAA. |
| `PRIMARY_DAGS_BUCKET_NAME` | `mwaa-2-5-1-primary-bucket` | The name of the DAGs S3 bucket used by the environment in the primary region. |
| `PRIMARY_MWAA_ENVIRONMENT_NAME` | `mwaa-2-5-1-primary` | The name of the MWAA environment in the primary region. |
| `PRIMARY_MWAA_ROLE_ARN` | `arn:aws:...:role/service-role/primary-role` | The ARN of the execution role used by the primary MWAA environment. |
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
2.0.1
2.1.0
10 changes: 10 additions & 0 deletions assets/dags/mwaa_dr/backup_metadata.py
@@ -51,6 +51,16 @@

    factory = DRFactory_2_8(**kwargs)

elif airflow_version.startswith("2.9"):
    from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9

    factory = DRFactory_2_9(**kwargs)

elif airflow_version.startswith("2.10"):
    from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

    factory = DRFactory_2_10(**kwargs)

else:
    from mwaa_dr.framework.factory.default_dag_factory import DefaultDagFactory
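
The dispatch above selects a factory by string prefix. As a minimal sketch of an alternative, assuming a hypothetical lookup table (`FACTORY_BY_MINOR` and `factory_name_for` are illustrative names, not part of this PR), the version can be parsed into a `(major, minor)` tuple first. That avoids any prefix ambiguity, since a prefix such as `"2.1"` would also match `"2.10.3"`:

```python
# Hypothetical registry: only the factories visible in this diff are listed.
FACTORY_BY_MINOR = {
    (2, 8): "DRFactory_2_8",
    (2, 9): "DRFactory_2_9",
    (2, 10): "DRFactory_2_10",
}


def factory_name_for(airflow_version: str) -> str:
    """Return the factory class name for a version string such as '2.10.3'."""
    # Compare numeric (major, minor) instead of relying on string prefixes.
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    return FACTORY_BY_MINOR.get((major, minor), "DefaultDagFactory")


print(factory_name_for("2.10.3"))
print(factory_name_for("2.9.2"))
```

Unknown versions fall back to the default factory name, mirroring the `else` branch in the hunk.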

10 changes: 10 additions & 0 deletions assets/dags/mwaa_dr/cleanup_metadata.py
@@ -51,6 +51,16 @@

    factory = DRFactory_2_8(**kwargs)

elif airflow_version.startswith("2.9"):
    from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9

    factory = DRFactory_2_9(**kwargs)

elif airflow_version.startswith("2.10"):
    from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

    factory = DRFactory_2_10(**kwargs)

else:
    from mwaa_dr.framework.factory.default_dag_factory import DefaultDagFactory

22 changes: 17 additions & 5 deletions assets/dags/mwaa_dr/framework/model/base_table.py
@@ -20,9 +20,11 @@
from io import StringIO
from copy import deepcopy
from typing import Optional
from sqlalchemy import text

from airflow import settings
from mwaa_dr.framework.model.dependency_model import DependencyModel
import itertools

S3 = "S3"

@@ -72,7 +74,7 @@ def __init__(
export_filter: str = None,
storage_type: str = None,
path_prefix: str = None,
batch_size=5000,
batch_size=1000,
):
self.name = name
self.model = model
@@ -324,7 +326,8 @@ def backup(self, **context):

try:
with settings.Session() as session:
result = session.execute(sql)
stmt = text(sql).execution_options(yield_per=self.batch_size)
result = session.execute(stmt)
chunk = result.fetchmany(self.batch_size)
while chunk:
buffer = StringIO("")
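
The backup hunk pairs a `yield_per` execution option with a `fetchmany` loop so that rows are consumed in bounded chunks. The following is a minimal sketch of only the consumer side of that pattern, using the standard-library `sqlite3` module as a stand-in for the Airflow metadata database (sqlite3 has no server-side cursor, and `export_in_chunks` is a hypothetical helper, not the project's code):

```python
import csv
import sqlite3
from io import StringIO


def export_in_chunks(conn, sql, batch_size):
    """Yield pipe-delimited CSV text, at most batch_size rows per chunk."""
    cursor = conn.cursor()
    cursor.execute(sql)
    chunk = cursor.fetchmany(batch_size)
    while chunk:
        buffer = StringIO("")
        csv.writer(buffer, delimiter="|").writerows(chunk)
        yield buffer.getvalue()
        chunk = cursor.fetchmany(batch_size)
    cursor.close()


# Stand-in table and data, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (id INTEGER, event TEXT)")
conn.executemany(
    "INSERT INTO log VALUES (?, ?)", [(i, f"event-{i}") for i in range(25)]
)
chunks = list(
    export_in_chunks(conn, "SELECT id, event FROM log ORDER BY id", batch_size=10)
)
print(len(chunks))
```

Each yielded chunk could then be appended to the backup object, so memory use depends on `batch_size` rather than on the table size.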
@@ -347,19 +350,28 @@ def restore(self, **context):
"""
backup_file = self.read(context)

restore_sql = ""
if self.columns:
restore_sql = f"COPY {self.name} ({', '.join(self.columns)}) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, DELIMITER '|')"
else:
restore_sql = f"COPY {self.name} FROM STDIN WITH (FORMAT CSV, HEADER FALSE, DELIMITER '|')"
print(f"Restore SQL: {restore_sql}")

conn = settings.engine.raw_connection()
cursor = None
try:
cursor = conn.cursor()
cursor.copy_expert(restore_sql, backup_file)
conn.commit()
insert_counter = 0
while True:
batch = list(itertools.islice(backup_file, self.batch_size))
if not batch:
break
cursor.copy_expert(restore_sql, StringIO("".join(batch)))
conn.commit()
insert_counter += len(batch)
print(f"Inserted {insert_counter} records")
finally:
if cursor:
cursor.close()
conn.close()
backup_file.close()
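
The restore hunk replaces a single `copy_expert` call with a loop that slices the backup file into groups of `batch_size` lines. A minimal sketch of that slicing, with a hypothetical `copy_batch` callback standing in for `cursor.copy_expert` plus `conn.commit()` so it runs without a database:

```python
import itertools
from io import StringIO


def restore_in_batches(backup_file, copy_batch, batch_size):
    """Feed backup_file to copy_batch in groups of at most batch_size lines."""
    inserted = 0
    while True:
        # islice pulls the next batch_size lines without reading the whole file.
        batch = list(itertools.islice(backup_file, batch_size))
        if not batch:
            break
        copy_batch(StringIO("".join(batch)))
        inserted += len(batch)
    return inserted


# Illustrative input: seven pipe-delimited records.
received = []
backup = StringIO("".join(f"{i}|event-{i}\n" for i in range(7)))
total = restore_in_batches(
    backup, lambda buf: received.append(buf.read()), batch_size=3
)
print(total, len(received))
```

One assumption worth checking for this approach is that every record occupies exactly one line; a quoted CSV field containing a newline would be split across two batches.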

10 changes: 10 additions & 0 deletions assets/dags/mwaa_dr/restore_metadata.py
@@ -51,6 +51,16 @@

    factory = DRFactory_2_8(**kwargs)

elif airflow_version.startswith("2.9"):
    from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9

    factory = DRFactory_2_9(**kwargs)

elif airflow_version.startswith("2.10"):
    from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

    factory = DRFactory_2_10(**kwargs)

else:
    from mwaa_dr.framework.factory.default_dag_factory import DefaultDagFactory

Empty file.
125 changes: 125 additions & 0 deletions assets/dags/mwaa_dr/v_2_10/dr_factory.py
@@ -0,0 +1,125 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9


class DRFactory_2_10(DRFactory_2_9):
    """
    Factory class for creating database models for Apache Airflow 2.10.1 and 2.10.3.

    This class inherits from DRFactory_2_9 and extends it to support the new
    features and schema changes introduced in Apache Airflow 2.10.1/2.10.3:
    https://airflow.apache.org/docs/apache-airflow/2.10.1/database-erd-ref.html

    Args:
        dag_id (str): The ID of the DAG.
        path_prefix (str, optional): The prefix for the backup/restore path. Defaults to "data".
        storage_type (str, optional): The type of storage used for backup/restore. Defaults to S3.
        batch_size (int, optional): The batch size for backup/restore operations. Defaults to 5000.
    """

    def task_instance(self, model: DependencyModel[BaseTable]) -> BaseTable:
        """
        Create a BaseTable model for the task_instance table in Apache Airflow 2.10.1.
        In particular, adds the `executor` field to the 2.9.2 task_instance table.

        Args:
            model (DependencyModel[BaseTable]): The dependency model for the table.

        Returns:
            BaseTable: An instance of the BaseTable representing the 'task_instance' table.
        """

        return BaseTable(
            name="task_instance",
            model=model,
            columns=[
                "dag_id",
                "map_index",
                "run_id",
                "task_id",
                "custom_operator_name",
                "duration",
                "end_date",
                "executor",  # New Field
                "executor_config",
                "external_executor_id",
                "hostname",
                "job_id",
                "max_tries",
                "next_kwargs",
                "next_method",
                "operator",
                "pid",
                "pool",
                "pool_slots",
                "priority_weight",
                "queue",
                "queued_by_job_id",
                "queued_dttm",
                "rendered_map_index",
                "start_date",
                "state",
                "task_display_name",
                "trigger_id",
                "trigger_timeout",
                "try_number",
                "unixname",
                "updated_at",
            ],
            export_mappings={
                "executor_config": "'\\x' || encode(executor_config,'hex') as executor_config"
            },
            export_filter="state NOT IN ('running','restarting','queued','scheduled', 'up_for_retry','up_for_reschedule')",
            storage_type=self.storage_type,
            path_prefix=self.path_prefix,
            batch_size=self.batch_size,
        )

    def log(self, model: DependencyModel[BaseTable]) -> BaseTable:
        """
        Create a BaseTable model for the log table in Apache Airflow 2.10.1.
        In particular, adds the `try_number` field to the 2.9.2 log table.

        Args:
            model (DependencyModel[BaseTable]): The dependency model for the log table.

        Returns:
            BaseTable: The BaseTable model for the log table.
        """
        return BaseTable(
            name="log",
            model=model,
            columns=[
                "dag_id",
                "dttm",
                "event",
                "execution_date",
                "extra",
                "map_index",
                "owner",
                "owner_display_name",
                "run_id",
                "task_id",
                "try_number",  # New Field
            ],
            storage_type=self.storage_type,
            path_prefix=self.path_prefix,
            batch_size=self.batch_size,
        )
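
The `export_mappings` entry for `executor_config` renders a binary column as `\x` followed by hex digits, which is the text form PostgreSQL accepts for `bytea` input and which contains no delimiter or newline characters. A minimal Python sketch of the same encoding and its inverse, assuming the column holds arbitrary bytes (`to_bytea_hex` and `from_bytea_hex` are hypothetical helpers, not part of this PR):

```python
def to_bytea_hex(data: bytes) -> str:
    # Python equivalent of the SQL expression: '\x' || encode(col, 'hex')
    return "\\x" + data.hex()


def from_bytea_hex(text: str) -> bytes:
    # Inverse of to_bytea_hex: strip the \x prefix and decode the hex digits.
    if not text.startswith("\\x"):
        raise ValueError("expected a \\x-prefixed hex string")
    return bytes.fromhex(text[2:])


# Illustrative payload only; real values would come from the database.
payload = b"\x80\x04}\x94."
encoded = to_bytea_hex(payload)
print(encoded)
```

Because the encoded value uses only hex digits plus the prefix, it can pass through a pipe-delimited CSV file without quoting concerns.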