How to Use Great Expectations with Airflow
Learn how to run a Great Expectations checkpoint in Apache Airflow, and how to use an Expectation Suite within an Airflow directed acyclic graph (DAG) to trigger a data asset validation.
Airflow is a data orchestration tool for creating and maintaining data pipelines through DAGs written in Python. DAGs complete work through operators, which are templates that encapsulate a specific type of work. This document explains how to use the GreatExpectationsOperator to perform data quality work in an Airflow DAG.
Before you create your DAG, make sure you have a Data Context and Checkpoint configured. A Data Context represents a Great Expectations project. It organizes storage and access for Expectation Suites, Datasources, notification settings, and data fixtures. Checkpoints provide a convenient abstraction for bundling the validation of a Batch (or Batches) of data against an Expectation Suite (or several), as well as the actions that should be taken after the validation.
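If you need to create a Checkpoint programmatically, the following is a minimal sketch assuming a filesystem Data Context created with great_expectations init and an existing Expectation Suite; the Checkpoint, suite, Datasource, and data asset names are illustrative:

import great_expectations as gx

# Assumes a project created with great_expectations init and an
# existing Expectation Suite named "taxi.demo" (illustrative names).
context = gx.get_context()
context.add_checkpoint(
    name="taxi.pass.chk",
    class_name="SimpleCheckpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "my_datasource",
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": "yellow_tripdata_sample_2019-01.csv",
            },
            "expectation_suite_name": "taxi.demo",
        }
    ],
)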
This guide focuses on using Great Expectations with Airflow in a self-hosted environment. To use Great Expectations with Airflow within Astronomer, see Orchestrate Great Expectations with Airflow.
Prerequisites
To complete this guide, you need a working Airflow environment running Airflow 2.1 or later, and a Great Expectations project with a configured Data Context and Checkpoint.
Install the GreatExpectationsOperator
Run the following command to install the Great Expectations provider in your Airflow environment:
pip install airflow-provider-great-expectations==0.1.1
GX recommends specifying a version when installing the package. To make use of the latest Great Expectations provider for Airflow, specify version 0.1.0 or later.
The current Great Expectations release requires Airflow 2.1 or later. If you're still running Airflow 1.x, you need to upgrade to 2.1 or later before using the GreatExpectationsOperator.
Use the GreatExpectationsOperator
Before you can use the GreatExpectationsOperator, you need to import it into your DAG. Depending on how you're using the operator, you might need to import the DataContextConfig, CheckpointConfig, or BatchRequest classes. To import the Great Expectations provider and the config and batch classes in a given DAG, add the following lines to the top of the DAG file in your dags directory:
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context.types.base import (
    DataContextConfig,
    CheckpointConfig,
)
To use the operator in the DAG, define an instance of the GreatExpectationsOperator class and assign it to a variable. In the following example, two different instances of the operator are defined to complete two different steps in a data quality check workflow:
ge_data_context_root_dir_with_checkpoint_name_pass = GreatExpectationsOperator(
    task_id="ge_data_context_root_dir_with_checkpoint_name_pass",
    data_context_root_dir=ge_root_dir,
    checkpoint_name="taxi.pass.chk",
)

ge_data_context_config_with_checkpoint_config_pass = GreatExpectationsOperator(
    task_id="ge_data_context_config_with_checkpoint_config_pass",
    data_context_config=example_data_context_config,
    checkpoint_config=example_checkpoint_config,
)
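The ge_root_dir, example_data_context_config, and example_checkpoint_config variables are not supplied by the operator; you define them yourself. A minimal sketch of ge_root_dir, assuming (hypothetically) that the great_expectations project directory sits next to the DAG file:

import os

# Hypothetical layout: the DAG file lives alongside the
# great_expectations/ directory created by great_expectations init.
base_path = os.path.dirname(__file__)
ge_root_dir = os.path.join(base_path, "great_expectations")

Example definitions for example_data_context_config and example_checkpoint_config are shown in the Operator parameters section below.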
After you define your work with operators, you define a relationship to specify the order in which your DAG completes the work. For example, adding the following code to your DAG ensures that your name pass task has to complete before your config pass task can start:
ge_data_context_root_dir_with_checkpoint_name_pass >> ge_data_context_config_with_checkpoint_config_pass
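Putting the pieces together, a complete DAG file might look like the following minimal sketch; the DAG id, schedule, start date, and project path are illustrative:

import os
from datetime import datetime

from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Hypothetical project path, as sketched above.
ge_root_dir = os.path.join(os.path.dirname(__file__), "great_expectations")

with DAG(
    dag_id="example_great_expectations_dag",  # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # trigger manually
    catchup=False,
) as dag:
    validate_taxi_data = GreatExpectationsOperator(
        task_id="validate_taxi_data",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="taxi.pass.chk",
    )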
Operator parameters
The operator has several optional parameters, but it always requires either a data_context_root_dir or a data_context_config, and either a checkpoint_name or a checkpoint_config.
The data_context_root_dir should point to the great_expectations project directory that was generated when you created the project. If you're using an in-memory data_context_config, a DataContextConfig must be defined. See this example.
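For reference, a minimal in-memory DataContextConfig that keeps all stores on the local filesystem might be sketched as follows; FilesystemStoreBackendDefaults comes from the same module as DataContextConfig, and the root directory value is an assumption:

from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)

# Store Expectation Suites, Checkpoints, and Validation Results on the
# local filesystem under the project directory (hypothetical path).
example_data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory="/usr/local/airflow/great_expectations",
    ),
)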
A checkpoint_name references a Checkpoint in the project CheckpointStore defined in the DataContext (which is often the great_expectations/checkpoints/ path), so that checkpoint_name = "taxi.pass.chk" would reference the file great_expectations/checkpoints/taxi/pass/chk.yml. With a checkpoint_name, checkpoint_kwargs can be passed to the operator to specify additional, overriding configurations. A checkpoint_config can be passed to the operator in place of a name, and is defined like this example.
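For illustration, a CheckpointConfig equivalent to the taxi.pass.chk Checkpoint might be sketched as follows; the suite, Datasource, and data asset names are assumptions carried over from the earlier examples:

from great_expectations.data_context.types.base import CheckpointConfig

example_checkpoint_config = CheckpointConfig(
    name="taxi.pass.chk",
    config_version=1,
    class_name="SimpleCheckpoint",
    expectation_suite_name="taxi.demo",  # hypothetical suite name
    batch_request={
        "datasource_name": "my_datasource",  # hypothetical Datasource
        "data_connector_name": "default_inferred_data_connector_name",
        "data_asset_name": "yellow_tripdata_sample_2019-01.csv",
    },
)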
For a full list of parameters, see GreatExpectationsOperator.
Connections and backends
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend that is compatible with Great Expectations. All that's needed to point the operator at an external dataset is to set up an Airflow Connection to the Datasource and add the connection to your Great Expectations project. If you're using a DataContextConfig or CheckpointConfig, ensure that the "datasources" field references your backend connection name.
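As an example of that wiring, a Postgres-backed Datasource entry inside a DataContextConfig might be sketched as follows; the Datasource name, connection string, and paths are assumptions, and the connection string should mirror the Airflow Connection you configured:

from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)

example_data_context_config = DataContextConfig(
    datasources={
        # Hypothetical Datasource pointing at the same Postgres database
        # as the corresponding Airflow Connection.
        "my_postgres_datasource": {
            "class_name": "Datasource",
            "execution_engine": {
                "class_name": "SqlAlchemyExecutionEngine",
                "connection_string": "postgresql+psycopg2://user:password@host:5432/db",
            },
            "data_connectors": {
                "default_inferred_data_connector_name": {
                    "class_name": "InferredAssetSqlDataConnector",
                    "include_schema_name": True,
                },
            },
        },
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory="/usr/local/airflow/great_expectations",
    ),
)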