How to Use Great Expectations in Databricks
Great Expectations works well with many types of Databricks workflows. This guide will help you run Great Expectations in Databricks.
Prerequisites
- Completed the Quickstart guide
- A completed Databricks setup, including a running Databricks cluster with an attached notebook
- DBFS set up, if you are using the file-based version of this guide
We will cover a basic configuration to get you up and running quickly, and link to our other guides for more customized configurations. For example:
- If you want to validate files stored in DBFS, select one of the "File" tabs below.
- If you are using a different file store (e.g. S3, GCS, ABS), take a look at our integration guides for those respective file stores.
- If you already have a Spark DataFrame loaded, select one of the "DataFrame" tabs below.
1. Install Great Expectations
Install Great Expectations as a notebook-scoped library by running the following command in your notebook:
%pip install great-expectations
Next, import the modules that will be used later in this guide. The imports are the same for the File and DataFrame configurations:
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
2. Set up Great Expectations
In this guide, we will use the Databricks File System (DBFS) for your Metadata Stores and Data Docs store. Data Docs are human-readable documentation generated from Great Expectations metadata detailing Expectations, Validation Results, etc. Using DBFS is a simple way to get up and running within the Databricks environment without configuring external resources. For other storage options, see our "Metadata Stores" and "Data Docs" sections in the "How to Guides" for "Setting up Great Expectations."
Run the following code to set up a Data Context (the primary entry point for a Great Expectations deployment, with configurations and methods for all supporting components) in code using the appropriate defaults. The code is the same for the File and DataFrame configurations:
context_root_dir = "/dbfs/great_expectations/"
context = gx.get_context(context_root_dir=context_root_dir)
3. Prepare your data
- File
- DataFrame
For the File configuration, we will use our familiar NYC taxi yellow cab data, which is available as sample data in Databricks. Let's copy some example CSV data to our DBFS folder for easier access using dbutils:
# Copy 3 months of data
for month in range(1, 4):
    dbutils.fs.cp(
        f"/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz",
        f"/example_data/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz",
    )
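To make the paths concrete, here is a minimal pure-Python sketch of the source and destination paths that the copy loop passes to dbutils.fs.cp. It also prints the corresponding local path under /dbfs, on the assumption (common on Databricks clusters, but worth confirming for your workspace) that the DBFS root is mounted at /dbfs on the driver. The helper name to_local_dbfs_path is hypothetical and is not part of dbutils or Great Expectations.

```python
SOURCE_DIR = "/databricks-datasets/nyctaxi/tripdata/yellow"
TARGET_DIR = "/example_data/nyctaxi/tripdata/yellow"


def to_local_dbfs_path(dbfs_path: str) -> str:
    """Hypothetical helper: map a DBFS path to its assumed local /dbfs mount path."""
    return "/dbfs/" + dbfs_path.lstrip("/")


# Same file-name pattern and month range as the copy loop.
file_names = [f"yellow_tripdata_2019-0{month}.csv.gz" for month in range(1, 4)]
copies = [(f"{SOURCE_DIR}/{name}", f"{TARGET_DIR}/{name}") for name in file_names]

for source, target in copies:
    print(f"{source} -> {target} (local path: {to_local_dbfs_path(target)})")
```

Under that assumption, the local form of the target directory is /dbfs/example_data/nyctaxi/tripdata/yellow, which is the base_directory used when connecting to the data in step 4.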
For the DataFrame configuration, we will use the same NYC taxi yellow cab data, which is available as sample data in Databricks. Run the following code in your notebook to load a month of data as a DataFrame:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz")
4. Connect to your data
- File
- DataFrame
For the File configuration, add the Datasource:
base_directory = "/dbfs/example_data/nyctaxi/tripdata/yellow/"
dbfs_datasource = context.sources.add_or_update_spark_dbfs(
    name="my_spark_dbfs_datasource",
    base_directory=base_directory,
)
Add the Data Asset:
batching_regex = r"yellow_tripdata_(?P<year>\d{4})-(?P<month>\d{2})\.csv\.gz"
csv_asset = dbfs_datasource.add_csv_asset(
    name="yellow_tripdata",
    batching_regex=batching_regex,
    header=True,
    infer_schema=True,
)
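To see what the batching_regex matches, here is a small sketch that uses only Python's re module. It applies the same pattern to the three file names copied in step 3, plus one non-matching name made up for illustration, and collects the year and month groups. It assumes the pattern has to match the whole file name; how Great Expectations applies the pattern internally is not shown here.

```python
import re

# Same pattern as batching_regex in the Data Asset definition.
batching_regex = r"yellow_tripdata_(?P<year>\d{4})-(?P<month>\d{2})\.csv\.gz"
pattern = re.compile(batching_regex)

file_names = [
    "yellow_tripdata_2019-01.csv.gz",
    "yellow_tripdata_2019-02.csv.gz",
    "yellow_tripdata_2019-03.csv.gz",
    "green_tripdata_2019-01.csv.gz",  # illustrative name that should not match
]

# Map each matching file name to its named groups.
matches = {}
for name in file_names:
    match = pattern.fullmatch(name)
    if match:
        matches[name] = match.groupdict()

for name, groups in matches.items():
    print(name, groups)
```

The named groups year and month are what distinguish one file from another. Depending on your Great Expectations version, they may also be usable as options when building a batch request for a specific file; check the documentation for your version before relying on that.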
Then we build a BatchRequest using the DataAsset we configured earlier to use as a sample of data when creating Expectations:
batch_request = csv_asset.build_batch_request()
For the DataFrame configuration, add the Datasource:
dataframe_datasource = context.sources.add_or_update_spark(
    name="my_spark_in_memory_datasource",
)
Add the Data Asset, passing in the DataFrame df that was loaded in step 3:
dataframe_asset = dataframe_datasource.add_dataframe_asset(
    name="yellow_tripdata",
    dataframe=df,
)
Then we build a BatchRequest using the DataAsset we configured earlier to use as a sample of data when creating Expectations:
batch_request = dataframe_asset.build_batch_request()
🚀🚀 Congratulations! 🚀🚀 You successfully connected Great Expectations with your data.
Now let's keep going to create an Expectation Suite and validate our data.
5. Create Expectations
Here we will use a Validator (used to run an Expectation Suite against data) to interact with our batch of data and generate an Expectation Suite (a collection of verifiable assertions about data).
Each time we evaluate an Expectation (e.g. via validator.expect_*), it will immediately be Validated against your data. This instant feedback helps you zero in on unexpected data very quickly, taking a lot of the guesswork out of data exploration. Also, the Expectation configuration will be stored in the Validator. When you have run all of the Expectations you want for this dataset, you can call validator.save_expectation_suite() to save all of your Expectation configurations into an Expectation Suite for later use in a Checkpoint.
- File
- DataFrame
For the File configuration, first we create the suite and get a Validator:
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
Then we use the Validator to add an Expectation:
validator.expect_table_column_count_to_equal(value=18)
Finally we save our Expectation Suite (all of the unique Expectation Configurations from each run of validator.expect_*) to our Expectation Store:
validator.save_expectation_suite(discard_failed_expectations=False)
For the DataFrame configuration, first we create the suite and get a Validator:
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
Then we use the Validator to add a few Expectations:
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)
Finally we save our Expectation Suite (all of the unique Expectation Configurations from each run of validator.expect_*) to our Expectation Store:
validator.save_expectation_suite(discard_failed_expectations=False)
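As a mental model of the workflow in this step, here is a toy sketch in plain Python. ToyValidator is a hypothetical stand-in written for this illustration; it is not the Great Expectations Validator and does not reflect its implementation. It only mimics the two behaviors described above: each expect_* call is checked against the data immediately, and its configuration is recorded so it can be saved later. The sample rows are made up, and the handling of discard_failed_expectations reflects our reading of the flag's name (False keeps Expectations that did not pass), which you should treat as an assumption.

```python
class ToyValidator:
    """Toy stand-in for illustration only; not the Great Expectations Validator."""

    def __init__(self, rows):
        self.rows = rows
        self.recorded = []  # (expectation configuration, success) pairs

    def expect_column_values_to_not_be_null(self, column):
        unexpected = [row for row in self.rows if row.get(column) is None]
        return self._record({"type": "not_null", "column": column}, len(unexpected))

    def expect_column_values_to_be_between(self, column, min_value, max_value):
        # Missing values are skipped here; only present values are range-checked.
        values = [row[column] for row in self.rows if row.get(column) is not None]
        unexpected = [v for v in values if not (min_value <= v <= max_value)]
        config = {
            "type": "between",
            "column": column,
            "min_value": min_value,
            "max_value": max_value,
        }
        return self._record(config, len(unexpected))

    def _record(self, config, unexpected_count):
        # Evaluate immediately and remember the configuration for saving later.
        success = unexpected_count == 0
        self.recorded.append((config, success))
        return {"success": success, "unexpected_count": unexpected_count}

    def save_expectation_suite(self, discard_failed_expectations=True):
        return [
            config
            for config, success in self.recorded
            if success or not discard_failed_expectations
        ]


# Made-up sample rows for illustration.
rows = [
    {"passenger_count": 1, "congestion_surcharge": 2.5},
    {"passenger_count": None, "congestion_surcharge": 0.0},
    {"passenger_count": 2, "congestion_surcharge": None},
]
toy = ToyValidator(rows)
null_result = toy.expect_column_values_to_not_be_null(column="passenger_count")
range_result = toy.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)
suite_all = toy.save_expectation_suite(discard_failed_expectations=False)
suite_passing = toy.save_expectation_suite(discard_failed_expectations=True)
print(null_result, range_result, len(suite_all), len(suite_passing))
```

In this toy run the not-null check fails on one row while the range check passes, so keeping failed Expectations saves two configurations and discarding them saves one.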
6. Validate your data
- File
- DataFrame
For the File configuration, here we will create and store a Checkpoint (the primary means for validating data in a production deployment of Great Expectations) for our batch, which we can use to validate and run post-validation actions. Check out our docs on "Validating your data" for more info on how to customize your Checkpoints.
First, we create the Checkpoint configuration. The batch_request and the Expectation Suite we created above are supplied when we run the Checkpoint:
my_checkpoint_name = "my_databricks_checkpoint"
checkpoint = SimpleCheckpoint(
    name=my_checkpoint_name,
    config_version=1.0,
    class_name="SimpleCheckpoint",
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    data_context=context,
)
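The run_name_template value uses strftime-style directives. Assuming Great Expectations fills the template by formatting the run time with strftime (our understanding, not something this guide confirms), the following standard-library sketch shows the kind of run name the template produces. The timestamp is a fixed, made-up value so that the result is reproducible.

```python
from datetime import datetime

# Same template string as in the Checkpoint configuration.
run_name_template = "%Y%m%d-%H%M%S-my-run-name-template"

# Fixed example timestamp; a real run would use the actual run time.
example_run_time = datetime(2019, 1, 15, 9, 30, 0)
run_name = example_run_time.strftime(run_name_template)
print(run_name)  # 20190115-093000-my-run-name-template
```

Because the date and time come first, run names built this way sort chronologically.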
Next, we add the Checkpoint:
context.add_or_update_checkpoint(checkpoint=checkpoint)
Finally, we run the Checkpoint:
checkpoint_result = context.run_checkpoint(
    checkpoint_name=my_checkpoint_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)
For the DataFrame configuration, here we will create and store a Checkpoint (the primary means for validating data in a production deployment of Great Expectations) with no defined validations, then pass in our DataFrame at runtime.
First, we create the Checkpoint configuration:
my_checkpoint_name = "my_databricks_checkpoint"
checkpoint = SimpleCheckpoint(
    name=my_checkpoint_name,
    config_version=1.0,
    class_name="SimpleCheckpoint",
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    data_context=context,
)
Next, we add the Checkpoint:
context.add_or_update_checkpoint(checkpoint=checkpoint)
Finally, we run the Checkpoint:
checkpoint_result = context.run_checkpoint(
    checkpoint_name=my_checkpoint_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)
7. Build and view Data Docs
Since we used a SimpleCheckpoint, our Checkpoint already contained an UpdateDataDocsAction which rendered our Data Docs from the validation we just ran. That means our Data Docs store will contain a new rendered validation result.
Also, to see the full Checkpoint configuration, you can run the following, where checkpoint is the SimpleCheckpoint object created in step 6:
print(checkpoint.get_substituted_config().to_yaml_str())
Since we used DBFS for our Data Docs store, we need to download our Data Docs locally to view them. If you use a different store, you can host your Data Docs in a place where they can be accessed directly by your team. To learn more, see our documentation on Data Docs for other locations, e.g. filesystem, S3, GCS, ABS.
Run the following Databricks CLI command to download your Data Docs (replacing the paths as appropriate), then open the local copy of index.html to view your updated Data Docs:
databricks fs cp -r dbfs:/great_expectations/uncommitted/data_docs/local_site/ great_expectations/uncommitted/data_docs/local_site/
Another option is to display Data Docs directly in a Databricks notebook with the displayHTML command. There is one restriction: clicking on a link in the displayed Data Docs will result in an empty page. This approach is useful when you only want a quick look at some validation results.
html = "/dbfs/great_expectations/uncommitted/data_docs/local_site/index.html"
with open(html, "r") as f:
    data = f.read()
displayHTML(data)
8. Congratulations!
You've successfully validated your data with Great Expectations using Databricks and viewed the resulting Data Docs. Check out our other guides for more customization options and happy validating!