How to Use Great Expectations with Prefect
This guide will help you run Great Expectations validations with Prefect.
Prerequisites: This how-to guide assumes you have a working Great Expectations project with at least one Checkpoint configured, and a Python environment where you can install Prefect.
Prefect is a workflow management system that enables data engineers to build robust data applications. The Prefect open source library lets you define workflows in Python and add semantics like retries, logging, dynamic mapping, caching, and failure notifications to your data pipelines. Prefect Cloud is a hosted platform for automating and monitoring dataflows built with Prefect 1.0, without having to manage orchestration infrastructure yourself.
Great Expectations validations can be used to validate data passed between tasks in your Prefect flow. By validating your data before operating on it, you can quickly find issues with your data with less debugging. Prefect makes it easy to combine Great Expectations with other services in your data stack and orchestrate them all in a predictable manner.
The RunGreatExpectationsValidation task

With Prefect, you define your workflows with tasks and flows. A Task represents a discrete action in a Prefect workflow. A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks. Prefect offers a suite of over 180 pre-built tasks in the Prefect Task Library. The RunGreatExpectationsValidation task is one of these pre-built tasks. With the RunGreatExpectationsValidation task, you can run validations for an existing Great Expectations project.
To use the RunGreatExpectationsValidation task, you need to install Prefect with the ge extra:

```bash
pip install "prefect[ge]"
```
Here is an example of a flow that runs a Great Expectations validation:

```python
from prefect import Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()

with Flow("ge_test") as flow:
    checkpoint_name = Parameter("checkpoint_name")
    prev_run_row_count = 100
    validation_task(
        checkpoint_name=checkpoint_name,
        evaluation_parameters=dict(prev_run_row_count=prev_run_row_count),
    )

flow.run(parameters={"checkpoint_name": "my_checkpoint"})
```
Using the RunGreatExpectationsValidation task is as easy as importing the task, instantiating it, and calling it in your flow. In the flow above, we parameterize our flow with the checkpoint name. This way, we're able to reuse our flow to run different Great Expectations validations based on the input.
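For example, assuming your project also defines a second checkpoint named my_other_checkpoint (a hypothetical name used here for illustration), the same flow can validate it without any code changes:

```python
# Run the same flow against a different, hypothetical checkpoint.
flow.run(parameters={"checkpoint_name": "my_other_checkpoint"})
```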
Configuring the root context directory

By default, the RunGreatExpectationsValidation task will look in the current directory for a Great Expectations project in a folder named great_expectations. If your great_expectations.yml is located in another directory, you can configure the RunGreatExpectationsValidation task with the context_root_dir argument:
```python
from prefect import Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()

with Flow("ge_test") as flow:
    checkpoint_name = Parameter("checkpoint_name")
    prev_run_row_count = 100
    validation_task(
        checkpoint_name=checkpoint_name,
        evaluation_parameters=dict(prev_run_row_count=prev_run_row_count),
        context_root_dir="../great_expectations",
    )

flow.run(parameters={"checkpoint_name": "my_checkpoint"})
```
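If the project lives in different locations across environments, you can also supply the directory at runtime. The sketch below is one way you might structure this; the Parameter name context_root_dir and its default value are assumptions, not part of the task's API:

```python
from prefect import Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()

with Flow("ge_test") as flow:
    checkpoint_name = Parameter("checkpoint_name")
    # Hypothetical parameter: lets each environment point at its own project location.
    context_root_dir = Parameter("context_root_dir", default="../great_expectations")
    validation_task(
        checkpoint_name=checkpoint_name,
        context_root_dir=context_root_dir,
    )

flow.run(parameters={"checkpoint_name": "my_checkpoint"})
```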
Using dynamic runtime configuration

The RunGreatExpectationsValidation task also enables runtime configuration of your validation run. You can pass in an in-memory DataContext via the context argument or pass an in-memory Checkpoint via the ge_checkpoint argument.

Here is an example with an in-memory DataContext:
```python
import os
from pathlib import Path

import great_expectations as ge
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
)
from prefect import Flow, Parameter, task
from prefect.tasks.great_expectations import RunGreatExpectationsValidation


@task
def create_in_memory_data_context(project_path: Path, data_path: Path):
    data_context = BaseDataContext(
        project_config=DataContextConfig(
            **{
                "config_version": 3.0,
                "datasources": {
                    "data__dir": {
                        "module_name": "great_expectations.datasource",
                        "data_connectors": {
                            "data__dir_example_data_connector": {
                                "default_regex": {
                                    "group_names": ["data_asset_name"],
                                    "pattern": "(.*)",
                                },
                                "base_directory": str(data_path),
                                "module_name": "great_expectations.datasource.data_connector",
                                "class_name": "InferredAssetFilesystemDataConnector",
                            },
                            "default_runtime_data_connector_name": {
                                "batch_identifiers": ["default_identifier_name"],
                                "module_name": "great_expectations.datasource.data_connector",
                                "class_name": "RuntimeDataConnector",
                            },
                        },
                        "execution_engine": {
                            "module_name": "great_expectations.execution_engine",
                            "class_name": "PandasExecutionEngine",
                        },
                        "class_name": "Datasource",
                    }
                },
                "config_variables_file_path": str(
                    project_path / "uncommitted" / "config_variables.yml"
                ),
                "stores": {
                    "expectations_store": {
                        "class_name": "ExpectationsStore",
                        "store_backend": {
                            "class_name": "TupleFilesystemStoreBackend",
                            "base_directory": str(project_path / "expectations"),
                        },
                    },
                    "validations_store": {
                        "class_name": "ValidationsStore",
                        "store_backend": {
                            "class_name": "TupleFilesystemStoreBackend",
                            "base_directory": str(
                                project_path / "uncommitted" / "validations"
                            ),
                        },
                    },
                    "evaluation_parameter_store": {
                        "class_name": "EvaluationParameterStore"
                    },
                    "checkpoint_store": {
                        "class_name": "CheckpointStore",
                        "store_backend": {
                            "class_name": "TupleFilesystemStoreBackend",
                            "suppress_store_backend_id": True,
                            "base_directory": str(project_path / "checkpoints"),
                        },
                    },
                },
                "expectations_store_name": "expectations_store",
                "validations_store_name": "validations_store",
                "evaluation_parameter_store_name": "evaluation_parameter_store",
                "checkpoint_store_name": "checkpoint_store",
                "data_docs_sites": {
                    "local_site": {
                        "class_name": "SiteBuilder",
                        "show_how_to_buttons": True,
                        "store_backend": {
                            "class_name": "TupleFilesystemStoreBackend",
                            "base_directory": str(
                                project_path / "uncommitted" / "data_docs" / "local_site"
                            ),
                        },
                        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
                    }
                },
                "anonymous_usage_statistics": {
                    "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
                    "enabled": False,
                },
                "notebooks": None,
                "concurrency": {"enabled": False},
            }
        )
    )
    return data_context


validation_task = RunGreatExpectationsValidation()

with Flow("ge_test") as flow:
    checkpoint_name = Parameter("checkpoint_name")
    prev_run_row_count = 100
    data_context = create_in_memory_data_context(
        project_path=Path.cwd(), data_path=Path.cwd().parent
    )
    validation_task(
        checkpoint_name=checkpoint_name,
        evaluation_parameters=dict(prev_run_row_count=prev_run_row_count),
        context=data_context,
    )

flow.run(parameters={"checkpoint_name": "my_checkpoint"})
```
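You can supply an in-memory Checkpoint in a similar way via the ge_checkpoint argument. The sketch below is illustrative rather than definitive: it reuses the create_in_memory_data_context task from the example above, and the SimpleCheckpoint name, data asset name, and expectation suite name are placeholders you would replace with values from your own project.

```python
from pathlib import Path

from great_expectations.checkpoint import SimpleCheckpoint
from prefect import Flow, task
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()


@task
def create_in_memory_checkpoint(data_context):
    # Hypothetical checkpoint built against the in-memory DataContext;
    # the names below are placeholders for your own project's values.
    return SimpleCheckpoint(
        name="my_in_memory_checkpoint",
        data_context=data_context,
        validations=[
            {
                "batch_request": {
                    "datasource_name": "data__dir",
                    "data_connector_name": "data__dir_example_data_connector",
                    "data_asset_name": "yellow_tripdata_sample_2019-02.csv",
                },
                "expectation_suite_name": "taxi.demo_pass",
            }
        ],
    )


with Flow("ge_checkpoint_test") as flow:
    data_context = create_in_memory_data_context(
        project_path=Path.cwd(), data_path=Path.cwd().parent
    )
    checkpoint = create_in_memory_checkpoint(data_context)
    validation_task(context=data_context, ge_checkpoint=checkpoint)

flow.run()
```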
Validating in-memory data

Because Prefect allows first-class passing of data between tasks, you can even use the RunGreatExpectationsValidation task on in-memory dataframes! This means you won't need to write data to and read it from remote storage between steps of your pipeline.

Here is an example of how to run a validation on an in-memory dataframe by passing in a RuntimeBatchRequest via the checkpoint_kwargs argument:
```python
from great_expectations.core.batch import RuntimeBatchRequest
import pandas as pd
from prefect import Flow, Parameter, task
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()


@task
def dataframe_creation_task() -> pd.DataFrame:
    # Placeholder task: substitute whatever step actually produces the
    # dataframe you want to validate (an extract, a transform, an API call).
    return pd.DataFrame({"vendor_id": [1, 2], "passenger_count": [1, 3]})


@task
def create_runtime_batch_request(df: pd.DataFrame):
    return RuntimeBatchRequest(
        datasource_name="data__dir",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="yellow_tripdata_sample_2019-02_df",
        runtime_parameters={"batch_data": df},
        batch_identifiers={
            "default_identifier_name": "ingestion step 1",
        },
    )


with Flow("ge_test") as flow:
    checkpoint_name = Parameter("checkpoint_name")
    prev_run_row_count = 100
    df = dataframe_creation_task()
    in_memory_runtime_batch_request = create_runtime_batch_request(df)
    validation_task(
        checkpoint_name=checkpoint_name,
        evaluation_parameters=dict(prev_run_row_count=prev_run_row_count),
        checkpoint_kwargs={
            "validations": [
                {
                    "batch_request": in_memory_runtime_batch_request,
                    "expectation_suite_name": "taxi.demo_pass",
                }
            ]
        },
    )

flow.run(parameters={"checkpoint_name": "my_checkpoint"})
```
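Since flow.run in Prefect 1.0 returns a State object, you can also capture the outcome of the run and branch on it, for example to decide whether downstream steps should proceed. A minimal sketch, reusing the flow defined above:

```python
# flow.run returns the flow's final State in Prefect 1.0.
state = flow.run(parameters={"checkpoint_name": "my_checkpoint"})

if state.is_successful():
    print("Validation passed; safe to continue the pipeline.")
else:
    print("Validation failed; inspect the Great Expectations results for details.")
```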
Where to go for more information

The flexibility that Prefect and the RunGreatExpectationsValidation task offer makes it easy to incorporate data validation into your dataflows with Great Expectations. For more info about the RunGreatExpectationsValidation task, refer to the Prefect documentation.