"""
Data validator module for validating tabular data (pandas DataFrames) against schema definitions.
"""
import pandas as pd
import numpy as np
import yaml
import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
from .exceptions import ETLForgeError
from .schema_adapter import SchemaAdapter
[docs]
class ValidationResult:
"""Container for validation results."""
def __init__(self):
self.is_valid = True
self.errors = []
self.invalid_rows = []
self.summary = {
"total_rows": 0,
"valid_rows": 0,
"invalid_rows": 0,
"columns_checked": 0,
"missing_columns": [],
"extra_columns": [],
}
[docs]
def add_error(
self,
error_type: str,
column: str,
row_idx: Optional[int] = None,
message: Optional[str] = None,
):
"""Add a validation error."""
self.is_valid = False
error = {
"type": error_type,
"column": column,
"row": row_idx,
"message": message,
}
self.errors.append(error)
if row_idx is not None and row_idx not in self.invalid_rows:
self.invalid_rows.append(row_idx)
[docs]
class DataValidator:
"""
Validates tabular data (pandas DataFrames) against a declarative schema.
This class reads a schema and validates pandas DataFrames, performing
a series of validation checks to ensure the data conforms to the
schema's specifications.
"""
def __init__(self, schema_path: Optional[Union[str, Path, dict]] = None):
"""
Initializes the DataValidator.
Args:
schema_path: The path to a YAML/JSON schema file or a dictionary
containing the schema definition.
Raises:
ETLForgeError: If the schema file cannot be found or parsed.
"""
self.schema: Dict[str, Any] = {}
if schema_path:
self.load_schema(schema_path)
[docs]
def load_schema(self, schema_path: Union[str, Path, dict]):
"""
Loads a schema from a file path or a dictionary.
The schema can be in ETLForge native format, Frictionless Table Schema,
or JSON Schema format. The format is auto-detected and converted to
ETLForge format if necessary.
Args:
schema_path: The path to a YAML/JSON schema file or a dictionary
containing the schema definition. Supports ETLForge native format,
Frictionless Table Schema, and JSON Schema.
Raises:
ETLForgeError: If the schema file is not found, has an unsupported
format, or cannot be parsed.
"""
# Use SchemaAdapter to load and auto-convert the schema
self.schema = SchemaAdapter.load_and_convert(schema_path)
self._validate_schema()
def _validate_schema(self):
"""
Validates the loaded schema for correctness and completeness.
Raises:
ETLForgeError: If the schema is invalid.
"""
if not self.schema:
raise ETLForgeError("Schema is empty or None")
if "fields" not in self.schema:
raise ETLForgeError("Schema must contain a 'fields' key")
fields = self.schema["fields"]
if not isinstance(fields, list) or len(fields) == 0:
raise ETLForgeError("Schema 'fields' must be a non-empty list")
field_names = set()
supported_types = {"int", "float", "string", "date", "category"}
for i, field in enumerate(fields):
if not isinstance(field, dict):
raise ETLForgeError(f"Field at index {i} must be a dictionary")
# Check required fields
if "name" not in field:
raise ETLForgeError(
f"Field at index {i} is missing required 'name' property"
)
if "type" not in field:
raise ETLForgeError(
f"Field '{field.get('name', i)}' is missing required 'type' property"
)
field_name = field["name"]
field_type = field["type"]
# Check for duplicate field names
if field_name in field_names:
raise ETLForgeError(f"Duplicate field name '{field_name}' found")
field_names.add(field_name)
# Validate field type
if field_type not in supported_types:
raise ETLForgeError(
f"Field '{field_name}' has unsupported type '{field_type}'. "
f"Supported types: {', '.join(sorted(supported_types))}"
)
def _validate_column_existence(self, df: pd.DataFrame, result: ValidationResult):
"""Validate that all required columns exist."""
expected_columns = {field["name"] for field in self.schema.get("fields", [])}
actual_columns = set(df.columns)
missing_columns = expected_columns - actual_columns
extra_columns = actual_columns - expected_columns
result.summary["missing_columns"] = list(missing_columns)
result.summary["extra_columns"] = list(extra_columns)
for col in missing_columns:
result.add_error(
"missing_column",
col,
message=f"Column '{col}' is missing from the data",
)
def _validate_data_types(self, df: pd.DataFrame, result: ValidationResult):
"""Validate data types for each column."""
for field in self.schema.get("fields", []):
field_name = field["name"]
field_type = field["type"].lower()
if field_name not in df.columns:
continue # Already handled in column existence validation
column_data = df[field_name]
# Skip null values for type checking unless nullable is False
non_null_data = column_data.dropna()
if field_type == "int":
invalid_mask = ~non_null_data.apply(
lambda x: isinstance(x, (int, np.integer))
or (isinstance(x, (float, np.floating)) and x.is_integer())
)
elif field_type == "float":
invalid_mask = ~non_null_data.apply(
lambda x: isinstance(x, (int, float, np.number))
)
elif field_type == "string":
invalid_mask = ~non_null_data.apply(lambda x: isinstance(x, str))
elif field_type == "date":
date_format = field.get("format", "%Y-%m-%d")
invalid_mask = ~non_null_data.apply(
lambda x: self._is_valid_date(x, date_format)
)
elif field_type == "category":
valid_values = field.get("values", [])
invalid_mask = ~non_null_data.isin(valid_values)
else:
continue
# Add errors for invalid types
invalid_indices = non_null_data[invalid_mask].index
for idx in invalid_indices:
result.add_error(
"invalid_type",
field_name,
idx,
f"Value '{df.loc[idx, field_name]}' is not of type '{field_type}'",
)
def _is_valid_date(self, value: Any, date_format: str) -> bool:
"""Check if a value is a valid date in the specified format."""
if not isinstance(value, str):
return False
try:
datetime.strptime(value, date_format)
return True
except ValueError:
return False
def _validate_constraints(self, df: pd.DataFrame, result: ValidationResult):
"""Validate field constraints."""
for field in self.schema.get("fields", []):
field_name = field["name"]
if field_name not in df.columns:
continue
column_data = df[field_name]
# Check nullable constraint
if not field.get("nullable", False):
null_mask = column_data.isnull()
null_indices = df[null_mask].index
for idx in null_indices:
result.add_error(
"null_value",
field_name,
idx,
f"Null value found in non-nullable column '{field_name}'",
)
# Check unique constraint
if field.get("unique", False):
duplicated_mask = (
column_data.duplicated(keep=False) & column_data.notnull()
)
duplicate_indices = df[duplicated_mask].index
for idx in duplicate_indices:
result.add_error(
"duplicate_value",
field_name,
idx,
f"Duplicate value '{df.loc[idx, field_name]}' in unique column '{field_name}'",
)
# Check range constraints (only for valid numeric types)
if "range" in field and field["type"].lower() in ["int", "float"]:
range_config = field["range"]
min_val = range_config.get("min")
max_val = range_config.get("max")
# Only check ranges for valid numeric values
if field["type"].lower() == "int":
valid_numeric_mask = (
column_data.apply(
lambda x: isinstance(x, (int, np.integer))
or (isinstance(x, (float, np.floating)) and x.is_integer())
)
& column_data.notnull()
)
else: # float
valid_numeric_mask = (
column_data.apply(
lambda x: isinstance(x, (int, float, np.number))
)
& column_data.notnull()
)
valid_numeric_data = column_data[valid_numeric_mask]
if min_val is not None:
below_min_mask = valid_numeric_data < min_val
below_min_indices = valid_numeric_data[below_min_mask].index
for idx in below_min_indices:
result.add_error(
"range_violation",
field_name,
idx,
f"Value '{df.loc[idx, field_name]}' is below minimum {min_val}",
)
if max_val is not None:
above_max_mask = valid_numeric_data > max_val
above_max_indices = valid_numeric_data[above_max_mask].index
for idx in above_max_indices:
result.add_error(
"range_violation",
field_name,
idx,
f"Value '{df.loc[idx, field_name]}' is above maximum {max_val}",
)
# Check categorical values
if field["type"].lower() == "category" and "values" in field:
valid_values = field["values"]
invalid_mask = (~column_data.isin(valid_values)) & column_data.notnull()
invalid_indices = df[invalid_mask].index
for idx in invalid_indices:
result.add_error(
"invalid_category",
field_name,
idx,
f"Value '{df.loc[idx, field_name]}' is not in allowed categories {valid_values}",
)
[docs]
def validate(self, df: pd.DataFrame) -> ValidationResult:
"""
Validates a pandas DataFrame against the loaded schema.
This is the main validation method. It runs all configured validation checks.
Args:
df: A pandas DataFrame to validate.
Returns:
A `ValidationResult` object containing the detailed results of
the validation run.
Raises:
ETLForgeError: If no schema has been loaded or if df is not a DataFrame.
"""
if not self.schema:
raise ETLForgeError("No schema loaded. Use load_schema() first.")
if not isinstance(df, pd.DataFrame):
raise ETLForgeError(
f"Expected pandas DataFrame, got {type(df).__name__}. "
"Please load your data into a DataFrame first using pd.read_csv() or pd.read_excel()."
)
result = ValidationResult()
result.summary["total_rows"] = len(df)
result.summary["columns_checked"] = len(self.schema.get("fields", []))
# Run all validation checks
self._validate_column_existence(df, result)
self._validate_data_types(df, result)
self._validate_constraints(df, result)
# Update summary
result.summary["invalid_rows"] = len(set(result.invalid_rows))
result.summary["valid_rows"] = (
result.summary["total_rows"] - result.summary["invalid_rows"]
)
return result
[docs]
def validate_and_report(
self,
df: pd.DataFrame,
report_path: Optional[str] = None,
) -> ValidationResult:
"""
Validates a pandas DataFrame and optionally saves a report of invalid rows.
Args:
df: A pandas DataFrame to validate.
report_path: The destination file path for the invalid rows report (CSV format).
If None, no report is saved.
Returns:
A `ValidationResult` object containing the detailed results.
Raises:
ETLForgeError: If an error occurs while writing the report file.
"""
result = self.validate(df)
if report_path and result.invalid_rows:
# Create report DataFrame with invalid rows and error details
invalid_df = df.loc[result.invalid_rows].copy()
# Add error details
error_details = []
for idx in invalid_df.index:
row_errors = [error for error in result.errors if error["row"] == idx]
error_messages = [
f"{error['type']}: {error['message']}" for error in row_errors
]
error_details.append("; ".join(error_messages))
invalid_df["validation_errors"] = error_details
# Save report as CSV
report_path_obj = Path(report_path)
try:
invalid_df.to_csv(report_path_obj, index=True)
except (IOError, PermissionError) as e:
raise ETLForgeError(
f"Failed to save validation report to {report_path}: {e}"
) from e
return result
[docs]
def print_validation_summary(self, result: ValidationResult):
"""Print a summary of validation results."""
print("\n" + "=" * 50)
print("VALIDATION SUMMARY")
print("=" * 50)
print(f"Total rows: {result.summary['total_rows']}")
print(f"Valid rows: {result.summary['valid_rows']}")
print(f"Invalid rows: {result.summary['invalid_rows']}")
print(f"Columns checked: {result.summary['columns_checked']}")
if result.summary["missing_columns"]:
print(f"Missing columns: {', '.join(result.summary['missing_columns'])}")
if result.summary["extra_columns"]:
print(f"Extra columns: {', '.join(result.summary['extra_columns'])}")
print(f"\nValidation: {'PASSED' if result.is_valid else 'FAILED'}")
if not result.is_valid:
print(f"Total errors: {len(result.errors)}")
# Group errors by type
error_types = {}
for error in result.errors:
error_type = error["type"]
if error_type not in error_types:
error_types[error_type] = 0
error_types[error_type] += 1
print("\nError breakdown:")
for error_type, count in error_types.items():
print(f" {error_type}: {count}")
print("=" * 50 + "\n")