How to Use Great Expectations in AWS Glue
Great Expectations is aware that the fluent datasource and yaml config workflows are undocumented. A new method for connecting to data sources will be added to this topic soon. In the meantime, use the Great Expectations Slack channel if you encounter issues and need assistance.
This guide demonstrates how to set up, initialize, and run validations against your data in an AWS Glue Spark job. It covers the case of a RuntimeDataConnector and uses S3 as the metadata store.
0. Prerequisites
- Configure great_expectations.yml and upload it to your S3 bucket, or generate it dynamically from code:
config_version: 3.0
datasources:
  spark_s3:
    module_name: great_expectations.datasource
    class_name: Datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: SparkDFExecutionEngine
    data_connectors:
      default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: bucket_name
        prefix: data_folder_prefix/
        default_regex:
          pattern: (.*)
          group_names:
            - data_asset_name
      default_runtime_data_connector_name:
        batch_identifiers:
          - runtime_batch_identifier_name
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector
config_variables_file_path: great_expectations/uncommitted/config_variables.yml
plugins_directory: great_expectations/plugins/
stores:
  expectations_S3_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/expectations/'
  validations_S3_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/uncommitted/validations/'
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_S3_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/checkpoints/'
expectations_store_name: expectations_S3_store
validations_store_name: validations_S3_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_S3_store
data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    show_how_to_buttons: false
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: bucket_name
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  enabled: True
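For the "generate it dynamically from code" option, the store and Data Docs sections of the configuration above can be built as a plain Python dictionary. This is a minimal sketch, not the library's own mechanism: `build_gx_config`, `bucket`, and `root_prefix` are hypothetical names, and the `datasources` section is left out for brevity.

```python
def build_gx_config(bucket: str, root_prefix: str = "great_expectations") -> dict:
    """Return the store-related part of the project config as a plain dict.

    Hypothetical helper: the keys mirror the YAML shown above, but the
    function itself is not part of Great Expectations.
    """

    def s3_store(class_name: str, sub_prefix: str) -> dict:
        # Every S3-backed store shares the same backend shape.
        return {
            "class_name": class_name,
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": bucket,
                "prefix": f"{root_prefix}/{sub_prefix}/",
            },
        }

    return {
        "config_version": 3.0,
        "stores": {
            "expectations_S3_store": s3_store("ExpectationsStore", "expectations"),
            "validations_S3_store": s3_store(
                "ValidationsStore", "uncommitted/validations"
            ),
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": s3_store("CheckpointStore", "checkpoints"),
        },
        "expectations_store_name": "expectations_S3_store",
        "validations_store_name": "validations_S3_store",
        "evaluation_parameter_store_name": "evaluation_parameter_store",
        "checkpoint_store_name": "checkpoint_S3_store",
        "data_docs_sites": {
            "s3_site": {
                "class_name": "SiteBuilder",
                "show_how_to_buttons": False,
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": bucket,
                },
                "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            }
        },
        "anonymous_usage_statistics": {"enabled": True},
    }


config_dict = build_gx_config("bucket_name")
```

Building the dictionary in code keeps the bucket name in one place, which may be convenient when the same job runs against several environments.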
1. Install Great Expectations
To install the great_expectations module, add the following parameter to your AWS Glue Spark job parameters. This requires Glue version 2.0 or later.
--additional-python-modules great_expectations
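When the job is defined programmatically rather than in the console, the same parameter can be expressed as a key/value pair, for example in the `DefaultArguments` mapping that boto3's Glue `create_job` call accepts. The sketch below only builds that mapping. `glue_default_arguments` is a hypothetical helper, and the pinned version is an illustrative assumption.

```python
def glue_default_arguments(gx_version: str = "") -> dict:
    """Build the Glue job argument that installs Great Expectations.

    Hypothetical helper. An optional version pin uses pip's `==` syntax.
    """
    module = "great_expectations"
    if gx_version:
        module = f"{module}=={gx_version}"
    return {"--additional-python-modules": module}


# The version below is only an example pin, not a recommendation.
job_arguments = glue_default_arguments("0.16.16")
print(job_arguments)
```

Pinning a version is optional, but it keeps job runs reproducible when a new release of the library is published.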
Then import the necessary libraries:
import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)
from great_expectations.util import get_context
yaml = YAMLHandler()
2. Set up Great Expectations
Here we initialize the Spark and Glue contexts and read great_expectations.yml from S3:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
s3_client = boto3.client("s3")
response = s3_client.get_object(
    Bucket="bucket", Key="bucket/great_expectations/great_expectations.yml"
)
config_file = yaml.load(response["Body"])
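One possible way to keep the S3 read testable outside Glue is to wrap it in a small function that accepts any client object with a `get_object` method. This is a sketch under that assumption: `read_s3_text` and `FakeS3Client` are hypothetical names, and the fake client exists only so the example runs without AWS access.

```python
import io


def read_s3_text(s3_client, bucket: str, key: str) -> str:
    """Fetch an S3 object and return its body decoded as UTF-8 text."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read().decode("utf-8")


class FakeS3Client:
    """Stand-in for boto3.client("s3"), used here for local testing only."""

    def __init__(self, objects: dict):
        self._objects = objects

    def get_object(self, Bucket: str, Key: str) -> dict:
        # Mimic the shape of the real response: a dict with a readable "Body".
        return {"Body": io.BytesIO(self._objects[(Bucket, Key)])}


fake_client = FakeS3Client(
    {("bucket", "great_expectations/great_expectations.yml"): b"config_version: 3.0\n"}
)
config_text = read_s3_text(
    fake_client, "bucket", "great_expectations/great_expectations.yml"
)
```

In the Glue job itself, the real `s3_client` would be passed instead of the fake one, and the returned text handed to `yaml.load`.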
3. Connect to your data
config = DataContextConfig(
    config_version=config_file["config_version"],
    datasources=config_file["datasources"],
    expectations_store_name=config_file["expectations_store_name"],
    validations_store_name=config_file["validations_store_name"],
    evaluation_parameter_store_name=config_file["evaluation_parameter_store_name"],
    plugins_directory="/great_expectations/plugins",
    # The YAML in step 0 defines no validation_operators, so read the key
    # with .get() to avoid a KeyError when it is absent.
    validation_operators=config_file.get("validation_operators"),
    stores=config_file["stores"],
    data_docs_sites=config_file["data_docs_sites"],
    config_variables_file_path=config_file["config_variables_file_path"],
    anonymous_usage_statistics=config_file["anonymous_usage_statistics"],
    checkpoint_store_name=config_file["checkpoint_store_name"],
    store_backend_defaults=S3StoreBackendDefaults(
        default_bucket_name=config_file["data_docs_sites"]["s3_site"]["store_backend"][
            "bucket"
        ]
    ),
)
context_gx = get_context(project_config=config)
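Because the constructor call above looks up most settings by key, a key missing from the YAML file surfaces as a `KeyError` at job runtime. A small pre-check, run before building the config, can report all missing keys at once. This is a minimal sketch: `REQUIRED_KEYS` and `missing_config_keys` are hypothetical names, and the key list is an assumption taken from the lookups shown in this step.

```python
# Keys that the DataContextConfig call in this step reads by index.
REQUIRED_KEYS = (
    "config_version",
    "datasources",
    "stores",
    "expectations_store_name",
    "validations_store_name",
    "evaluation_parameter_store_name",
    "checkpoint_store_name",
    "data_docs_sites",
)


def missing_config_keys(config_file, required=REQUIRED_KEYS) -> list:
    """Return the required keys that are absent, in declaration order."""
    return [key for key in required if key not in config_file]


# A deliberately incomplete config to show the check in action.
example = {"config_version": 3.0, "datasources": {}, "stores": {}}
print(missing_config_keys(example))
```

In the job, the check would be applied to `config_file` right after it is loaded, raising an error with the returned list if it is not empty.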
4. Create Expectations
expectation_suite_name = "suite_name"
suite = context_gx.add_expectation_suite(expectation_suite_name)
batch_request = RuntimeBatchRequest(
    datasource_name="spark_s3",
    data_asset_name="datafile_name",
    batch_identifiers={"runtime_batch_identifier_name": "default_identifier"},
    # A RuntimeBatchRequest must target the RuntimeDataConnector from the config.
    data_connector_name="default_runtime_data_connector_name",
    runtime_parameters={"path": "s3a://bucket_name/path_to_file.format"},
)
validator = context_gx.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
# Add an example expectation: passenger_count must not contain nulls.
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.save_expectation_suite(discard_failed_expectations=False)
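When the same expectation applies to several columns, it can be added in a loop before the suite is saved. The sketch below assumes only that the validator exposes `expect_column_values_to_not_be_null(column=...)`, as used above. `add_not_null_expectations` and `RecordingValidator` are hypothetical names, and `trip_distance` is an illustrative column, not one taken from this guide.

```python
def add_not_null_expectations(validator, columns):
    """Add a not-null expectation for each column and collect the results."""
    results = []
    for column in columns:
        results.append(validator.expect_column_values_to_not_be_null(column=column))
    return results


class RecordingValidator:
    """Stand-in validator that records calls, so the sketch runs without Spark."""

    def __init__(self):
        self.calls = []

    def expect_column_values_to_not_be_null(self, column):
        self.calls.append(column)
        return {"column": column}


recorder = RecordingValidator()
# "trip_distance" is a made-up column name used for illustration.
add_not_null_expectations(recorder, ["passenger_count", "trip_distance"])
print(recorder.calls)
```

In the Glue job, the real `validator` would be passed in, with the call placed before `validator.save_expectation_suite(...)` so the added expectations are stored.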
5. Validate your data
checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}
checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}", context_gx, **checkpoint_config
)
results = checkpoint.run(result_format="SUMMARY", run_name="test")
validation_result_identifier = results.list_validation_result_identifiers()[0]
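A Glue job normally succeeds even when validation fails, unless the script raises an error. One way to make a failed validation fail the job is sketched below, assuming the object returned by `checkpoint.run` exposes a boolean `success` attribute. `raise_if_failed` is a hypothetical helper, and `SimpleNamespace` objects stand in for real checkpoint results so the example runs on its own.

```python
from types import SimpleNamespace


def raise_if_failed(results, suite_name: str):
    """Raise RuntimeError when the checkpoint result reports a failure."""
    if not results.success:
        raise RuntimeError(
            f"Validation failed for expectation suite '{suite_name}'"
        )
    return results


# Stand-ins for checkpoint results; only the `success` attribute is modeled.
passing = SimpleNamespace(success=True)
failing = SimpleNamespace(success=False)

raise_if_failed(passing, "suite_name")  # returns without raising
try:
    raise_if_failed(failing, "suite_name")
    outcome = "passed"
except RuntimeError as error:
    outcome = str(error)
```

In the job, calling `raise_if_failed(results, expectation_suite_name)` after the checkpoint run would stop the job with a descriptive message.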
6. Congratulations!
Your Data Docs are now built on S3, and you can find index.html in the bucket.