Skip to content

Commit

Permalink
[ETL-616] Implement Great Expectations to run on parquet data (#139)
Browse files Browse the repository at this point in the history
* initial commit for testing

* update sample expectations

* add two data types

* correct to fitbitdailydata

* fix expectation

* add complete script

* initial cf config and template

* correct formatting, refactor triggers

* fix job name

* refactor gx code, add tests, adjust gx version

* refactor gx code, add tests, adjust gx version

* make consistent naming

* remove hardcoded args

* add integration tests, remove null rows code, add dep for urllib3<2

* change to lowercase data type

* add prod cf configs, add perm for glue role for shareable artifacts bucket

* rename, include prod ver

* add test to catch exception

* add conditional creation of triggers due to what is available in expectations json

* update README for tests, add in testing for our scripts

* chain cmd together

* update prod

* gather tests, correct key_prefix to key, add missing params to prod glue role

* remove slash

* add gx glue version as var in config
rxu17 authored Sep 13, 2024
1 parent 26e2ad8 commit 30d1873
Showing 17 changed files with 1,084 additions and 20 deletions.
35 changes: 29 additions & 6 deletions .github/workflows/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# recover github workflows

## Overview

Recover ETL has four github workflows:

- workflows/upload-and-deploy.yaml
@@ -14,35 +16,56 @@ Recover ETL has four github workflows:
| codeql-analysis | on-push from feature branch, feature branch merged into main |
| cleanup | feature branch deleted |


## upload-files

Copies pilot data sets from ingestion bucket to input data bucket for use in integration test. Note that this behavior assumes that there are files in the ingestion bucket. Could add updates to make this robust and throw an error if the ingestion bucket path is empty.

## upload-and-deploy

Here are some more detailed descriptions and troubleshooting tips for some jobs within each workflow:

### upload-files

Copies pilot data sets from ingestion bucket to input data bucket for use in integration test. Note that this behavior assumes that there are files in the ingestion bucket. Could add updates to make this robust and throw an error if the ingestion bucket path is empty.
### Current Testing Related Jobs

### nonglue-unit-tests
#### nonglue-unit-tests

See [testing](/tests/README.md) for more info on the background behind these tests. Here, both the `recover-dev-input-data` and `recover-dev-processed-data` buckets' synapse folders are tested for STS access every time something is pushed to the feature branch and when the feature branch is merged to main.

This is like an integration test and because it depends on connection to Synapse, sometimes the connection will be stalled, broken, etc. Usually this test will only take 1 min or less. Sometimes just re-running this job will do the trick.

### pytest-docker
#### pytest-docker

This sets up and uploads the two docker images to ECR repository.
**Note: A ECR repo called `pytest` would need to exist in the AWS account we are pushing docker images to prior to running this GH action.**

Some behavioral aspects to note - there were limitations with the matrix method in Github action jobs thus had to unmask account id to pass it as an output for `glue-unit-tests` to use. The matrix method at this time [see issue thread](https://github.com/orgs/community/discussions/17245) doesn't support dynamic job outputs and the workaround seemed more complex to implement, thus we weren't able to pass the path of the uploaded docker container directly and had to use a static output. This leads us to use `steps.login-ecr.outputs.registry` which contains account id directly so the output could be passed and the docker container could be found and used.

### glue-unit-tests
#### glue-unit-tests

See [testing](/tests/README.md) for more info on the background behind these tests.

For the JSON to Parquet tests sometimes there may be a scenario where a github workflow gets stopped early due to an issue/gets canceled.

With the current way when the `test_json_to_parquet.py` run, sometimes the glue table, glue crawler role and other resources may have been created already for the given branch (and didn’t get deleted because the test didn’t run all the way through) and will error out when the github workflow gets triggered again because it hits the `AlreadyExistsException`. This is currently resolved manually by deleting the resource(s) that has been created in the AWS account and re-running the github jobs that failed.

### Adding Test Commands to Github Workflow Jobs

After developing and running tests locally, you need to ensure the tests are run in the CI pipeline. To add your tests to under the `upload-and-deploy` job:

Add your test commands under the appropriate job (see above for summaries on the specific testing related jobs), for example:

```yaml
jobs:
build:
runs-on: ubuntu-latest
steps:
# Other steps...
- name: Run tests
run: |
pytest tests/
```
### sceptre-deploy-develop
### integration-test-develop-cleanup
28 changes: 19 additions & 9 deletions .github/workflows/upload-and-deploy.yaml
Original file line number Diff line number Diff line change
@@ -134,11 +134,14 @@ jobs:
pipenv install ecs_logging~=2.0
pipenv install pytest-datadir
- name: Test lambda scripts with pytest
- name: Test scripts with pytest (lambda, etc.)
run: |
pipenv run python -m pytest tests/test_s3_event_config_lambda.py -v
pipenv run python -m pytest tests/test_s3_to_glue_lambda.py -v
pipenv run python -m pytest -v tests/test_lambda_raw.py
pipenv run python -m pytest \
tests/test_s3_event_config_lambda.py \
tests/test_s3_to_glue_lambda.py \
tests/test_lambda_dispatch.py \
tests/test_consume_logs.py \
tests/test_lambda_raw.py -v
- name: Test dev synapse folders for STS access with pytest
run: >
@@ -249,18 +252,25 @@ jobs:
if: github.ref_name != 'main'
run: echo "NAMESPACE=$GITHUB_REF_NAME" >> $GITHUB_ENV

- name: Run Pytest unit tests under AWS 3.0
- name: Run Pytest unit tests under AWS Glue 3.0
if: matrix.tag_name == 'aws_glue_3'
run: |
su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest tests/test_s3_to_json.py -v"
su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest tests/test_compare_parquet_datasets.py -v"
su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest \
tests/test_s3_to_json.py \
tests/test_compare_parquet_datasets.py -v"
- name: Run Pytest unit tests under AWS 4.0
- name: Run unit tests for JSON to Parquet under AWS Glue 4.0
if: matrix.tag_name == 'aws_glue_4'
run: >
su - glue_user --command "cd $GITHUB_WORKSPACE &&
python3 -m pytest tests/test_json_to_parquet.py --namespace $NAMESPACE -v"
- name: Run unit tests for Great Expectations on Parquet under AWS Glue 4.0
if: matrix.tag_name == 'aws_glue_4'
run: >
su - glue_user --command "cd $GITHUB_WORKSPACE &&
python3 -m pytest tests/test_run_great_expectations_on_parquet.py -v"
sceptre-deploy-develop:
name: Deploys branch using sceptre
runs-on: ubuntu-latest
@@ -287,7 +297,7 @@ jobs:
run: echo "NAMESPACE=$GITHUB_REF_NAME" >> $GITHUB_ENV

- name: "Deploy sceptre stacks to dev"
run: pipenv run sceptre --var "namespace=${{ env.NAMESPACE }}" launch develop --yes
run: pipenv run sceptre --debug --var "namespace=${{ env.NAMESPACE }}" launch develop --yes

- name: Delete preexisting S3 event notification for this namespace
uses: gagoar/invoke-aws-lambda@v3
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ template_key_prefix: "{{ var.namespace | default('main') }}/templates"
glue_python_shell_python_version: "3.9"
glue_python_shell_glue_version: "3.0"
json_to_parquet_glue_version: "4.0"
great_expectations_job_glue_version: "4.0"
default_stack_tags:
Department: DNT
Project: recover
1 change: 1 addition & 0 deletions config/develop/glue-job-role.yaml
Original file line number Diff line number Diff line change
@@ -6,5 +6,6 @@ parameters:
S3IntermediateBucketName: {{ stack_group_config.intermediate_bucket_name }}
S3ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }}
S3ArtifactBucketName: {{ stack_group_config.template_bucket_name }}
S3ShareableArtifactBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }}
stack_tags:
{{ stack_group_config.default_stack_tags }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
template:
path: glue-job-run-great-expectations-on-parquet.j2
dependencies:
- develop/glue-job-role.yaml
stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectationsParquet"
parameters:
Namespace: {{ stack_group_config.namespace }}
JobDescription: Runs great expectations on a set of data
JobRole: !stack_output_external glue-job-role::RoleArn
TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }}
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
dataset_schemas: !file src/glue/resources/table_columns.yaml
4 changes: 4 additions & 0 deletions config/develop/namespaced/glue-workflow.yaml
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ dependencies:
- develop/namespaced/glue-job-S3ToJsonS3.yaml
- develop/namespaced/glue-job-JSONToParquet.yaml
- develop/namespaced/glue-job-compare-parquet.yaml
- develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
- develop/glue-job-role.yaml
- develop/s3-cloudformation-bucket.yaml
parameters:
@@ -19,7 +20,10 @@ parameters:
CompareParquetMainNamespace: "main"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
CloudformationBucketName: {{ stack_group_config.template_bucket_name }}
ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }}
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
dataset_schemas: !file src/glue/resources/table_columns.yaml
data_values_expectations: !file src/glue/resources/data_values_expectations.json
1 change: 1 addition & 0 deletions config/prod/glue-job-role.yaml
Original file line number Diff line number Diff line change
@@ -6,5 +6,6 @@ parameters:
S3IntermediateBucketName: {{ stack_group_config.intermediate_bucket_name }}
S3ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }}
S3ArtifactBucketName: {{ stack_group_config.template_bucket_name }}
S3ShareableArtifactBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }}
stack_tags:
{{ stack_group_config.default_stack_tags }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
template:
path: glue-job-run-great-expectations-on-parquet.j2
dependencies:
- prod/glue-job-role.yaml
stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectationsParquet"
parameters:
Namespace: {{ stack_group_config.namespace }}
JobDescription: Runs great expectations on a set of data
JobRole: !stack_output_external glue-job-role::RoleArn
TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }}
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
dataset_schemas: !file src/glue/resources/table_columns.yaml
4 changes: 4 additions & 0 deletions config/prod/namespaced/glue-workflow.yaml
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ dependencies:
- prod/namespaced/glue-job-S3ToJsonS3.yaml
- prod/namespaced/glue-job-JSONToParquet.yaml
- prod/namespaced/glue-job-compare-parquet.yaml
- prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml
- prod/glue-job-role.yaml
- prod/s3-cloudformation-bucket.yaml
parameters:
@@ -19,7 +20,10 @@ parameters:
CompareParquetMainNamespace: "main"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
CloudformationBucketName: {{ stack_group_config.template_bucket_name }}
ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }}
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
dataset_schemas: !file src/glue/resources/table_columns.yaml
data_values_expectations: !file src/glue/resources/data_values_expectations.json
408 changes: 408 additions & 0 deletions src/glue/jobs/run_great_expectations_on_parquet.py

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions src/glue/resources/data_values_expectations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"fitbitdailydata": {
"expectation_suite_name": "fitbitdailydata_expectations",
"expectations": [
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "Calories",
"min_value": 300,
"max_value": 25000
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "Steps",
"min_value": 1,
"max_value": 200000
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "BreathingRate",
"min_value": 4,
"max_value": 40
}
}
]
},
"healthkitv2workouts": {
"expectation_suite_name": "healthkitv2workouts_expectations",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {
"column": "HealthKitWorkoutKey"
}
}
]
}
}
5 changes: 5 additions & 0 deletions templates/glue-job-role.yaml
Original file line number Diff line number Diff line change
@@ -20,6 +20,9 @@ Parameters:
Type: String
Description: Name of the S3 bucket where cloudformation templates and tests are stored.

S3ShareableArtifactBucketName:
Type: String
Description: Name of the S3 bucket where the shareable artifacts (like great expectations reports) are stored.

Resources:

@@ -65,6 +68,8 @@ Resources:
- !Sub arn:aws:s3:::${S3IntermediateBucketName}/*
- !Sub arn:aws:s3:::${S3ParquetBucketName}
- !Sub arn:aws:s3:::${S3ParquetBucketName}/*
- !Sub arn:aws:s3:::${S3ShareableArtifactBucketName}
- !Sub arn:aws:s3:::${S3ShareableArtifactBucketName}/*
- PolicyName: ReadS3
PolicyDocument:
Version: '2012-10-17'
104 changes: 104 additions & 0 deletions templates/glue-job-run-great-expectations-on-parquet.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: >-
An AWS Glue job in the data catalog. An AWS Glue job encapsulates a script
that connects to your source data, processes it, and then writes it out
to your data target.

Parameters:
Namespace:
Type: String
Description: >-
The namespace string used to for the individual glue job names

JobDescription:
Type: String
Description: A fuller description of what the job does.
Default: ''

JobRole:
Type: String
Description: The name or ARN of the IAM role that will run this job.

TempS3Bucket:
Type: String
Description: The name of the S3 bucket where temporary files and logs are written.

S3ScriptBucket:
Type: String
Description: The name of the S3 bucket where the script file is located.

S3ScriptKey:
Type: String
Description: The bucket key where the script file is located.

GlueVersion:
Type: String
Description: The version of glue to use for this job

DefaultWorkerType:
Type: String
Description: >-
Which worker type to use for this job.
Default: 'Standard'

DefaultNumberOfWorkers:
Type: Number
Description: >-
How many DPUs to allot to this job. This parameter is not used for types
FitbitIntradayCombined and HealthKitV2Samples.
Default: 1

MaxRetries:
Type: Number
Description: How many times to retry the job if it fails (integer).
Default: 0 # TODO change this to 1 after initial development

TimeoutInMinutes:
Type: Number
Description: The job timeout in minutes (integer).
Default: 1200

AdditionalPythonModules:
Type: String
Description: >-
Additional python packages to install as a comma-delimited list.
Any format supported by pip3 is supported here.


Resources:

{% set datasets = [] %}
{% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %}
{% set dataset = {} %}
{% do dataset.update({"type": v}) %}
{% do dataset.update({"table_name": "dataset_" + v.lower()})%}
{% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %}
{% do datasets.append(dataset) %}
{% endfor %}

{% for dataset in datasets %}
{{ dataset["stackname_prefix"] }}GreatExpectationsParquetJob:
Type: AWS::Glue::Job
Properties:
Command:
Name: glueetl
ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey}
DefaultArguments:
--TempDir: !Sub s3://${TempS3Bucket}/tmp
--enable-continuous-cloudwatch-log: true
--enable-metrics: true
--enable-spark-ui: true
--spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/
--job-bookmark-option: job-bookmark-disable
--job-language: python
--additional-python-modules: !Ref AdditionalPythonModules
# --conf spark.sql.adaptive.enabled
Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
GlueVersion: !Ref GlueVersion
MaxRetries: !Ref MaxRetries
Name: !Sub "${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob"
WorkerType: !Ref DefaultWorkerType
NumberOfWorkers: !Ref DefaultNumberOfWorkers
Role: !Ref JobRole
Timeout: !Ref TimeoutInMinutes
{% endfor %}
41 changes: 38 additions & 3 deletions templates/glue-workflow.j2
Original file line number Diff line number Diff line change
@@ -82,6 +82,14 @@ Parameters:
Description: >-
The name of the bucket where the cloudformation and artifacts are stored.

ShareableArtifactsBucketName:
Type: String
Description: The name of the bucket where shareable artifacts are stored.

ExpectationSuiteKey:
Type: String
Description: The s3 key prefix of the expectation suite.

Conditions:
IsMainNamespace: !Equals [!Ref Namespace, "main"]
IsDevelopmentNamespace: !Not [!Equals [!Ref Namespace, "main"]]
@@ -270,11 +278,11 @@ Resources:
StartOnCreation: true
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetCompleteTrigger:
CompareParquetTrigger:
Type: AWS::Glue::Trigger
Condition: IsDevelopmentNamespace
Properties:
Name: !Sub "${Namespace}-JsontoParquetCompleteTrigger"
Name: !Sub "${Namespace}-CompareParquetTrigger"
Actions:
{% for dataset in datasets %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-CompareParquetJob
@@ -287,7 +295,7 @@ Resources:
"--parquet-bucket": !Ref ParquetBucketName
"--additional-python-modules": "datacompy~=0.8 flask~=2.0 flask-cors~=3.0"
{% endfor %}
Description: This trigger runs after completion of all JSON to Parquet jobs
Description: This trigger runs the compare parquet jobs after completion of all JSON to Parquet jobs
Type: CONDITIONAL
Predicate:
Conditions:
@@ -300,6 +308,33 @@ Resources:
StartOnCreation: true
WorkflowName: !Ref JsonToParquetWorkflow


{% for dataset in datasets if dataset["data_type"].lower() in sceptre_user_data.data_values_expectations %}
{{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger"
Actions:
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob
Arguments:
"--data-type": {{ dataset["data_type"].lower() }}
"--namespace": !Ref Namespace
"--cfn-bucket": !Ref CloudformationBucketName
"--parquet-bucket": !Ref ParquetBucketName
"--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName
"--expectation-suite-key": !Ref ExpectationSuiteKey
"--additional-python-modules": "great_expectations~=0.18,urllib3<2"
Description: This trigger runs the great expectation parquet job for this data type after completion of the JSON to Parquet job for this data type
Type: CONDITIONAL
Predicate:
Conditions:
- JobName: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}-Job"
State: SUCCEEDED
LogicalOperator: EQUALS
StartOnCreation: true
WorkflowName: !Ref JsonToParquetWorkflow
{% endfor %}
Outputs:
S3ToJsonWorkflowName:
2 changes: 1 addition & 1 deletion tests/Dockerfile.aws_glue_4
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM amazon/aws-glue-libs:glue_libs_4.0.0_image_01

RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0
RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.18
ENTRYPOINT ["bash", "-l"]
11 changes: 10 additions & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
### Running tests
### Tests in RECOVER
Tests are defined in the `tests` folder in this project.

### Running tests locally
See here for information on how to run the tests locally especially when you are adding tests.

#### Running tests using Docker
All tests can be run inside a Docker container which includes all the necessary
Glue/Spark dependencies and simulates the environment which the Glue jobs
@@ -17,6 +20,7 @@ These scripts needs to be run inside a Docker container:

- Under AWS 4.0 (Dockerfile.aws_glue_4)
- test_json_to_parquet.py (Note that these tests deploys test resources to aws and will take several min to run)
- test_run_great_expectations_on_parquet.py

Run the following commands to run tests for:

@@ -83,6 +87,8 @@ pytest with other tests because they have to be run in a Dockerfile with the AWS
#### Running tests for lambda
Run the following command from the repo root to run tests for the lambda functions (in develop).

Example:

```shell script
python3 -m pytest tests/test_s3_to_glue_lambda.py -v
```
@@ -103,3 +109,6 @@ python3 -m pytest tests/test_setup_external_storage.py
--namespace <put_namespace_here>
--test_sts_permission <put_the_type_of_permission_to_test_here>
```

#### Adding tests to Recover CI/CD
Tests are run automatically as part of `upload-and-deploy.yaml` Github workflow. See [Github Workflows README](.github/workflows/README.md#adding-test-commands-to-github-workflow-jobs) for more details on the workflow.
381 changes: 381 additions & 0 deletions tests/test_run_great_expectations_on_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
from unittest.mock import MagicMock, patch

import great_expectations as gx
import pytest
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from pyspark.sql import SparkSession

from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq


@pytest.fixture
def test_context(scope="function"):
context = gx.get_context()
yield context


@pytest.fixture(scope="function")
def test_spark():
yield SparkSession.builder.appName("BatchRequestTest").getOrCreate()


def test_create_context():
with (
patch.object(gx, "get_context") as mock_get_context,
patch.object(run_gx_on_pq, "add_datasource") as mock_add_datasource,
patch.object(
run_gx_on_pq, "add_validation_stores"
) as mock_add_validation_stores,
patch.object(run_gx_on_pq, "add_data_docs_sites") as mock_add_data_docs_sites,
):
mock_context = MagicMock()
mock_get_context.return_value = mock_context

s3_bucket = "test-bucket"
namespace = "test-namespace"
key_prefix = "test-prefix"

# Call the function
result_context = run_gx_on_pq.create_context(s3_bucket, namespace, key_prefix)

# Assert that the context returned is the mock context
assert result_context == mock_context

# Assert that the other functions were called
mock_add_datasource.assert_called_once_with(mock_context)
mock_add_validation_stores.assert_called_once_with(
mock_context, s3_bucket, namespace, key_prefix
)
mock_add_data_docs_sites.assert_called_once_with(
mock_context, s3_bucket, namespace, key_prefix
)


def test_that_add_datasource_calls_correctly():
mock_context = MagicMock()
result_context = run_gx_on_pq.add_datasource(mock_context)

# Verify that the datasource was added
mock_context.add_datasource.assert_called_once()
assert result_context == mock_context


@pytest.mark.integration
def test_that_add_datasource_adds_correctly(test_context):
# Assuming you've already added a datasource, you can list it
run_gx_on_pq.add_datasource(test_context)
datasources = test_context.list_datasources()

# Define the expected datasource name
expected_datasource_name = "spark_datasource"

# Check that the expected datasource is present and other details are correct
assert any(
ds["name"] == expected_datasource_name for ds in datasources
), f"Datasource '{expected_datasource_name}' was not added correctly."
datasource = next(
ds for ds in datasources if ds["name"] == expected_datasource_name
)
assert datasource["class_name"] == "Datasource"
assert "SparkDFExecutionEngine" in datasource["execution_engine"]["class_name"]


def test_add_validation_stores_has_expected_calls():
mock_context = MagicMock()
s3_bucket = "test-bucket"
namespace = "test-namespace"
key_prefix = "test-prefix"

with patch.object(mock_context, "add_store") as mock_add_store:
# Call the function
result_context = run_gx_on_pq.add_validation_stores(
mock_context, s3_bucket, namespace, key_prefix
)

# Verify that the validation store is added
mock_add_store.assert_called_once_with(
"validation_result_store",
{
"class_name": "ValidationsStore",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/{key_prefix}",
},
},
)

assert result_context == mock_context


@pytest.mark.integration
def test_validation_store_details(test_context):
# Mock context and stores
run_gx_on_pq.add_validation_stores(
test_context,
s3_bucket="test-bucket",
namespace="test",
key_prefix="test_folder/",
)

# Run the test logic
stores = test_context.list_stores()
expected_store_name = "validation_result_store"

assert any(store["name"] == expected_store_name for store in stores)
# pulls the store we want
store_config = [store for store in stores if store["name"] == expected_store_name][
0
]

assert store_config["class_name"] == "ValidationsStore"
assert store_config["store_backend"]["class_name"] == "TupleS3StoreBackend"
assert store_config["store_backend"]["bucket"] == "test-bucket"
assert store_config["store_backend"]["prefix"] == "test/test_folder/"


def test_get_spark_df_has_expected_calls():
glue_context = MagicMock()
mock_dynamic_frame = MagicMock()
mock_spark_df = MagicMock()
mock_dynamic_frame.toDF.return_value = mock_spark_df

with patch.object(
glue_context, "create_dynamic_frame_from_options"
) as mock_create_dynamic_frame:
mock_create_dynamic_frame.return_value = mock_dynamic_frame

parquet_bucket = "test-bucket"
namespace = "test-namespace"
data_type = "test-data"

result_df = run_gx_on_pq.get_spark_df(
glue_context, parquet_bucket, namespace, data_type
)

# Verify the S3 path and the creation of the DynamicFrame
expected_path = f"s3://test-bucket/test-namespace/parquet/dataset_test-data/"
mock_create_dynamic_frame.assert_called_once_with(
connection_type="s3",
connection_options={"paths": [expected_path]},
format="parquet",
)

# Verify the conversion to DataFrame
assert result_df == mock_spark_df


def test_get_batch_request():
spark_dataset = MagicMock()
data_type = "test-data"
run_id = RunIdentifier(run_name="2023_09_04")

batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id)

# Verify the RuntimeBatchRequest is correctly set up
assert isinstance(batch_request, RuntimeBatchRequest)
assert batch_request.data_asset_name == f"{data_type}-parquet-data-asset"
assert batch_request.batch_identifiers == {
"batch_identifier": f"{data_type}_{run_id.run_name}_batch"
}
assert batch_request.runtime_parameters == {"batch_data": spark_dataset}


@pytest.mark.integration
def test_that_get_batch_request_details_are_correct(test_spark):
# Create a simple PySpark DataFrame to simulate the dataset
data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]
columns = ["name", "age"]
spark_dataset = test_spark.createDataFrame(data, columns)

# Create a RunIdentifier
run_id = RunIdentifier(run_name="test_run_2023")

# Call the function and get the RuntimeBatchRequest
data_type = "user_data"
batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id)

# Assertions to check that the batch request is properly populated
assert isinstance(batch_request, RuntimeBatchRequest)
assert batch_request.datasource_name == "spark_datasource"
assert batch_request.data_connector_name == "runtime_data_connector"
assert batch_request.data_asset_name == "user_data-parquet-data-asset"
assert (
batch_request.batch_identifiers["batch_identifier"]
== "user_data_test_run_2023_batch"
)
assert batch_request.runtime_parameters["batch_data"] == spark_dataset


def test_read_json_correctly_returns_expected_values():
s3_bucket = "test-bucket"
key = "test-key"

# Mock the S3 response
mock_s3_response = MagicMock()
mock_s3_response["Body"].read.return_value = '{"test_key": "test_value"}'.encode(
"utf-8"
)

# Use patch to mock the boto3 s3 client
with patch("boto3.client") as mock_s3_client:
# Mock get_object method
mock_s3_client.return_value.get_object.return_value = mock_s3_response

# Call the function
result = run_gx_on_pq.read_json(mock_s3_client.return_value, s3_bucket, key)

# Verify that the S3 client was called with the correct parameters
mock_s3_client.return_value.get_object.assert_called_once_with(
Bucket=s3_bucket, Key=key
)

# Verify the result
assert result == {"test_key": "test_value"}


def test_that_add_expectations_from_json_has_expected_call():
mock_context = MagicMock()

# Sample expectations data
expectations_data = {
"test-data": {
"expectation_suite_name": "test_suite",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {"column": "test_column"},
},
],
}
}

data_type = "test-data"

# Call the function
run_gx_on_pq.add_expectations_from_json(expectations_data, mock_context, data_type)

# Verify expectations were added to the context
mock_context.add_or_update_expectation_suite.assert_called_once()


def test_that_add_expectations_from_json_throws_value_error():
mock_context = MagicMock()

# Sample expectations data
expectations_data = {
"not-test-data": {
"expectation_suite_name": "test_suite",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {"column": "test_column"},
},
],
}
}

data_type = "test-data"
with pytest.raises(
ValueError, match="No expectations found for data type 'test-data'"
):
run_gx_on_pq.add_expectations_from_json(
expectations_data, mock_context, data_type
)


@pytest.mark.integration
def test_add_expectations_from_json_adds_details_correctly(test_context):
# Mock expectations data
expectations_data = {
"user_data": {
"expectation_suite_name": "user_data_suite",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {"column": "user_id"},
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {"column": "age", "min_value": 18, "max_value": 65},
},
],
}
}

data_type = "user_data"

# Call the function to add expectations
test_context = run_gx_on_pq.add_expectations_from_json(
expectations_data, test_context, data_type
)

# Retrieve the expectation suite to verify that expectations were added
expectation_suite = test_context.get_expectation_suite("user_data_suite")

assert expectation_suite.expectation_suite_name == "user_data_suite"
assert len(expectation_suite.expectations) == 2

# Verify the details of the first expectation
first_expectation = expectation_suite.expectations[0]
assert first_expectation.expectation_type == "expect_column_to_exist"
assert first_expectation.kwargs == {"column": "user_id"}

# Verify the details of the second expectation
second_expectation = expectation_suite.expectations[1]
assert second_expectation.expectation_type == "expect_column_values_to_be_between"
assert second_expectation.kwargs == {
"column": "age",
"min_value": 18,
"max_value": 65,
}


def test_that_add_validation_results_to_store_has_expected_calls():
# Mock the EphemeralDataContext and the necessary components
mock_context = MagicMock()
mock_expectation_suite = MagicMock()
mock_context.get_expectation_suite.return_value = mock_expectation_suite
mock_expectation_suite.expectation_suite_name = "test_suite"

# Mock the validation result data
validation_result = {"result": "test_result"}

# Create a mock batch identifier and run identifier
mock_batch_identifier = MagicMock(spec=RuntimeBatchRequest)
mock_run_identifier = MagicMock(spec=RunIdentifier)

# Call the function with mocked inputs
result_context = run_gx_on_pq.add_validation_results_to_store(
context=mock_context,
expectation_suite_name="test_suite",
validation_result=validation_result,
batch_identifier=mock_batch_identifier,
run_identifier=mock_run_identifier,
)

# Assert that the expectation suite was retrieved correctly
mock_context.get_expectation_suite.assert_called_once_with("test_suite")

expected_expectation_suite_identifier = ExpectationSuiteIdentifier(
expectation_suite_name="test_suite"
)
expected_validation_result_identifier = ValidationResultIdentifier(
expectation_suite_identifier=expected_expectation_suite_identifier,
batch_identifier=mock_batch_identifier,
run_id=mock_run_identifier,
)

# Verify that the validation result was added to the validations store
mock_context.validations_store.set.assert_called_once_with(
expected_validation_result_identifier, validation_result
)

# Check that the context is returned
assert result_context == mock_context

0 comments on commit 30d1873

Please sign in to comment.