How to add Spark support for Custom Expectations
This guide will help you implement native Spark support for your Custom Expectation (an extension of the `Expectation` class, developed outside of the Great Expectations library).
Prerequisites: This how-to guide assumes you have already created a Custom Expectation, as covered in the guides referenced below.
Great Expectations supports a number of Execution Engines (systems capable of processing data to compute Metrics), including a Spark Execution Engine. These Execution Engines provide the computing resources used to calculate the Metrics (computed attributes of data, such as the mean of a column) defined in the Metric class of your Custom Expectation.
If you decide to contribute your Expectation (a verifiable assertion about data), its entry in the Expectations Gallery will reflect the Execution Engines that it supports.
We will add Spark support for the Custom Expectations implemented in our guides on how to create Custom Column Aggregate Expectations and how to create Custom Column Map Expectations.
Steps
1. Specify your backends
To avoid surprises and help clearly define your Custom Expectation, it can be helpful to determine beforehand what backends you plan to support, and test them along the way.
Within the `examples` defined inside your Expectation class, the `test_backends` key specifies which backends and SQLAlchemy dialects to run tests for. Add entries corresponding to the functionality you want to add:
```python
examples = [
    {
        "data": {"x": [1, 2, 3, 4, 5], "y": [0, -1, -2, 4, None]},
        "tests": [
            {
                "title": "basic_positive_test",
                "exact_match_out": False,
                "include_in_gallery": True,
                "in": {
                    "column": "x",
                    "min_value": 4,
                    "strict_min": True,
                    "max_value": 5,
                    "strict_max": False,
                },
                "out": {"success": True},
            },
            {
                "title": "basic_negative_test",
                "exact_match_out": False,
                "include_in_gallery": True,
                "in": {
                    "column": "y",
                    "min_value": -2,
                    "strict_min": False,
                    "max_value": 3,
                    "strict_max": True,
                },
                "out": {"success": False},
            },
        ],
        "test_backends": [
            {
                "backend": "pandas",
                "dialects": None,
            },
            {
                "backend": "sqlalchemy",
                "dialects": ["sqlite", "postgresql"],
            },
            {
                "backend": "spark",
                "dialects": None,
            },
        ],
    }
]
```
You may have noticed that specifying `test_backends` isn't required for successfully testing your Custom Expectation. If it is not specified, Great Expectations will attempt to determine the implemented backends automatically, but will only run SQLAlchemy tests against sqlite.
2. Implement the Spark logic for your Custom Expectation
Great Expectations provides a variety of ways to implement an Expectation in Spark. Two of the most common are:
- Defining a partial function that takes a Spark DataFrame column as input
- Directly executing queries on Spark DataFrames to determine the value of your Expectation's metric
Partial Function
Great Expectations allows for much of the PySpark DataFrame logic to be abstracted away by specifying metric behavior as a partial function.
To do this, we use one of the `@column_*_partial` decorators:
- `@column_aggregate_partial` for Column Aggregate Expectations
- `@column_condition_partial` for Column Map Expectations
- `@column_pair_condition_partial` for Column Pair Map Expectations
- `@multicolumn_condition_partial` for Multicolumn Map Expectations
These decorators expect an appropriate `engine` argument. In this case, we'll pass our `SparkDFExecutionEngine`. The decorated method takes in a Spark `Column` object and will return either a `pyspark.sql.functions` function or a `pyspark.sql.Column` method that Great Expectations will use to generate the appropriate SQL queries.
For our Custom Column Aggregate Expectation `ExpectColumnMaxToBeBetweenCustom`, we're going to leverage PySpark's `max` SQL function and the `@column_aggregate_partial` decorator.
```python
@column_aggregate_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, _table, _column_name, **kwargs):
    """Spark Max Implementation"""
    return F.max(column)
```
If we need a built-in function from `pyspark.sql.functions`, usually aliased to `F`, the import logic in `from great_expectations.expectations.metrics.import_manager import F` allows us to access these functions even when PySpark is not installed.
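For example, near the top of a Custom Expectation file you might have the following (a minimal sketch; exact import paths can vary by Great Expectations version, and `sparktypes` is only needed if you use Spark type objects, as in the UDF example below):

```python
# The Execution Engine this guide targets
from great_expectations.execution_engine import SparkDFExecutionEngine

# Deferred imports managed by Great Expectations: F mirrors
# pyspark.sql.functions and sparktypes mirrors pyspark.sql.types,
# so your Expectation module can be imported even when PySpark
# is not installed
from great_expectations.expectations.metrics.import_manager import F, sparktypes
```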
Applying Python Functions
`F.udf` allows us to use a Python function as a Spark User Defined Function for Column Map Expectations, giving us the ability to define custom functions and apply them to our data. Here is an example of `F.udf` applied to `ExpectColumnValuesToEqualThree`:
```python
@column_condition_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, **kwargs):
    def is_equal_to_three(val):
        return val == 3

    # Wrap the Python function as a Spark UDF returning a boolean
    success_udf = F.udf(is_equal_to_three, sparktypes.BooleanType())
    return success_udf(column)
```
For more on `F.udf` and the functionality it provides, see the Apache Spark UDF documentation.
Query Execution
The most direct way of implementing a metric is to compute its value by constructing or directly executing queries, using objects provided by the `@metric_*` decorators:
- `@metric_value` for Column Aggregate Expectations
  - Expects an appropriate `engine`, `metric_fn_type`, and `domain_type`
- `@metric_partial` for all Map Expectations
  - Expects an appropriate `engine`, `partial_fn_type`, and `domain_type`
Our `engine` will reflect the backend we're implementing (`SparkDFExecutionEngine`), while our `fn_type` and `domain_type` are unique to the type of Expectation we're implementing.
These decorators enable a higher-complexity workflow, allowing you to explicitly structure your queries and make intermediate queries to your database. While this approach can result in extra roundtrips to your database, it can also unlock advanced functionality for your Custom Expectations.
For our Custom Column Map Expectation `ExpectColumnValuesToEqualThree`, we're going to implement the `@metric_partial` decorator, specifying the type of value we're computing (`MAP_CONDITION_FN`) and the domain over which we're computing it (`COLUMN`):
```python
@metric_partial(
    engine=SparkDFExecutionEngine,
    partial_fn_type=MetricPartialFunctionTypes.MAP_CONDITION_FN,
    domain_type=MetricDomainTypes.COLUMN,
)
def _spark(
    cls,
    execution_engine: SparkDFExecutionEngine,
    metric_domain_kwargs,
    metric_value_kwargs,
    metrics,
    runtime_configuration,
):
```
The decorated method takes in a valid Execution Engine and the relevant `kwargs`, and will return a tuple of:
- a `pyspark.sql.column.Column` defining the query to be executed
- `compute_domain_kwargs`
- `accessor_domain_kwargs`
These will be used to execute our query and compute the results of our metric.
To do this, we need to access our Compute Domain directly:
```python
(
    selectable,
    compute_domain_kwargs,
    accessor_domain_kwargs,
) = execution_engine.get_compute_domain(
    metric_domain_kwargs, MetricDomainTypes.COLUMN
)
column_name = accessor_domain_kwargs["column"]
column = F.col(column_name)
```
This allows us to build and return a query to be executed, providing the result of our metric:
```python
query = F.when(column == 3, F.lit(False)).otherwise(F.lit(True))
return (query, compute_domain_kwargs, accessor_domain_kwargs)
```
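Putting these pieces together, the complete method looks like this (a sketch assembled from the snippets above; the import paths for `MetricPartialFunctionTypes` and `MetricDomainTypes` vary by Great Expectations version):

```python
@metric_partial(
    engine=SparkDFExecutionEngine,
    partial_fn_type=MetricPartialFunctionTypes.MAP_CONDITION_FN,
    domain_type=MetricDomainTypes.COLUMN,
)
def _spark(
    cls,
    execution_engine: SparkDFExecutionEngine,
    metric_domain_kwargs,
    metric_value_kwargs,
    metrics,
    runtime_configuration,
):
    # Resolve the Compute Domain to get the column we're operating on
    (
        selectable,
        compute_domain_kwargs,
        accessor_domain_kwargs,
    ) = execution_engine.get_compute_domain(
        metric_domain_kwargs, MetricDomainTypes.COLUMN
    )
    column_name = accessor_domain_kwargs["column"]
    column = F.col(column_name)

    # Unexpected condition: True marks a row as unexpected
    query = F.when(column == 3, F.lit(False)).otherwise(F.lit(True))
    return (query, compute_domain_kwargs, accessor_domain_kwargs)
```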
Because in Spark we are implementing the window function directly, we have to return the unexpected condition: `False` when `column == 3`, otherwise `True`.
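To make this inverted boolean convention concrete, here is a plain PySpark illustration (not the Great Expectations API, and using a hypothetical three-row DataFrame) of how the returned condition flags unexpected rows:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (3,), (5,)], ["x"])

# Same condition as above: True marks a row as unexpected
condition = F.when(F.col("x") == 3, F.lit(False)).otherwise(F.lit(True))
df.withColumn("unexpected", condition).show()
# x=3 -> unexpected=False; x=1 and x=5 -> unexpected=True
```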
3. Verify your implementation
If you now run your file, `print_diagnostic_checklist()` will attempt to execute your example cases using this new backend.
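If your file doesn't already include it, a typical pattern (a sketch, assuming the class name from this guide) is to call the checklist from a `__main__` guard at the bottom of the file:

```python
if __name__ == "__main__":
    # Runs the gallery example cases against every implemented backend,
    # including the new Spark implementation
    ExpectColumnValuesToEqualThree().print_diagnostic_checklist()
```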
If your implementation is correctly defined, and the rest of the core logic in your Custom Expectation is already complete, you will see the following in your Diagnostic Checklist:
✔ Has at least one positive and negative example case, and all test cases pass
If you've already implemented the Pandas backend covered in our How-To guides for creating Custom Expectations and the SQLAlchemy backend covered in our guide on how to add SQLAlchemy support for Custom Expectations, you should see the following in your Diagnostic Checklist:
✔ Has core logic that passes tests for all applicable Execution Engines and SQL dialects
Congratulations! 🎉 You've successfully implemented Spark support for a Custom Expectation! 🎉
4. Contribution (Optional)
This guide will leave you with core functionality sufficient for contribution back to Great Expectations at an Experimental level.
If you're interested in having your contribution accepted at a Beta level, your Custom Expectation will need to support SQLAlchemy, Spark, and Pandas.
For full acceptance into the Great Expectations codebase at a Production level, we require that your Custom Expectation meets our code standards, including test coverage and style. If you believe your Custom Expectation is otherwise ready for contribution at a Production level, please submit a Pull Request, and we will work with you to ensure your Custom Expectation meets these standards.
For more information on our code standards and contribution, see our guide on Levels of Maturity for Expectations.
To view the full scripts used on this page, see them on GitHub: