Version: 0.15.50

How to pass an in-memory DataFrame to a Checkpoint

This guide will help you pass an in-memory DataFrame to an existing Checkpoint (the primary means for validating data in a production deployment of Great Expectations). This is especially useful if you already have your data in memory due to an existing process such as a pipeline runner.

Prerequisites: This how-to guide assumes you have:

Steps

1. Set up Great Expectations

Import the required libraries and load your DataContext

import pandas as pd
from ruamel import yaml

import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest

If you have an existing configured DataContext in your filesystem in the form of a great_expectations.yml file, you can load it like this:

context = gx.get_context()

If you do not have a filesystem to work with, you can load your DataContext following the instructions in How to instantiate a Data Context without a yml file.

2. Connect to your data

Ensure your DataContext contains a Datasource with a RuntimeDataConnector

To pass in a DataFrame at runtime, your great_expectations.yml should contain a Datasource (which provides a standard API for accessing and interacting with data from a wide variety of source systems) configured with a RuntimeDataConnector. If it does not, you can add a new Datasource using the code below:

datasource_yaml = r"""
name: taxi_datasource
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
  module_name: great_expectations.execution_engine
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
"""
context.add_datasource(**yaml.safe_load(datasource_yaml))
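If you would rather not parse YAML, the same configuration can be written as a plain Python dictionary, since the call above simply unpacks the parsed YAML into keyword arguments. The following is a minimal sketch of that equivalent dictionary; the variable name datasource_config is our own choice for illustration, not something the library requires.

```python
# Dictionary equivalent of the YAML Datasource configuration.
# `datasource_config` is an illustrative name chosen for this sketch.
datasource_config = {
    "name": "taxi_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
    },
}

# List the batch identifier keys that must be supplied at runtime.
connector = datasource_config["data_connectors"]["default_runtime_data_connector_name"]
print(connector["batch_identifiers"])
```

Passing this dictionary with context.add_datasource(**datasource_config) should then have the same effect as the YAML version.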

3. Create Expectations and Validate your data

Create a Checkpoint and pass it the DataFrame at runtime

You will need an Expectation Suite (a collection of verifiable assertions about data) to validate your data against; validating means applying an Expectation Suite to a Batch. If you have not already created an Expectation Suite for your in-memory DataFrame, see How to create and edit Expectations with instant feedback from a sample Batch of data.

For the purposes of this guide, we have created an empty suite named my_expectation_suite by running:

context.add_or_update_expectation_suite("my_expectation_suite")

We will now walk through two examples for configuring a Checkpoint and passing it an in-memory DataFrame at runtime.

Example 1: Pass only the batch_request's missing keys at runtime

If we configure a SimpleCheckpoint that contains a single batch_request in validations:

checkpoint_yaml = """
name: my_missing_keys_checkpoint
config_version: 1
class_name: SimpleCheckpoint
validations:
  - batch_request:
      datasource_name: taxi_datasource
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: taxi_data
    expectation_suite_name: my_expectation_suite
"""
context.add_or_update_checkpoint(**yaml.safe_load(checkpoint_yaml))

We can then pass the remaining keys for the in-memory DataFrame (df) and its associated batch_identifiers at runtime using batch_request:

df = pd.read_csv("<PATH TO DATA>")
results = context.run_checkpoint(
    checkpoint_name="my_missing_keys_checkpoint",
    batch_request={
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {
            "default_identifier_name": "<your>"
        },
    },
)
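Conceptually, the keys passed at runtime fill in the ones the Checkpoint configuration left out, so that together they form a complete batch request. The sketch below illustrates that idea with plain dictionaries. It is not Great Expectations' actual implementation: the helper complete_batch_request, the passenger_count column, and the list of rows standing in for a DataFrame are all assumptions made for illustration.

```python
# Keys a runtime batch request needs before it can be used.
REQUIRED_KEYS = {
    "datasource_name",
    "data_connector_name",
    "data_asset_name",
    "runtime_parameters",
    "batch_identifiers",
}


def complete_batch_request(configured: dict, runtime: dict) -> dict:
    """Combine configured and runtime keys; fail if a required key is missing."""
    merged = {**configured, **runtime}  # in this sketch, runtime values win on conflicts
    missing = REQUIRED_KEYS - merged.keys()
    if missing:
        raise ValueError(f"batch_request is missing keys: {sorted(missing)}")
    return merged


# Keys stored in the Checkpoint configuration (Example 1).
configured = {
    "datasource_name": "taxi_datasource",
    "data_connector_name": "default_runtime_data_connector_name",
    "data_asset_name": "taxi_data",
}

# Keys supplied at runtime. A list with one hypothetical row stands in for df.
runtime = {
    "runtime_parameters": {"batch_data": [{"passenger_count": 1}]},
    "batch_identifiers": {"default_identifier_name": "<your>"},
}

batch_request = complete_batch_request(configured, runtime)
print(sorted(batch_request))
```

Calling complete_batch_request(configured, {}) raises a ValueError in this sketch, which mirrors why the Checkpoint in Example 1 cannot run without the runtime keys.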

Example 2: Pass a complete RuntimeBatchRequest at runtime

If we configure a SimpleCheckpoint that does not contain any validations:

checkpoint_yaml = """
name: my_missing_batch_request_checkpoint
config_version: 1
class_name: SimpleCheckpoint
expectation_suite_name: my_expectation_suite
"""
context.add_or_update_checkpoint(**yaml.safe_load(checkpoint_yaml))

We can pass one or more RuntimeBatchRequests into validations at runtime. Here is an example that passes multiple batch_requests into validations:

df_1 = pd.read_csv("<PATH TO DATA 1>")
df_2 = pd.read_csv("<PATH TO DATA 2>")
batch_request_1 = RuntimeBatchRequest(
    datasource_name="taxi_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="<your>",  # This can be anything that identifies this data_asset for you
    runtime_parameters={"batch_data": df_1},  # Pass your DataFrame here.
    batch_identifiers={"default_identifier_name": "<your>"},
)

batch_request_2 = RuntimeBatchRequest(
    datasource_name="taxi_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="<your>",  # This can be anything that identifies this data_asset for you
    runtime_parameters={"batch_data": df_2},  # Pass your DataFrame here.
    batch_identifiers={"default_identifier_name": "<your>"},
)

results = context.run_checkpoint(
    checkpoint_name="my_missing_batch_request_checkpoint",
    validations=[
        {"batch_request": batch_request_1},
        {"batch_request": batch_request_2},
    ],
)
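In a pipeline you will usually want to act on the outcome of the run. The sketch below assumes that the object returned by run_checkpoint can be indexed with "success" to obtain an overall pass/fail flag. The helper require_success is our own name, and a plain dictionary stands in for the real result so that the snippet runs on its own.

```python
def require_success(results) -> None:
    """Raise if the Checkpoint run reports a failed validation."""
    # Assumption: `results` supports results["success"] (overall pass/fail flag).
    if not results["success"]:
        raise RuntimeError("Checkpoint validation failed")


# Stand-in for the value returned by context.run_checkpoint(...).
fake_results = {"success": True}
require_success(fake_results)  # passes silently

try:
    require_success({"success": False})
except RuntimeError as err:
    print(f"Pipeline stopped: {err}")
```

Raising an exception rather than returning a flag lets a pipeline runner halt downstream steps without extra branching.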
