From 9b8abbffd9472fa505953236d861b514f73eb7fb Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Fri, 18 Oct 2024 15:23:24 -0700 Subject: [PATCH] consolidate GX docs to a single docsite --- .../upload-and-deploy-to-prod-main.yaml | 7 +- .github/workflows/upload-and-deploy.yaml | 2 + ...job-run-great-expectations-on-parquet.yaml | 5 +- config/develop/namespaced/glue-workflow.yaml | 2 - ...job-run-great-expectations-on-parquet.yaml | 5 +- config/prod/namespaced/glue-workflow.yaml | 2 - .../jobs/run_great_expectations_on_parquet.py | 362 ++++++------------ src/glue/resources/great_expectations.yml | 58 +++ src/scripts/manage_artifacts/artifacts.py | 39 +- ...e-job-run-great-expectations-on-parquet.j2 | 28 +- templates/glue-workflow.j2 | 17 +- .../test_run_great_expectations_on_parquet.py | 272 ++----------- 12 files changed, 273 insertions(+), 526 deletions(-) create mode 100644 src/glue/resources/great_expectations.yml diff --git a/.github/workflows/upload-and-deploy-to-prod-main.yaml b/.github/workflows/upload-and-deploy-to-prod-main.yaml index 929940b8..e8cfdf39 100644 --- a/.github/workflows/upload-and-deploy-to-prod-main.yaml +++ b/.github/workflows/upload-and-deploy-to-prod-main.yaml @@ -41,7 +41,12 @@ jobs: python_version: ${{ env.PYTHON_VERSION }} - name: Copy files to templates bucket - run: python src/scripts/manage_artifacts/artifacts.py --upload --namespace $NAMESPACE --cfn_bucket ${{ vars.CFN_BUCKET }} + run: > + python src/scripts/manage_artifacts/artifacts.py + --upload + --namespace $NAMESPACE + --cfn_bucket ${{ vars.CFN_BUCKET }} + --shareable-artifacts-bucket ${{ vars.SHAREABLE_ARTIFACTS_BUCKET }} sceptre-deploy-main: diff --git a/.github/workflows/upload-and-deploy.yaml b/.github/workflows/upload-and-deploy.yaml index 5420909c..ea8b8773 100755 --- a/.github/workflows/upload-and-deploy.yaml +++ b/.github/workflows/upload-and-deploy.yaml @@ -115,6 +115,7 @@ jobs: --upload --namespace $NAMESPACE --cfn_bucket ${{ vars.CFN_BUCKET }} + --shareable-artifacts-bucket ${{ vars.SHAREABLE_ARTIFACTS_BUCKET }} nonglue-unit-tests: name: Runs unit tests that are not dependent on aws-glue package resources @@ -437,6 +438,7 @@ jobs: --upload --namespace staging --cfn_bucket ${{ vars.CFN_BUCKET }} + --shareable-artifacts-bucket ${{ vars.SHAREABLE_ARTIFACTS_BUCKET }} - name: Create directory for remote sceptre templates run: mkdir -p templates/remote/ diff --git a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml index 1e6d461e..a99b0130 100644 --- a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml +++ b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -7,12 +7,15 @@ parameters: Namespace: {{ stack_group_config.namespace }} JobDescription: Runs great expectations on a set of data JobRole: !stack_output_external glue-job-role::RoleArn - TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + ParquetBucket: {{ stack_group_config.processed_data_bucket_name }} + ShareableArtifactsBucket: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} S3ScriptBucket: {{ stack_group_config.template_bucket_name }} S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" AdditionalPythonModules: 
"great_expectations~=0.18,urllib3<2" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/config/develop/namespaced/glue-workflow.yaml b/config/develop/namespaced/glue-workflow.yaml index 6861a72e..1f51a5bd 100644 --- a/config/develop/namespaced/glue-workflow.yaml +++ b/config/develop/namespaced/glue-workflow.yaml @@ -20,8 +20,6 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} - ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} - ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml index f0e8dd2a..d91ff9ae 100644 --- a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml +++ b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -7,12 +7,15 @@ parameters: Namespace: {{ stack_group_config.namespace }} JobDescription: Runs great expectations on a set of data JobRole: !stack_output_external glue-job-role::RoleArn - TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + ParquetBucket: {{ stack_group_config.processed_data_bucket_name }} + ShareableArtifactsBucket: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} S3ScriptBucket: {{ stack_group_config.template_bucket_name }} S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" AdditionalPythonModules: "great_expectations~=0.18,urllib3<2" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/config/prod/namespaced/glue-workflow.yaml b/config/prod/namespaced/glue-workflow.yaml index 3223adb9..ca20b3f0 100644 --- a/config/prod/namespaced/glue-workflow.yaml +++ b/config/prod/namespaced/glue-workflow.yaml @@ -20,8 +20,6 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} - ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} - ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py index 68c0e990..da6b0032 100644 --- a/src/glue/jobs/run_great_expectations_on_parquet.py +++ b/src/glue/jobs/run_great_expectations_on_parquet.py @@ -1,5 +1,7 @@ import json import logging +import os +import subprocess import sys from datetime import datetime from typing import Dict @@ -39,6 +41,7 @@ def read_args() -> dict: "namespace", "data-type", 
"expectation-suite-key", + "gx-resources-key-prefix", ], ) for arg in args: @@ -61,145 +64,51 @@ def validate_args(value: str) -> None: return None -def create_context( - s3_bucket: str, namespace: str, key_prefix: str -) -> "EphemeralDataContext": - """Creates the data context and adds stores, - datasource and data docs configurations - - Args: - s3_bucket (str): name of s3 bucket to store to - namespace (str): namespace - key_prefix (str): s3 key prefix - - Returns: - EphemeralDataContext: context object with all - configurations - """ - context = gx.get_context() - add_datasource(context) - add_validation_stores(context, s3_bucket, namespace, key_prefix) - add_data_docs_sites(context, s3_bucket, namespace, key_prefix) - return context - - -def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext": - """Adds the spark datasource - - Args: - context (EphemeralDataContext): data context to add to - - Returns: - EphemeralDataContext: data context object with datasource configuration - added - """ - yaml = YAMLHandler() - context.add_datasource( - **yaml.load( - """ - name: spark_datasource - class_name: Datasource - execution_engine: - class_name: SparkDFExecutionEngine - force_reuse_spark_context: true - data_connectors: - runtime_data_connector: - class_name: RuntimeDataConnector - batch_identifiers: - - batch_identifier - """ - ) - ) - return context - - -def add_validation_stores( - context: "EphemeralDataContext", +def update_data_docs_sites( + context: gx.data_context.AbstractDataContext, s3_bucket: str, namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": - """Adds the validation store configurations to the context object - - Args: - context (EphemeralDataContext): data context to add to - s3_bucket (str): name of the s3 bucket to save validation results to - namespace (str): name of the namespace - key_prefix (str): s3 key prefix to save the - validation results to - - Returns: - EphemeralDataContext: data context object with validation stores' - configuration added +) -> gx.data_context.AbstractDataContext: """ - # Programmatically configure the validation result store and - # DataDocs to use S3 - context.add_store( - "validation_result_store", - { - "class_name": "ValidationsStore", - "store_backend": { - "class_name": "TupleS3StoreBackend", - "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", - }, - }, - ) - return context - - -def add_data_docs_sites( - context: "EphemeralDataContext", - s3_bucket: str, - namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": - """Adds the data docs sites configuration to the context object - so data docs can be saved to a s3 location. This is a special - workaround to add the data docs because we're using EphemeralDataContext - context objects and they don't store to memory. 
+ Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace Args: - context (EphemeralDataContext): data context to add to - s3_bucket (str): name of the s3 bucket to save gx docs to - namespace (str): name of the namespace - key_prefix (str): s3 key prefix to save the - gx docs to + context (gx.data_context.AbstractDataContext): The GX data context to update + s3_bucket (str): The S3 bucket where data docs are written + namespace (str): The current namespace Returns: - EphemeralDataContext: data context object with data docs sites' - configuration added + gx.data_context.AbstractDataContext: The updated GX data context object """ - data_context_config = DataContextConfig() - data_context_config["data_docs_sites"] = { - "s3_site": { + context.update_data_docs_site( + site_name="s3_site", + site_config={ "class_name": "SiteBuilder", "store_backend": { "class_name": "TupleS3StoreBackend", "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", + "prefix": f"{namespace}/great_expectation_reports/parquet/", }, "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"}, - } - } - context._project_config["data_docs_sites"] = data_context_config["data_docs_sites"] + }, + ) return context def get_spark_df( glue_context: GlueContext, parquet_bucket: str, namespace: str, data_type: str ) -> "pyspark.sql.dataframe.DataFrame": - """Reads in the parquet dataset as a Dynamic Frame and converts it - to a spark dataframe + """ + Read a data-type-specific Parquet dataset Args: - glue_context (GlueContext): the aws glue context object - parquet_bucket (str): the name of the bucket holding parquet files - namespace (str): the namespace - data_type (str): the data type name + glue_context (GlueContext): The AWS Glue context object + parquet_bucket (str): The S3 bucket containing the data-type-specific Parquet dataset + namespace (str): The associated namespace + data_type (str): The associated data type Returns: - pyspark.sql.dataframe.DataFrame: spark dataframe of the read in parquet dataset + pyspark.sql.dataframe.DataFrame: A Spark dataframe over our data-type-specific Parquet dataset """ s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{data_type}/" dynamic_frame = glue_context.create_dynamic_frame_from_options( @@ -212,29 +121,24 @@ def get_spark_df( def get_batch_request( + gx_context: gx.data_context.AbstractDataContext, spark_dataset: "pyspark.sql.dataframe.DataFrame", data_type: str, - run_id: RunIdentifier, -) -> RuntimeBatchRequest: - """Retrieves the unique metadata for this batch request +) -> gx.datasource.fluent.batch_request.BatchRequest: + """ + Get a GX batch request over a Spark dataframe Args: - spark_dataset (pyspark.sql.dataframe.DataFrame): parquet dataset as spark df - data_type (str): data type name - run_id (RunIdentifier): contains the run name and - run time metadata of this batch run + spark_dataset (pyspark.sql.dataframe.DataFrame): A Spark dataframe + data_type (str): The data type Returns: - RuntimeBatchRequest: contains metadata for the batch run request - to identify this great expectations run + BatchRequest: A batch request which can be used in conjunction + with an expectation suite to validate our data. 
""" - batch_request = RuntimeBatchRequest( - datasource_name="spark_datasource", - data_connector_name="runtime_data_connector", - data_asset_name=f"{data_type}-parquet-data-asset", - runtime_parameters={"batch_data": spark_dataset}, - batch_identifiers={"batch_identifier": f"{data_type}_{run_id.run_name}_batch"}, - ) + data_source = gx_context.sources.add_or_update_spark(name="parquet") + data_asset = data_source.add_dataframe_asset(name=f"{data_type}_spark_dataframe") + batch_request = data_asset.build_batch_request(dataframe=spark_dataset) return batch_request @@ -243,13 +147,13 @@ def read_json( s3_bucket: str, key: str, ) -> Dict[str, str]: - """Reads in a json object + """ + Read a JSON file from an S3 bucket Args: - s3 (boto3.client): s3 client connection - s3_bucket (str): name of the s3 bucket to read from - key (str): s3 key prefix of the - location of the json to read from + s3 (boto3.client): An S3 client + s3_bucket (str): The S3 bucket containing the JSON file + key (str): The S3 key of the JSON file Returns: Dict[str, str]: the data read in from json @@ -263,112 +167,73 @@ def read_json( def add_expectations_from_json( expectations_data: Dict[str, str], - context: "EphemeralDataContext", - data_type: str, -) -> "EphemeralDataContext": - """Adds in the read in expectations to the context object - - Args: - expectations_data (Dict[str, str]): expectations - context (EphemeralDataContext): context object - data_type (str): name of the data type - - Raises: - ValueError: thrown when no expectations exist for this data type - - Returns: - EphemeralDataContext: context object with expectations added + context: gx.data_context.AbstractDataContext, +) -> gx.data_context.AbstractDataContext: """ - # Ensure the data type exists in the JSON file - if data_type not in expectations_data: - raise ValueError(f"No expectations found for data type '{data_type}'") - - # Extract the expectation suite and expectations for the dataset - suite_data = expectations_data[data_type] - expectation_suite_name = suite_data["expectation_suite_name"] - new_expectations = suite_data["expectations"] - - # Convert new expectations from JSON format to ExpectationConfiguration objects - new_expectations_configs = [ - ExpectationConfiguration( - expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] - ) - for exp in new_expectations - ] - - # Update the expectation suite in the data context - context.add_or_update_expectation_suite( - expectation_suite_name=expectation_suite_name, - expectations=new_expectations_configs, - ) - return context - - -def add_validation_results_to_store( - context: "EphemeralDataContext", - expectation_suite_name: str, - validation_result: Dict[str, str], - batch_identifier: RuntimeBatchRequest, - run_identifier: RunIdentifier, -) -> "EphemeralDataContext": - """Adds the validation results manually to the validation store. - This is a workaround for a EphemeralDataContext context object, - and for us to avoid complicating our folder structure to include - checkpoints/other more persistent data context object types - until we need that feature + Add an expectation suite with expectations to our GX data context for each data type. 
Args: - context (EphemeralDataContext): context object to add results to - expectation_suite_name (str): name of expectation suite - validation_result (Dict[str, str]): results outputted by gx - validator to be stored - batch_identifier (RuntimeBatchRequest): metadata containing details of - the batch request - run_identifier (RunIdentifier): metadata containing details of the gx run + expectations_data (Dict[str, str]): A mapping of data types to their expectations. + The expectations should be formatted like so: + + { + "expectation_suite_name": "string", + "expectations": { + "expectation_type": "str", + "kwargs": "readable by `ExpectationConfiguration`" + } + } + context (gx.data_context.AbstractDataContext): context object Returns: - EphemeralDataContext: context object with validation results added to + gx.data_context.AbstractDataContext: A GX data context object with expectation suites added """ - expectation_suite = context.get_expectation_suite(expectation_suite_name) - # Create an ExpectationSuiteIdentifier - expectation_suite_identifier = ExpectationSuiteIdentifier( - expectation_suite_name=expectation_suite.expectation_suite_name - ) - - # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier - validation_result_identifier = ValidationResultIdentifier( - expectation_suite_identifier=expectation_suite_identifier, - batch_identifier=batch_identifier, - run_id=run_identifier, - ) - - context.validations_store.set(validation_result_identifier, validation_result) + for data_type in expectations_data: + suite_data = expectations_data[data_type] + expectation_suite_name = suite_data["expectation_suite_name"] + new_expectations = suite_data["expectations"] + + # Convert new expectations from dict to ExpectationConfiguration objects + new_expectations_configs = [ + ExpectationConfiguration( + expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + ) + for exp in new_expectations + ] + + # Update the expectation suite in the data context + context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, + expectations=new_expectations_configs, + ) return context def main(): args = read_args() + s3 = boto3.client("s3") + # Download GX stores and configuration + subprocess.run( + args=[ + "aws", + "s3", + "sync", + f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}", + ".", + ], + check=True, + ) run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}") expectation_suite_name = f"{args['data_type']}_expectations" - s3 = boto3.client("s3") - context = create_context( + + # Set up Great Expectations + gx_context = gx.get_context() + logger.info("update_data_docs_site") + gx_context = update_data_docs_sites( + context=gx_context, s3_bucket=args["shareable_artifacts_bucket"], namespace=args["namespace"], - key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/", - ) - glue_context = GlueContext(SparkContext.getOrCreate()) - logger.info("get_spark_df") - spark_df = get_spark_df( - glue_context=glue_context, - parquet_bucket=args["parquet_bucket"], - namespace=args["namespace"], - data_type=args["data_type"], ) - logger.info("get_batch_request") - batch_request = get_batch_request(spark_df, args["data_type"], run_id) - logger.info("add_expectations") - - # Load the JSON file with the expectations logger.info("reads_expectations_from_json") expectations_data = read_json( s3=s3, @@ -376,32 +241,37 @@ def main(): key=args["expectation_suite_key"], 
) logger.info("adds_expectations_from_json") - add_expectations_from_json( + gx_context = add_expectations_from_json( expectations_data=expectations_data, - context=context, - data_type=args["data_type"], - ) - logger.info("get_validator") - validator = context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, + context=gx_context, ) - logger.info("validator.validate") - validation_result = validator.validate() - logger.info("validation_result: %s", validation_result) + # Set up Spark + glue_context = GlueContext(SparkContext.getOrCreate()) + logger.info("get_spark_df") + spark_df = get_spark_df( + glue_context=glue_context, + parquet_bucket=args["parquet_bucket"], + namespace=args["namespace"], + data_type=args["data_type"], + ) - add_validation_results_to_store( - context, - expectation_suite_name, - validation_result, - batch_identifier=batch_request["batch_identifiers"]["batch_identifier"], - run_identifier=run_id, + # Put the two together and validate the GX expectations + logger.info("get_batch_request") + batch_request = get_batch_request( + gx_context=gx_context, spark_dataset=spark_df, data_type=args["data_type"] ) - context.build_data_docs( - site_names=["s3_site"], + logger.info("add_or_update_checkpoint") + # The default checkpoint action list is: + # StoreValidationResultAction, StoreEvaluationParametersAction, UpdateDataDocsAction + checkpoint = gx_context.add_or_update_checkpoint( + name=f"{args['data_type']}-checkpoint", + expectation_suite_name=expectation_suite_name, + batch_request=batch_request, ) - logger.info("data docs saved!") + logger.info("run checkpoint") + checkpoint_result = checkpoint.run(run_id=run_id) + logger.info("data docs updated!") if __name__ == "__main__": diff --git a/src/glue/resources/great_expectations.yml b/src/glue/resources/great_expectations.yml new file mode 100644 index 00000000..9ce83cdc --- /dev/null +++ b/src/glue/resources/great_expectations.yml @@ -0,0 +1,58 @@ +config_version: 3.0 +stores: + expectations_store: + class_name: ExpectationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: expectations/ + validations_store: + class_name: ValidationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: uncommitted/validations/ + evaluation_parameter_store: + class_name: EvaluationParameterStore + checkpoint_store: + class_name: CheckpointStore + store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: checkpoints/ + profiler_store: + class_name: ProfilerStore + store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: profilers/ +expectations_store_name: expectations_store +validations_store_name: validations_store +evaluation_parameter_store_name: evaluation_parameter_store +checkpoint_store_name: checkpoint_store +datasources: + spark_datasource: + class_name: Datasource + execution_engine: + class_name: SparkDFExecutionEngine + force_reuse_spark_context: true + data_connectors: + runtime_data_connector: + class_name: RuntimeDataConnector + batch_identifiers: + - batch_identifier +fluent_datasources: + parquet: + type: spark + assets: {} +data_docs_sites: + s3_site: + class_name: SiteBuilder + store_backend: + class_name: TupleS3StoreBackend + bucket: recover-shareable-artifacts-vpn + prefix: main/great_expectation_reports/parquet + site_index_builder: + class_name: DefaultSiteIndexBuilder +include_rendered_content: + globally: 
false + expectation_suite: false + expectation_validation_result: false diff --git a/src/scripts/manage_artifacts/artifacts.py b/src/scripts/manage_artifacts/artifacts.py index cef93993..b6167080 100755 --- a/src/scripts/manage_artifacts/artifacts.py +++ b/src/scripts/manage_artifacts/artifacts.py @@ -10,6 +10,7 @@ def read_args(): parser = argparse.ArgumentParser(description="") parser.add_argument("--namespace") parser.add_argument("--cfn_bucket", required=True) + parser.add_argument("--shareable-artifacts-bucket", required=True) group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--upload", action="store_true") group.add_argument("--remove", action="store_true") @@ -18,9 +19,9 @@ def read_args(): return args -def execute_command(cmd: str): +def execute_command(cmd: list[str]): print(f'Invoking command: {" ".join(cmd)}') - subprocess.run(cmd) + subprocess.run(cmd, check=True) def upload(namespace: str, cfn_bucket: str): @@ -46,6 +47,35 @@ def upload(namespace: str, cfn_bucket: str): execute_command(cmd) +def sync(namespace: str, shareable_artifacts_bucket: str): + """Sync resources which are not version controlled to this namespace. + + In some cases, we do not want to version control some data (like Great Expectations artifacts) + but we need to duplicate this data from the main namespace to a development namespace. + + Args: + namespace (str): The development namespace + shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts + """ + # Copy Great Expectations artifacts to this namespace + source_gx_artifacts = os.path.join( + "s3://", shareable_artifacts_bucket, "main/great_expectation_resources/" + ) + target_gx_artifacts = os.path.join( + "s3://", shareable_artifacts_bucket, namespace, "great_expectation_resources/" + ) + gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts] + execute_command(gx_artifacts_clean_up_cmd) + gx_artifacts_sync_cmd = [ + "aws", + "s3", + "sync", + source_gx_artifacts, + target_gx_artifacts, + ] + execute_command(gx_artifacts_sync_cmd) + + def delete(namespace: str, cfn_bucket: str): """Removes all files recursively for namespace""" s3_path = os.path.join("s3://", cfn_bucket, namespace) @@ -63,6 +93,11 @@ def list_namespaces(cfn_bucket: str): def main(args): if args.upload: upload(args.namespace, args.cfn_bucket) + if args.namespace != "main": + sync( + namespace=args.namespace, + shareable_artifacts_bucket=args.shareable_artifacts_bucket, + ) elif args.remove: delete(args.namespace, args.cfn_bucket) else: diff --git a/templates/glue-job-run-great-expectations-on-parquet.j2 b/templates/glue-job-run-great-expectations-on-parquet.j2 index 455b5e89..a6c4046f 100644 --- a/templates/glue-job-run-great-expectations-on-parquet.j2 +++ b/templates/glue-job-run-great-expectations-on-parquet.j2 @@ -19,9 +19,14 @@ Parameters: Type: String Description: The name or ARN of the IAM role that will run this job. - TempS3Bucket: + ParquetBucket: Type: String - Description: The name of the S3 bucket where temporary files and logs are written. + Description: The name of the S3 bucket where Parquet data is written. This is + also where we will write temporary files and logs. + + ShareableArtifactsBucket: + Type: String + Description: The name of the bucket where shareable artifacts are stored. S3ScriptBucket: Type: String @@ -48,6 +53,10 @@ Parameters: FitbitIntradayCombined and HealthKitV2Samples. 
Default: 1 + ExpectationSuiteKey: + Type: String + Description: The s3 key prefix of the expectation suite. + MaxRetries: Type: Number Description: How many times to retry the job if it fails (integer). @@ -71,12 +80,11 @@ Resources: {% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %} {% set dataset = {} %} {% do dataset.update({"type": v}) %} - {% do dataset.update({"table_name": "dataset_" + v.lower()})%} {% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %} {% do datasets.append(dataset) %} {% endfor %} - {% for dataset in datasets %} + {% for dataset in datasets if dataset["type"].lower() in sceptre_user_data.data_values_expectations %} {{ dataset["stackname_prefix"] }}GreatExpectationsParquetJob: Type: AWS::Glue::Job Properties: @@ -84,15 +92,21 @@ Resources: Name: glueetl ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey} DefaultArguments: - --TempDir: !Sub s3://${TempS3Bucket}/tmp + --TempDir: !Sub s3://${ParquetBucket}/tmp --enable-continuous-cloudwatch-log: true --enable-metrics: true --enable-spark-ui: true - --spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/ + --spark-event-logs-path: !Sub s3://${ParquetBucket}/spark-logs/${AWS::StackName}/ --job-bookmark-option: job-bookmark-disable --job-language: python --additional-python-modules: !Ref AdditionalPythonModules - # --conf spark.sql.adaptive.enabled + --data-type: {{ dataset["type"].lower() }} + --namespace: !Ref Namespace + --cfn-bucket: !Ref S3ScriptBucket + --parquet-bucket: !Ref ParquetBucket + --shareable-artifacts-bucket: !Ref ShareableArtifactsBucket + --expectation-suite-key: !Ref ExpectationSuiteKey + --gx-resources-key-prefix: !Sub "${Namespace}/great_expectation_resources/parquet/" Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}" GlueVersion: !Ref GlueVersion MaxRetries: !Ref MaxRetries diff --git a/templates/glue-workflow.j2 b/templates/glue-workflow.j2 index 16d8950d..5483368e 100644 --- a/templates/glue-workflow.j2 +++ b/templates/glue-workflow.j2 @@ -40,7 +40,7 @@ Parameters: ParquetKeyPrefix: Type: String - Description: S3 key prefix where JSON datasets are stored. + Description: S3 key prefix where Parquet datasets are stored. Default: parquet GlueDatabase: @@ -82,13 +82,6 @@ Parameters: Description: >- The name of the bucket where the cloudformation and artifacts are stored. - ShareableArtifactsBucketName: - Type: String - Description: The name of the bucket where shareable artifacts are stored. - - ExpectationSuiteKey: - Type: String - Description: The s3 key prefix of the expectation suite. 
Conditions: IsMainNamespace: !Equals [!Ref Namespace, "main"] @@ -316,14 +309,6 @@ Resources: Name: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger" Actions: - JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob - Arguments: - "--data-type": {{ dataset["data_type"].lower() }} - "--namespace": !Ref Namespace - "--cfn-bucket": !Ref CloudformationBucketName - "--parquet-bucket": !Ref ParquetBucketName - "--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName - "--expectation-suite-key": !Ref ExpectationSuiteKey - "--additional-python-modules": "great_expectations~=0.18,urllib3<2" Description: This trigger runs the great expectation parquet job for this data type after completion of the JSON to Parquet job for this data type Type: CONDITIONAL Predicate: diff --git a/tests/test_run_great_expectations_on_parquet.py b/tests/test_run_great_expectations_on_parquet.py index 074e1407..fbf078ec 100644 --- a/tests/test_run_great_expectations_on_parquet.py +++ b/tests/test_run_great_expectations_on_parquet.py @@ -1,152 +1,30 @@ -from unittest.mock import MagicMock, patch +import unittest import great_expectations as gx +import pyspark import pytest -from great_expectations.core.batch import RuntimeBatchRequest -from great_expectations.core.run_identifier import RunIdentifier -from great_expectations.core.yaml_handler import YAMLHandler -from great_expectations.data_context.types.resource_identifiers import ( - ExpectationSuiteIdentifier, - ValidationResultIdentifier, -) -from pyspark.sql import SparkSession from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq @pytest.fixture -def test_context(scope="function"): +def gx_context(scope="function"): context = gx.get_context() yield context @pytest.fixture(scope="function") -def test_spark(): - yield SparkSession.builder.appName("BatchRequestTest").getOrCreate() - - -def test_create_context(): - with ( - patch.object(gx, "get_context") as mock_get_context, - patch.object(run_gx_on_pq, "add_datasource") as mock_add_datasource, - patch.object( - run_gx_on_pq, "add_validation_stores" - ) as mock_add_validation_stores, - patch.object(run_gx_on_pq, "add_data_docs_sites") as mock_add_data_docs_sites, - ): - mock_context = MagicMock() - mock_get_context.return_value = mock_context - - s3_bucket = "test-bucket" - namespace = "test-namespace" - key_prefix = "test-prefix" - - # Call the function - result_context = run_gx_on_pq.create_context(s3_bucket, namespace, key_prefix) - - # Assert that the context returned is the mock context - assert result_context == mock_context - - # Assert that the other functions were called - mock_add_datasource.assert_called_once_with(mock_context) - mock_add_validation_stores.assert_called_once_with( - mock_context, s3_bucket, namespace, key_prefix - ) - mock_add_data_docs_sites.assert_called_once_with( - mock_context, s3_bucket, namespace, key_prefix - ) - - -def test_that_add_datasource_calls_correctly(): - mock_context = MagicMock() - result_context = run_gx_on_pq.add_datasource(mock_context) - - # Verify that the datasource was added - mock_context.add_datasource.assert_called_once() - assert result_context == mock_context - - -@pytest.mark.integration -def test_that_add_datasource_adds_correctly(test_context): - # Assuming you've already added a datasource, you can list it - run_gx_on_pq.add_datasource(test_context) - datasources = test_context.list_datasources() - - # Define the expected datasource name - 
expected_datasource_name = "spark_datasource" - - # Check that the expected datasource is present and other details are correct - assert any( - ds["name"] == expected_datasource_name for ds in datasources - ), f"Datasource '{expected_datasource_name}' was not added correctly." - datasource = next( - ds for ds in datasources if ds["name"] == expected_datasource_name - ) - assert datasource["class_name"] == "Datasource" - assert "SparkDFExecutionEngine" in datasource["execution_engine"]["class_name"] - - -def test_add_validation_stores_has_expected_calls(): - mock_context = MagicMock() - s3_bucket = "test-bucket" - namespace = "test-namespace" - key_prefix = "test-prefix" - - with patch.object(mock_context, "add_store") as mock_add_store: - # Call the function - result_context = run_gx_on_pq.add_validation_stores( - mock_context, s3_bucket, namespace, key_prefix - ) - - # Verify that the validation store is added - mock_add_store.assert_called_once_with( - "validation_result_store", - { - "class_name": "ValidationsStore", - "store_backend": { - "class_name": "TupleS3StoreBackend", - "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", - }, - }, - ) - - assert result_context == mock_context - - -@pytest.mark.integration -def test_validation_store_details(test_context): - # Mock context and stores - run_gx_on_pq.add_validation_stores( - test_context, - s3_bucket="test-bucket", - namespace="test", - key_prefix="test_folder/", - ) - - # Run the test logic - stores = test_context.list_stores() - expected_store_name = "validation_result_store" - - assert any(store["name"] == expected_store_name for store in stores) - # pulls the store we want - store_config = [store for store in stores if store["name"] == expected_store_name][ - 0 - ] - - assert store_config["class_name"] == "ValidationsStore" - assert store_config["store_backend"]["class_name"] == "TupleS3StoreBackend" - assert store_config["store_backend"]["bucket"] == "test-bucket" - assert store_config["store_backend"]["prefix"] == "test/test_folder/" +def spark_session(): + yield pyspark.sql.SparkSession.builder.appName("BatchRequestTest").getOrCreate() def test_get_spark_df_has_expected_calls(): - glue_context = MagicMock() - mock_dynamic_frame = MagicMock() - mock_spark_df = MagicMock() + glue_context = unittest.mock.MagicMock() + mock_dynamic_frame = unittest.mock.MagicMock() + mock_spark_df = unittest.mock.MagicMock() mock_dynamic_frame.toDF.return_value = mock_spark_df - with patch.object( + with unittest.mock.patch.object( glue_context, "create_dynamic_frame_from_options" ) as mock_create_dynamic_frame: mock_create_dynamic_frame.return_value = mock_dynamic_frame @@ -171,46 +49,13 @@ def test_get_spark_df_has_expected_calls(): assert result_df == mock_spark_df -def test_get_batch_request(): - spark_dataset = MagicMock() +def test_get_batch_request(gx_context): + spark_dataset = unittest.mock.MagicMock() data_type = "test-data" - run_id = RunIdentifier(run_name="2023_09_04") - - batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id) - - # Verify the RuntimeBatchRequest is correctly set up - assert isinstance(batch_request, RuntimeBatchRequest) - assert batch_request.data_asset_name == f"{data_type}-parquet-data-asset" - assert batch_request.batch_identifiers == { - "batch_identifier": f"{data_type}_{run_id.run_name}_batch" - } - assert batch_request.runtime_parameters == {"batch_data": spark_dataset} - - -@pytest.mark.integration -def test_that_get_batch_request_details_are_correct(test_spark): - # 
Create a simple PySpark DataFrame to simulate the dataset - data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)] - columns = ["name", "age"] - spark_dataset = test_spark.createDataFrame(data, columns) - - # Create a RunIdentifier - run_id = RunIdentifier(run_name="test_run_2023") - - # Call the function and get the RuntimeBatchRequest - data_type = "user_data" - batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id) - - # Assertions to check that the batch request is properly populated - assert isinstance(batch_request, RuntimeBatchRequest) - assert batch_request.datasource_name == "spark_datasource" - assert batch_request.data_connector_name == "runtime_data_connector" - assert batch_request.data_asset_name == "user_data-parquet-data-asset" - assert ( - batch_request.batch_identifiers["batch_identifier"] - == "user_data_test_run_2023_batch" + batch_request = run_gx_on_pq.get_batch_request( + gx_context=gx_context, spark_dataset=spark_dataset, data_type=data_type ) - assert batch_request.runtime_parameters["batch_data"] == spark_dataset + assert isinstance(batch_request, gx.datasource.fluent.batch_request.BatchRequest) def test_read_json_correctly_returns_expected_values(): @@ -218,13 +63,13 @@ def test_read_json_correctly_returns_expected_values(): key = "test-key" # Mock the S3 response - mock_s3_response = MagicMock() + mock_s3_response = unittest.mock.MagicMock() mock_s3_response["Body"].read.return_value = '{"test_key": "test_value"}'.encode( "utf-8" ) # Use patch to mock the boto3 s3 client - with patch("boto3.client") as mock_s3_client: + with unittest.mock.patch("boto3.client") as mock_s3_client: # Mock get_object method mock_s3_client.return_value.get_object.return_value = mock_s3_response @@ -241,7 +86,7 @@ def test_read_json_correctly_returns_expected_values(): def test_that_add_expectations_from_json_has_expected_call(): - mock_context = MagicMock() + mock_context = unittest.mock.MagicMock() # Sample expectations data expectations_data = { @@ -259,39 +104,16 @@ def test_that_add_expectations_from_json_has_expected_call(): data_type = "test-data" # Call the function - run_gx_on_pq.add_expectations_from_json(expectations_data, mock_context, data_type) + run_gx_on_pq.add_expectations_from_json( + expectations_data=expectations_data, context=mock_context + ) # Verify expectations were added to the context mock_context.add_or_update_expectation_suite.assert_called_once() -def test_that_add_expectations_from_json_throws_value_error(): - mock_context = MagicMock() - - # Sample expectations data - expectations_data = { - "not-test-data": { - "expectation_suite_name": "test_suite", - "expectations": [ - { - "expectation_type": "expect_column_to_exist", - "kwargs": {"column": "test_column"}, - }, - ], - } - } - - data_type = "test-data" - with pytest.raises( - ValueError, match="No expectations found for data type 'test-data'" - ): - run_gx_on_pq.add_expectations_from_json( - expectations_data, mock_context, data_type - ) - - @pytest.mark.integration -def test_add_expectations_from_json_adds_details_correctly(test_context): +def test_add_expectations_from_json_adds_details_correctly(gx_context): # Mock expectations data expectations_data = { "user_data": { @@ -309,15 +131,13 @@ def test_add_expectations_from_json_adds_details_correctly(test_context): } } - data_type = "user_data" - # Call the function to add expectations - test_context = run_gx_on_pq.add_expectations_from_json( - expectations_data, test_context, data_type + 
run_gx_on_pq.add_expectations_from_json( + expectations_data=expectations_data, context=gx_context ) # Retrieve the expectation suite to verify that expectations were added - expectation_suite = test_context.get_expectation_suite("user_data_suite") + expectation_suite = gx_context.get_expectation_suite("user_data_suite") assert expectation_suite.expectation_suite_name == "user_data_suite" assert len(expectation_suite.expectations) == 2 @@ -335,47 +155,3 @@ def test_add_expectations_from_json_adds_details_correctly(test_context): "min_value": 18, "max_value": 65, } - - -def test_that_add_validation_results_to_store_has_expected_calls(): - # Mock the EphemeralDataContext and the necessary components - mock_context = MagicMock() - mock_expectation_suite = MagicMock() - mock_context.get_expectation_suite.return_value = mock_expectation_suite - mock_expectation_suite.expectation_suite_name = "test_suite" - - # Mock the validation result data - validation_result = {"result": "test_result"} - - # Create a mock batch identifier and run identifier - mock_batch_identifier = MagicMock(spec=RuntimeBatchRequest) - mock_run_identifier = MagicMock(spec=RunIdentifier) - - # Call the function with mocked inputs - result_context = run_gx_on_pq.add_validation_results_to_store( - context=mock_context, - expectation_suite_name="test_suite", - validation_result=validation_result, - batch_identifier=mock_batch_identifier, - run_identifier=mock_run_identifier, - ) - - # Assert that the expectation suite was retrieved correctly - mock_context.get_expectation_suite.assert_called_once_with("test_suite") - - expected_expectation_suite_identifier = ExpectationSuiteIdentifier( - expectation_suite_name="test_suite" - ) - expected_validation_result_identifier = ValidationResultIdentifier( - expectation_suite_identifier=expected_expectation_suite_identifier, - batch_identifier=mock_batch_identifier, - run_id=mock_run_identifier, - ) - - # Verify that the validation result was added to the validations store - mock_context.validations_store.set.assert_called_once_with( - expected_validation_result_identifier, validation_result - ) - - # Check that the context is returned - assert result_context == mock_context
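
Illustrative sketch of the reworked Great Expectations flow introduced above. This is a minimal local approximation, assuming great_expectations~=0.18 (fluent API) and a local Spark session; the suite name, expectation, and toy dataframe are hypothetical stand-ins for the Parquet dataset and data_values_expectations.json, and on Glue the job additionally syncs the shareable-artifacts prefix so gx.get_context() picks up the committed great_expectations.yml stores and S3 data docs site:

    import great_expectations as gx
    import pyspark.sql
    from great_expectations.core.expectation_configuration import ExpectationConfiguration

    # Stand-in for the data-type-specific Parquet dataset read by get_spark_df().
    spark = pyspark.sql.SparkSession.builder.appName("gx-sketch").getOrCreate()
    df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])

    # Ephemeral context for local experimentation; the deployed job relies on the
    # synced great_expectations.yml for its stores and data docs configuration.
    context = gx.get_context()

    # Mirrors add_expectations_from_json(): one suite per data type, built from
    # ExpectationConfiguration objects. Names here are illustrative only.
    suite_name = "user_data_suite"
    context.add_or_update_expectation_suite(
        expectation_suite_name=suite_name,
        expectations=[
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "name"},
            )
        ],
    )

    # Mirrors get_batch_request(): fluent Spark datasource + dataframe asset.
    datasource = context.sources.add_or_update_spark(name="parquet")
    asset = datasource.add_dataframe_asset(name="user_data_spark_dataframe")
    batch_request = asset.build_batch_request(dataframe=df)

    # Mirrors main(): the checkpoint's default action list stores validation
    # results and updates the configured data docs site (S3 in the deployed job).
    checkpoint = context.add_or_update_checkpoint(
        name="user_data-checkpoint",
        expectation_suite_name=suite_name,
        batch_request=batch_request,
    )
    result = checkpoint.run()
    print(result.success)

Design note: moving from the hand-built EphemeralDataContext stores to a synced great_expectations.yml plus a checkpoint means validation-result storage and data docs publication are handled by the checkpoint's default actions rather than by the custom add_validation_results_to_store/build_data_docs code this patch removes.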