How to Use Great Expectations in EMR Serverless
This guide demonstrates how to set up, initialize, and run validations against your data on AWS EMR Serverless. It covers the case of a RuntimeDataConnector with S3 as the metadata store.
0. Prerequisites
- Configure great_expectations.yml and upload it to your S3 bucket, or generate it dynamically from code. Critically, you must add endpoint_url (under boto3_options) to the data_docs_sites section:
config_version: 3.0
datasources:
  spark_s3:
    module_name: great_expectations.datasource
    class_name: Datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: SparkDFExecutionEngine
    data_connectors:
      default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: bucket_name
        prefix: data_folder/
        default_regex:
          pattern: (.*)
          group_names:
            - data_asset_name
      default_runtime_data_connector_name:
        batch_identifiers:
          - runtime_batch_identifier_name
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector
config_variables_file_path: great_expectations/uncommitted/config_variables.yml
plugins_directory: great_expectations/plugins/
stores:
  expectations_S3_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'great_expectations/expectations/'
  validations_S3_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'great_expectations/uncommitted/validations/'
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_S3_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'great_expectations/checkpoints/'
expectations_store_name: expectations_S3_store
validations_store_name: validations_S3_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_S3_store
data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    show_how_to_buttons: false
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: crystall-profiling-test
      boto3_options:
        endpoint_url: https://bucket_name.s3.region.amazonaws.com/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  enabled: True
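Since the configuration can also be generated from code, the Data Docs site entry can be assembled as a plain dictionary. The following is only a minimal sketch: the helper name build_data_docs_site and its parameters are hypothetical, and the endpoint URL simply follows the https://bucket_name.s3.region.amazonaws.com/ pattern used in the YAML above.

```python
def build_data_docs_site(bucket: str, region: str) -> dict:
    """Return a data_docs_sites mapping shaped like the YAML config.

    Hypothetical helper: the name and arguments are illustrative only.
    """
    # endpoint_url mirrors the pattern shown in the YAML file.
    endpoint_url = f"https://{bucket}.s3.{region}.amazonaws.com/"
    return {
        "s3_site": {
            "class_name": "SiteBuilder",
            "show_how_to_buttons": False,
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": bucket,
                "boto3_options": {"endpoint_url": endpoint_url},
            },
            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
        }
    }


# Example values; replace with your own bucket and region.
site = build_data_docs_site("bucket_name", "us-east-1")
print(site["s3_site"]["store_backend"]["boto3_options"]["endpoint_url"])
```

A dictionary built this way has the same shape as the data_docs_sites value that the script later reads from the loaded YAML.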
1. Install Great Expectations
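Create a Dockerfile and build it to generate a virtualenv archive, then upload the resulting tar.gz file to your S3 bucket. Your requirements.txt should list the great_expectations package and anything else you want to install. It must also include venv-pack, because the build invokes the venv-pack command to create the archive.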
FROM --platform=linux/amd64 amazonlinux:2 AS base
RUN yum install -y python3
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY ./requirements.txt /
RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install -r requirements.txt --no-cache-dir
RUN mkdir /output && venv-pack -o /output/pyspark_ge.tar.gz
FROM scratch AS export
COPY --from=base /output/pyspark_ge.tar.gz /
When you configure the job, add the following parameters to its Spark properties:
--conf spark.archives=s3://bucket/folder/pyspark_ge.tar.gz#environment
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
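If the job is submitted programmatically, these properties are usually joined into a single parameter string (for example, the sparkSubmitParameters field accepted when starting an EMR Serverless job run). The sketch below only builds that string; the function name build_spark_submit_parameters and its arguments are hypothetical, and the archive URI is the placeholder used above.

```python
def build_spark_submit_parameters(archive_uri: str, env_dir: str = "environment") -> str:
    """Join the Spark properties listed above into one --conf string.

    Hypothetical helper: name and arguments are illustrative only.
    """
    # The archive is unpacked under ./<env_dir>, so Python lives in its bin/.
    python_bin = f"./{env_dir}/bin/python"
    properties = {
        "spark.archives": f"{archive_uri}#{env_dir}",
        "spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON": python_bin,
        "spark.emr-serverless.driverEnv.PYSPARK_PYTHON": python_bin,
        "spark.emr-serverless.executorEnv.PYSPARK_PYTHON": python_bin,
        "spark.hadoop.hive.metastore.client.factory.class": (
            "com.amazonaws.glue.catalog.metastore."
            "AWSGlueDataCatalogHiveClientFactory"
        ),
    }
    return " ".join(f"--conf {key}={value}" for key, value in properties.items())


print(build_spark_submit_parameters("s3://bucket/folder/pyspark_ge.tar.gz"))
```

Keeping the interpreter path in one variable avoids the three PYSPARK entries drifting apart if the name after the # in spark.archives changes.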
Then import the necessary libraries:
import boto3
import yaml
from pyspark import SQLContext
import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)
from great_expectations.util import get_context
2. Set up Great Expectations
Here we initialize Spark and read great_expectations.yml from S3:
if __name__ == "__main__":
    # Critical: reinitialize the Spark context through Great Expectations
    sc = gx.core.util.get_or_create_spark_application()
    spark = SQLContext(sc)
    spark_file = "pyspark_df.parquet"
    suite_name = "pandas_spark_suite"
    session = boto3.Session()
    s3_client = session.client("s3")
    # Download the project configuration from S3 and parse it
    response = s3_client.get_object(
        Bucket="bucket_name",
        Key="bucket_name/great_expectations/great_expectations.yml",
    )
    config_file = yaml.safe_load(response["Body"])
3. Connect to your data
    df_spark = spark.read.parquet("s3://bucket_name/data_folder/" + spark_file)
    config = DataContextConfig(
        config_version=config_file["config_version"],
        datasources=config_file["datasources"],
        expectations_store_name=config_file["expectations_store_name"],
        validations_store_name=config_file["validations_store_name"],
        evaluation_parameter_store_name=config_file["evaluation_parameter_store_name"],
        plugins_directory="/great_expectations/plugins",
        stores=config_file["stores"],
        data_docs_sites=config_file["data_docs_sites"],
        config_variables_file_path=config_file["config_variables_file_path"],
        anonymous_usage_statistics=config_file["anonymous_usage_statistics"],
        checkpoint_store_name=config_file["checkpoint_store_name"],
        store_backend_defaults=S3StoreBackendDefaults(
            default_bucket_name=config_file["data_docs_sites"]["s3_site"][
                "store_backend"
            ]["bucket"]
        ),
    )
    context_gx = get_context(project_config=config)
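The DataContextConfig call above indexes the loaded YAML directly, so a missing top-level key only surfaces as a KeyError in the middle of the job. A small pre-check can report every missing key at once. This is a sketch under assumptions: REQUIRED_KEYS and missing_config_keys are hypothetical names, and the key list is taken from the lookups in the call above.

```python
# Top-level keys that the DataContextConfig call reads from the loaded YAML.
REQUIRED_KEYS = (
    "config_version",
    "datasources",
    "stores",
    "expectations_store_name",
    "validations_store_name",
    "evaluation_parameter_store_name",
    "checkpoint_store_name",
    "data_docs_sites",
    "config_variables_file_path",
    "anonymous_usage_statistics",
)


def missing_config_keys(config_file: dict) -> list:
    """Return the names of keys the script expects but the config lacks.

    Hypothetical helper; assumes config_file is the parsed YAML mapping.
    """
    missing = [key for key in REQUIRED_KEYS if key not in config_file]
    # The default bucket is read from data_docs_sites.s3_site.store_backend.bucket.
    backend = (
        config_file.get("data_docs_sites", {})
        .get("s3_site", {})
        .get("store_backend", {})
    )
    if "bucket" not in backend:
        missing.append("data_docs_sites.s3_site.store_backend.bucket")
    return missing


# Example with a deliberately incomplete config.
print(missing_config_keys({"config_version": 3.0, "datasources": {}}))
```

Calling this right after the YAML is loaded, and raising if the list is non-empty, gives a clearer failure than a KeyError raised while the context is being built.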
4. Create Expectations
    expectation_suite_name = suite_name
    # Load the suite from the expectations store (it must already exist there)
    suite = context_gx.get_expectation_suite(suite_name)
    # A RuntimeBatchRequest has to be routed through the RuntimeDataConnector
    batch_request = RuntimeBatchRequest(
        datasource_name="spark_s3",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="datafile_name",
        batch_identifiers={"runtime_batch_identifier_name": "default_identifier"},
        runtime_parameters={"path": "s3a://bucket_name/path_to_file.format"},
    )
    validator = context_gx.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name,
    )
    print(validator.head())
    # Add an example expectation
    validator.expect_column_values_to_not_be_null(column="passenger_count")
    validator.save_expectation_suite(discard_failed_expectations=False)
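A runtime batch request carries a single data source in runtime_parameters: the request above passes a path, while the checkpoint run in the next step passes an in-memory DataFrame as batch_data. The sketch below builds the keyword arguments for either case and rejects ambiguous input. The helper runtime_request_kwargs is hypothetical; the datasource, connector, and identifier names are the ones defined in the YAML configuration.

```python
def runtime_request_kwargs(data_asset_name, *, path=None, batch_data=None):
    """Build keyword arguments for a runtime batch request.

    Hypothetical helper: exactly one of path or batch_data must be given.
    """
    candidates = {"path": path, "batch_data": batch_data}
    supplied = {key: value for key, value in candidates.items() if value is not None}
    if len(supplied) != 1:
        raise ValueError("pass exactly one of path or batch_data")
    return {
        "datasource_name": "spark_s3",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": data_asset_name,
        "batch_identifiers": {"runtime_batch_identifier_name": "default_identifier"},
        "runtime_parameters": supplied,
    }


# Path-based request, mirroring the example above.
kwargs = runtime_request_kwargs(
    "datafile_name", path="s3a://bucket_name/path_to_file.format"
)
print(kwargs["runtime_parameters"])
```

In the job script the result would be unpacked into the request, for example RuntimeBatchRequest(**runtime_request_kwargs("pyspark_df", batch_data=df_spark)).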
5. Validate your data
    my_checkpoint_name = "in_memory_checkpoint"
    python_config = {
        "name": my_checkpoint_name,
        "class_name": "Checkpoint",
        "config_version": 1,
        "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
        ],
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "spark_s3",
                    "data_connector_name": "default_runtime_data_connector_name",
                    "data_asset_name": "pyspark_df",
                },
                "expectation_suite_name": expectation_suite_name,
            }
        ],
    }
    context_gx.add_or_update_checkpoint(**python_config)
    # Supply the in-memory Spark DataFrame at run time
    results = context_gx.run_checkpoint(
        checkpoint_name=my_checkpoint_name,
        run_name="run_name",
        batch_request={
            "runtime_parameters": {"batch_data": df_spark},
            "batch_identifiers": {
                "runtime_batch_identifier_name": "default_identifier"
            },
        },
    )
    validation_result_identifier = results.list_validation_result_identifiers()[0]
    context_gx.build_data_docs()
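The script above builds Data Docs whether or not the validation passed, so the job itself will likely finish successfully even when expectations fail. If the job status should reflect the validation outcome, the script can raise after the docs are built. This is a minimal sketch, assuming the object returned by run_checkpoint exposes a boolean success attribute (as CheckpointResult is expected to in the 0.15.x releases); ValidationFailed and ensure_success are hypothetical names, and SimpleNamespace stands in for the real result object.

```python
from types import SimpleNamespace


class ValidationFailed(RuntimeError):
    """Raised when a checkpoint run reports failed expectations."""


def ensure_success(results) -> None:
    """Raise ValidationFailed unless results.success is truthy.

    Assumption: results has a boolean `success` attribute.
    """
    if not results.success:
        raise ValidationFailed("Great Expectations validation failed")


# Stand-in objects for demonstration; a real run would pass `results`.
ensure_success(SimpleNamespace(success=True))
try:
    ensure_success(SimpleNamespace(success=False))
except ValidationFailed as error:
    print(f"job would fail: {error}")
```

Calling ensure_success(results) after build_data_docs() keeps the docs available for inspection while still letting an uncaught exception mark the run as failed.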
6. Congratulations!
Your Data Docs are now built on S3, and you can find index.html in the bucket.