How to pass an in-memory DataFrame to a Checkpoint
This guide will help you pass an in-memory DataFrame to an existing Checkpoint (the primary means for validating data in a production deployment of Great Expectations). This is especially useful if you already have your data in memory due to an existing process such as a pipeline runner.
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- A working installation of Great Expectations
- Configured a Data Context
Steps
1. Set up Great Expectations
Import the required libraries and load your DataContext
import pandas as pd
from ruamel import yaml
import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
If you have an existing configured DataContext in your filesystem in the form of a great_expectations.yml file, you can load it like this:
context = gx.get_context()
If you do not have a filesystem to work with, you can load your DataContext following the instructions in How to instantiate a Data Context without a yml file.
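For reference, a minimal in-memory Data Context might look like the sketch below. This assumes the DataContextConfig and InMemoryStoreBackendDefaults classes described in that guide; see the guide itself for the full set of options.

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)

# Keep all stores (Expectation Suites, Validation Results, etc.) in memory.
data_context_config = DataContextConfig(
    store_backend_defaults=InMemoryStoreBackendDefaults()
)
context = BaseDataContext(project_config=data_context_config)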
2. Connect to your data
Ensure your DataContext contains a Datasource with a RuntimeDataConnector
In order to pass in a DataFrame at runtime, your great_expectations.yml should contain a Datasource (which provides a standard API for accessing and interacting with data from a wide variety of source systems) configured with a RuntimeDataConnector. If it does not, you can add a new Datasource using the code below:
The same Datasource configuration can be added in three ways: as YAML, as a Python dictionary, or via the CLI.

YAML:
datasource_yaml = r"""
name: taxi_datasource
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
  module_name: great_expectations.execution_engine
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
"""
context.add_datasource(**yaml.safe_load(datasource_yaml))
Python:
datasource_config = {
    "name": "taxi_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
    },
}
context.add_datasource(**datasource_config)
CLI:
great_expectations datasource new

After running the CLI (Command Line Interface) command above, choose option 1 for "Files on a filesystem..." and then select whether you will be passing a Pandas or Spark DataFrame. Once the Jupyter Notebook opens, change the datasource_name to "taxi_datasource" and run all cells to save your Datasource configuration.
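Whichever of the three approaches you used, you can sanity-check that the Datasource is now registered in your DataContext before moving on:

# list_datasources() returns the configuration of each registered Datasource.
assert "taxi_datasource" in [ds["name"] for ds in context.list_datasources()]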
3. Create Expectations and Validate your data
Create a Checkpoint and pass it the DataFrame at runtime
You will need an Expectation Suite (a collection of verifiable assertions about data) to Validate your data against; Validation is the act of applying an Expectation Suite to a Batch. If you have not already created an Expectation Suite for your in-memory DataFrame, reference How to create and edit Expectations with instant feedback from a sample Batch of data to create your Expectation Suite.
For the purposes of this guide, we have created an empty suite named my_expectation_suite by running:
context.add_or_update_expectation_suite("my_expectation_suite")
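If you want to add a few Expectations to this suite directly against your in-memory DataFrame, one approach is to request a Validator with a RuntimeBatchRequest, as sketched below. This is only a sketch: the file path and the passenger_count column are placeholders for your own data, and it relies on the imports and Datasource from the earlier steps.

df = pd.read_csv("<PATH TO DATA>")  # your in-memory DataFrame

validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="taxi_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="taxi_data",
        runtime_parameters={"batch_data": df},  # Pass your DataFrame here.
        batch_identifiers={"default_identifier_name": "default_identifier"},
    ),
    expectation_suite_name="my_expectation_suite",
)

# Add an Expectation and persist it to the suite.
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.save_expectation_suite(discard_failed_expectations=False)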
We will now walk through two examples for configuring a Checkpoint and passing it an in-memory DataFrame at runtime.
Example 1: Pass only the batch_request's missing keys at runtime

If we configure a SimpleCheckpoint that contains a single batch_request in validations:
YAML:
checkpoint_yaml = """
name: my_missing_keys_checkpoint
config_version: 1
class_name: SimpleCheckpoint
validations:
  - batch_request:
      datasource_name: taxi_datasource
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: taxi_data
    expectation_suite_name: my_expectation_suite
"""
context.add_or_update_checkpoint(**yaml.safe_load(checkpoint_yaml))
Python:
checkpoint_config = {
    "name": "my_missing_keys_checkpoint",
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "taxi_datasource",
                "data_connector_name": "default_runtime_data_connector_name",
                "data_asset_name": "taxi_data",
            },
            "expectation_suite_name": "my_expectation_suite",
        }
    ],
}
context.add_or_update_checkpoint(**checkpoint_config)
We can then pass the remaining keys for the in-memory DataFrame (df) and its associated batch_identifiers at runtime using batch_request:
df = pd.read_csv("<PATH TO DATA>")

results = context.run_checkpoint(
    checkpoint_name="my_missing_keys_checkpoint",
    batch_request={
        "runtime_parameters": {"batch_data": df},  # Pass your DataFrame here.
        "batch_identifiers": {
            "default_identifier_name": "<your>"  # Any identifier that is meaningful to you.
        },
    },
)
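run_checkpoint returns a CheckpointResult. As a quick sanity check (one possible way to inspect it), you can confirm the run succeeded and review the Data Docs that SimpleCheckpoint updates by default:

# True only if every validation in the Checkpoint run passed.
assert results.success

# SimpleCheckpoint includes an update_data_docs action by default.
context.open_data_docs()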
Example 2: Pass a complete RuntimeBatchRequest at runtime

If we configure a SimpleCheckpoint that does not contain any validations:
YAML:
checkpoint_yaml = """
name: my_missing_batch_request_checkpoint
config_version: 1
class_name: SimpleCheckpoint
expectation_suite_name: my_expectation_suite
"""
context.add_or_update_checkpoint(**yaml.safe_load(checkpoint_yaml))
Python:
checkpoint_config = {
    "name": "my_missing_batch_request_checkpoint",
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "expectation_suite_name": "my_expectation_suite",
}
context.add_or_update_checkpoint(**checkpoint_config)
We can pass one or more RuntimeBatchRequests into validations at runtime. Here is an example that passes multiple batch_requests into validations:
df_1 = pd.read_csv("<PATH TO DATA 1>")
df_2 = pd.read_csv("<PATH TO DATA 2>")

batch_request_1 = RuntimeBatchRequest(
    datasource_name="taxi_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="<your>",  # This can be anything that identifies this data_asset for you.
    runtime_parameters={"batch_data": df_1},  # Pass your DataFrame here.
    batch_identifiers={"default_identifier_name": "<your>"},
)

batch_request_2 = RuntimeBatchRequest(
    datasource_name="taxi_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="<your>",  # This can be anything that identifies this data_asset for you.
    runtime_parameters={"batch_data": df_2},  # Pass your DataFrame here.
    batch_identifiers={"default_identifier_name": "<your>"},
)

results = context.run_checkpoint(
    checkpoint_name="my_missing_batch_request_checkpoint",
    validations=[
        {"batch_request": batch_request_1},
        {"batch_request": batch_request_2},
    ],
)
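Because this run validated two DataFrames, the resulting CheckpointResult contains one Validation Result per batch_request. A sketch of how you might confirm both were run:

assert results.success

# One ValidationResultIdentifier per DataFrame validated in this run.
for identifier in results.list_validation_result_identifiers():
    print(identifier)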
Additional Notes
To view the full script used in this page, see it on GitHub: