Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,780 changes: 1,606 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ name = "dataframely"
[dependencies]
itertools = "*"
num-format = "*"
polars = { version = ">=0.50", default-features = false, features = [] }
polars = { version = ">=0.50", default-features = false, features = [
"dtype-full",
] }
polars-arrow = ">=0.50"
polars-core = { version = ">=0.50", features = [
"dtype-array",
"dtype-struct",
], default-features = false }
polars-core = ">=0.50"
pyo3 = { version = ">=0.25", features = ["abi3-py310", "extension-module"] }
pyo3-polars = { version = ">=0.23.1", features = ["derive"] }
rand = { version = "0.9", features = ["std_rng"] }
Expand Down
19 changes: 18 additions & 1 deletion dataframely/_native.pyi
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
from typing import overload

def format_rule_failures(failures: list[tuple[str, int]]) -> str:
import polars as pl

def format_rule_failures(
failures: list[tuple[str, int]],
*,
failures_from: pl.DataFrame | None,
examples_from: pl.DataFrame | None,
primary_key_columns: list[str],
max_examples: int | None,
) -> str:
"""
Format rule failures with the same logic that produces validation errors from the
polars plugin.

Args:
failures: The name of the failures and their counts. This should only include
failures with a count of at least 1.
failures_from: The data frame containing the rule columns providing the
failures.
examples_from: The data frame containing the example rows for each failure.
primary_key_columns: The primary key columns of the schema for which to format
rule failures. This is only relevant if `failures_from` and `examples_from`
are provided and allows for better error messages for the "primary_key" rule.
max_examples: The maximum number of examples to include for each failure. This is
only relevant if `failures_from` and `examples_from` are provided.

Returns:
The formatted rule failures.
Expand Down
22 changes: 20 additions & 2 deletions dataframely/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import polars as pl
from polars.plugins import register_plugin_function

from dataframely.config import Config

PLUGIN_PATH = Path(__file__).parent

IntoExpr: TypeAlias = pl.Expr | str
Expand Down Expand Up @@ -59,6 +61,8 @@ def all_rules_required(
*,
null_is_valid: bool = True,
schema_name: str,
data_columns: Iterable[IntoExpr] | None = None,
primary_key_columns: list[str] | None,
) -> pl.Expr:
"""Execute :mod:`~polars.all_horizontal` and `.all` for a set of rules.

Expand All @@ -76,15 +80,29 @@ def all_rules_required(
schema_name: The name of the schema being validated. This is used to produce
better error messages.
null_is_valid: Whether to treat null values as valid (i.e., `true`).
data_columns: Optional data columns to include for generating example rows in
error messages. If provided, up to 5 distinct example rows are included
for each failing rule.
primary_key_columns: Optional list of primary key columns which are used for
better error messages if data columns are provided.

Returns:
A scalar boolean expression.
"""
rules_list = [rules] if isinstance(rules, pl.Expr) else list(rules)
num_rule_columns = len(rules_list)
data_columns_list = list(data_columns) if data_columns is not None else []
return register_plugin_function(
plugin_path=PLUGIN_PATH,
function_name="all_rules_required",
args=rules,
kwargs={"null_is_valid": null_is_valid, "schema_name": schema_name},
args=[*rules_list, *data_columns_list],
kwargs={
"null_is_valid": null_is_valid,
"schema_name": schema_name,
"num_rule_columns": num_rule_columns,
"primary_key_columns": primary_key_columns or [],
"max_failure_examples": Config.options["max_failure_examples"],
},
use_abs_path=True,
is_elementwise=True,
)
26 changes: 20 additions & 6 deletions dataframely/collection/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from dataframely._storage.delta import DeltaStorageBackend
from dataframely._storage.parquet import ParquetStorageBackend
from dataframely._typing import LazyFrame, Validation
from dataframely.config import Config
from dataframely.exc import (
DeserializationError,
ValidationError,
Expand Down Expand Up @@ -409,11 +410,20 @@ def validate(
# information to properly construct a useful error message.
filtered, failures = cls.filter(data, cast=cast, eager=True)
if any(len(failure) > 0 for failure in failures.values()):
errors = {
member: format_rule_failures(list(failure.counts().items()))
for member, failure in failures.items()
if len(failure) > 0
}
errors: dict[str, str] = {}
for member, failure in failures.items():
if len(failure) == 0:
continue

counts = failure.counts()
errors[member] = format_rule_failures(
list(counts.items()),
failures_from=failure._df.select(counts.keys()),
examples_from=failure.invalid(),
primary_key_columns=cls.member_schemas()[member].primary_key(),
max_examples=Config.options["max_failure_examples"],
)

details = [
f" > Member '{member}' failed validation:\n"
+ textwrap.indent(error, " ")
Expand Down Expand Up @@ -451,7 +461,11 @@ def validate(
)
.filter(
all_rules_required(
filter_names, null_is_valid=False, schema_name=name
filter_names,
null_is_valid=False,
schema_name=name,
data_columns=cls.common_primary_key(),
primary_key_columns=cls.common_primary_key(),
)
)
.drop(filter_names)
Expand Down
10 changes: 9 additions & 1 deletion dataframely/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
from typing_extensions import Unpack


class Options(TypedDict):
class Options(TypedDict, total=False):
#: The maximum number of iterations to use for "fuzzy" sampling.
max_sampling_iterations: int
#: The maximum number of examples to include in failure messages.
max_failure_examples: int


def default_options() -> Options:
return {
"max_sampling_iterations": 10_000,
"max_failure_examples": 0,
}


Expand All @@ -40,6 +43,11 @@ def set_max_sampling_iterations(iterations: int) -> None:
:meth:`Schema.sample`."""
Config.options["max_sampling_iterations"] = iterations

@staticmethod
def set_max_failure_examples(max_examples: int) -> None:
"""Set the maximum number of examples to include in failure messages."""
Config.options["max_failure_examples"] = max_examples

@staticmethod
def restore_defaults() -> None:
"""Restore the defaults of the configuration."""
Expand Down
18 changes: 16 additions & 2 deletions dataframely/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,15 @@ def validate(
if eager:
out, failure = cls.filter(df, cast=cast, eager=True)
if len(failure) > 0:
counts = failure.counts()
raise ValidationError(
format_rule_failures(list(failure.counts().items()))
format_rule_failures(
list(counts.items()),
failures_from=failure._df.select(counts.keys()),
examples_from=failure.invalid(),
primary_key_columns=cls.primary_key(),
max_examples=Config.options["max_failure_examples"],
)
)
return out
else:
Expand All @@ -587,7 +594,14 @@ def validate(
if rules := cls._validation_rules(with_cast=False):
lf = (
lf.pipe(with_evaluation_rules, rules)
.filter(all_rules_required(rules.keys(), schema_name=cls.__name__))
.filter(
all_rules_required(
rules.keys(),
schema_name=cls.__name__,
data_columns=cls.column_names(),
primary_key_columns=cls.primary_key(),
)
)
.drop(rules.keys())
)
return lf # type: ignore
Expand Down
24 changes: 24 additions & 0 deletions docs/guides/features/lazy-validation.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,27 @@ For collections, the error message for `eager=False` is limited and non-determin
information about a single member and, if multiple members fail validation, the member that the error message refers to
may vary across executions.
```

```{note}
When the lazy frame is collected on the polars _streaming_ engine, lazy validation may not surface _all_ validation
issues: validation is aborted as soon as the first failure is encountered. As a result, both the set of rules reported
in the error message and the specific failure surfaced may be non-deterministic across executions.
```

## Including failure examples in error messages

By default, validation error messages report only the name of each failing rule and the number of rows that violated it.
For easier debugging, dataframely can additionally include a few example rows for each failing rule. This is configured
via {meth}`~dataframely.Config.set_max_failure_examples` (or the `max_failure_examples` keyword on the
{class}`~dataframely.Config` context manager) and applies to both `eager=True` and `eager=False`:

```python
import dataframely as dy

with dy.Config(max_failure_examples=5):
MySchema.validate(df)
```

For column-level rules, examples include the value in the offending column. For schema-level rules, examples include all
data columns of the schema, except for the `primary_key` rule where examples are limited to the primary key columns.
The default value of `0` disables examples entirely.
36 changes: 33 additions & 3 deletions src/polars_plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,28 @@ pub fn all_rules(inputs: &[Series]) -> PolarsResult<Series> {
struct RequiredValidationKwargs {
schema_name: String,
null_is_valid: bool,
#[serde(default)]
num_rule_columns: Option<usize>,
primary_key_columns: Option<Vec<String>>,
max_failure_examples: usize,
}

/// Reduce a set of boolean columns into a single boolean scalar, AND-ing all values.
/// Null values are treated as `true`.
/// In contrast to `all_rules`, this function raises an error if the returned value would be
/// `false`, including details about the `false` values (i.e. "rules" that failed).
/// The first `num_rule_columns` inputs are boolean rule columns; any remaining inputs are
/// data columns used to generate example rows in error messages.
#[polars_expr(output_type=Boolean)]
pub fn all_rules_required(
inputs: &[Series],
kwargs: RequiredValidationKwargs,
) -> PolarsResult<Series> {
let failures = compute_rule_failures(inputs, kwargs.null_is_valid)?;
let num_rule = kwargs.num_rule_columns.unwrap_or(inputs.len());
let rule_inputs = &inputs[..num_rule];
let data_inputs = &inputs[num_rule..];

let failures = compute_rule_failures(rule_inputs, kwargs.null_is_valid)?;

// If there's any failure, we know that validation failed and use the failure object for an
// informative error message. If no failure exists, we simply return a series with a single
Expand All @@ -85,7 +95,27 @@ pub fn all_rules_required(
return Ok(column.take_materialized_series());
}

// Aggregate failure counts into a validation error.
let error = RuleValidationError::new(failures);
// Aggregate failures into a validation error
let failures_from = DataFrame::new(
rule_inputs[0].len(),
rule_inputs
.iter()
.map(|s| s.clone().into_column())
.collect(),
)?;
let examples_from = DataFrame::new(
data_inputs[0].len(),
data_inputs
.iter()
.map(|s| s.clone().into_column())
.collect(),
)?;
let error = RuleValidationError::new(
failures,
Some(failures_from),
Some(examples_from),
kwargs.primary_key_columns.unwrap_or_default(),
kwargs.max_failure_examples,
);
Err(polars_err!(ComputeError: format!("\n{}", error.to_string(Some(&kwargs.schema_name)))))
}
Loading
Loading