Skip to content

Commit

Permalink
get GX config template from S3
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Oct 29, 2024
1 parent 9b8abbf commit 4648031
Showing 1 changed file with 57 additions and 52 deletions.
109 changes: 57 additions & 52 deletions src/glue/jobs/run_great_expectations_on_parquet.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
import json
import logging
import os
import subprocess
import sys
from datetime import datetime
from typing import Dict

import boto3
import great_expectations as gx
import pyspark
import yaml
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from pyspark.context import SparkContext


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -64,35 +56,47 @@ def validate_args(value: str) -> None:
return None


def update_data_docs_sites(
context: gx.data_context.AbstractDataContext,
s3_bucket: str,
def configure_gx_config(
gx_config_bucket: str,
gx_config_key: str,
shareable_artifacts_bucket: str,
namespace: str,
) -> gx.data_context.AbstractDataContext:
"""
Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace
) -> None:
"""Download and configure a `great_expectations.yml` file locally
This function will download a `great_expectations.yml` file from S3 to a `gx` directory.
This file will be automatically be used to configue the GX data context when calling
`gx.get_context()`.
Args:
context (gx.data_context.AbstractDataContext): The GX data context to update
s3_bucket (str): The S3 bucket where data docs are written
gx_config_bucket (str): S3 bucket containing the `great_expectations.yml` file.
gx_config_key (str): S3 key where this file is located.
shareable_artifacts_bucket (str): S3 bucket where shareable artifacts are written.
namespace (str): The current namespace
Returns:
gx.data_context.AbstractDataContext: The updated GX data context object
"""
context.update_data_docs_site(
site_name="s3_site",
site_config={
"class_name": "SiteBuilder",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/great_expectation_reports/parquet/",
},
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
},
gx_config_path = "gx/great_expectations.yml"
os.makedirs("gx", exist_ok=True)
s3_client = boto3.client("s3")
logger.info(
f"Downloading s3://{gx_config_bucket}/{gx_config_key} to {gx_config_path}"
)
return context
s3_client.download_file(
Bucket=gx_config_bucket, Key=gx_config_key, Filename=gx_config_path
)
with open(gx_config_path, "rb") as gx_config_obj:
gx_config = yaml.safe_load(gx_config_obj)
# fmt: off
gx_config["stores"]["validations_store"]["store_backend"]["bucket"] = shareable_artifacts_bucket
gx_config["stores"]["validations_store"]["store_backend"]["prefix"] = (
gx_config["stores"]["validations_store"]["store_backend"]["prefix"].format(namespace=namespace)
)
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["bucket"] = shareable_artifacts_bucket
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"] = (
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"].format(namespace=namespace)
)
# fmt: on
with open(gx_config_path, "w", encoding="utf-8") as gx_config_obj:
yaml.dump(gx_config, gx_config_obj)


def get_spark_df(
Expand Down Expand Up @@ -195,7 +199,7 @@ def add_expectations_from_json(

# Convert new expectations from dict to ExpectationConfiguration objects
new_expectations_configs = [
ExpectationConfiguration(
gx.core.expectation_configuration.ExpectationConfiguration(
expectation_type=exp["expectation_type"], kwargs=exp["kwargs"]
)
for exp in new_expectations
Expand All @@ -211,29 +215,30 @@ def add_expectations_from_json(

def main():
args = read_args()
# args = {
# "cfn_bucket": "recover-dev-cloudformation",
# "data_type": "fitbitdailydata",
# "expectation_suite_key": "etl-686/src/glue/resources/data_values_expectations.json",
# "gx_config_key": "etl-686/src/glue/resources/great_expectations.yml",
# "namespace": "etl-686",
# "parquet_bucket": "recover-dev-processed-data",
# "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
# }
s3 = boto3.client("s3")
# Download GX stores and configuration
subprocess.run(
args=[
"aws",
"s3",
"sync",
f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}",
".",
],
check=True,
run_id = gx.core.run_identifier.RunIdentifier(
run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
expectation_suite_name = f"{args['data_type']}_expectations"

# Set up Great Expectations
gx_context = gx.get_context()
logger.info("update_data_docs_site")
gx_context = update_data_docs_sites(
context=gx_context,
s3_bucket=args["shareable_artifacts_bucket"],
logger.info("configure_gx_config")
configure_gx_config(
gx_config_bucket=args["cfn_bucket"],
gx_config_key=args["gx_config_key"],
shareable_artifacts_bucket=args["shareable_artifacts_bucket"],
namespace=args["namespace"],
)
gx_context = gx.get_context()
logger.info("reads_expectations_from_json")
expectations_data = read_json(
s3=s3,
Expand All @@ -247,7 +252,7 @@ def main():
)

# Set up Spark
glue_context = GlueContext(SparkContext.getOrCreate())
glue_context = GlueContext(pyspark.context.SparkContext.getOrCreate())
logger.info("get_spark_df")
spark_df = get_spark_df(
glue_context=glue_context,
Expand Down

0 comments on commit 4648031

Please sign in to comment.