Source code for etl_forge.schema_adapter

"""
Schema adapter module for converting established schema standards to ETLForge format.

This module provides adapters for:
- Frictionless Table Schema (https://specs.frictionlessdata.io/table-schema/)
- JSON Schema (https://json-schema.org/)

These adapters allow ETLForge to consume schemas defined in widely-adopted
standards, improving interoperability with other data tools.
"""

from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import json
import yaml
from .exceptions import ETLForgeError


class SchemaAdapter:
    """
    Base class for schema adapters.

    Schema adapters convert schemas from established standards
    to ETLForge's internal format.
    """

    # Type names used by ETLForge but not by Frictionless Table Schema.
    _ETLFORGE_ONLY_TYPES = frozenset({"int", "float", "category"})

    # Type names that are valid in BOTH formats; the type alone decides nothing.
    _SHARED_TYPES = frozenset({"string", "date"})

    # Type names used by Frictionless but not by ETLForge.
    _FRICTIONLESS_ONLY_TYPES = frozenset(
        {
            "integer",
            "number",
            "boolean",
            "object",
            "array",
            "datetime",
            "time",
            "year",
            "yearmonth",
            "duration",
            "geopoint",
            "geojson",
        }
    )

    # Field keys that hint at Frictionless when the type is Frictionless-only.
    _FRICTIONLESS_KEYS = ("constraints", "format", "rdfType")

    # Same hint for shared types. "format" is left out on purpose: this
    # module's own ETLForge output puts "format" on date fields, so it
    # cannot tell the two formats apart.
    _SHARED_TYPE_FRICTIONLESS_KEYS = ("constraints", "rdfType")

    # Field keys that hint at ETLForge's native format.
    _ETLFORGE_KEYS = ("range", "faker_template", "null_rate", "values")

    @staticmethod
    def _classify_field(field: Any) -> Optional[str]:
        """
        Classify a single field definition.

        Returns:
            'etlforge' or 'frictionless' when the field is decisive,
            'ambiguous' when it has a shared type ('string'/'date') and no
            format-specific keys, or None when it cannot be classified
            (not a dict, missing/non-string or unrecognised 'type').
        """
        if not isinstance(field, dict):
            return None

        field_type = field.get("type")
        # A non-string type (e.g. a list) is unhashable and matches nothing.
        if not isinstance(field_type, str):
            return None

        if field_type in SchemaAdapter._ETLFORGE_ONLY_TYPES:
            return "etlforge"

        has_etlforge_key = any(key in field for key in SchemaAdapter._ETLFORGE_KEYS)

        if field_type in SchemaAdapter._SHARED_TYPES:
            if any(
                key in field for key in SchemaAdapter._SHARED_TYPE_FRICTIONLESS_KEYS
            ):
                return "frictionless"
            if has_etlforge_key:
                return "etlforge"
            return "ambiguous"

        if field_type in SchemaAdapter._FRICTIONLESS_ONLY_TYPES:
            if any(key in field for key in SchemaAdapter._FRICTIONLESS_KEYS):
                return "frictionless"
            if has_etlforge_key:
                return "etlforge"
            return "frictionless"  # Default to frictionless if unclear

        return None

    @staticmethod
    def detect_schema_type(schema: Dict[str, Any]) -> str:
        """
        Detect the type of schema based on its structure.

        Fields are examined in order and the first decisive one wins. A field
        whose type exists in both ETLForge and Frictionless ('string', 'date')
        is decisive only if it carries format-specific keys; if every
        classifiable field is ambiguous the schema is reported as 'etlforge'.

        Args:
            schema: A dictionary containing the schema definition.

        Returns:
            One of: 'etlforge', 'frictionless', 'jsonschema', or 'unknown'
        """
        if not isinstance(schema, dict):
            return "unknown"

        # Field-list based formats (ETLForge native or Frictionless)
        fields = schema.get("fields")
        if isinstance(fields, list) and fields and isinstance(fields[0], dict):
            verdicts = [SchemaAdapter._classify_field(field) for field in fields]
            for verdict in verdicts:
                if verdict in ("etlforge", "frictionless"):
                    return verdict
            if "ambiguous" in verdicts:
                # Only shared types and no distinguishing keys: such a schema
                # is already usable as an ETLForge schema.
                return "etlforge"

        # JSON Schema format
        if "$schema" in schema or "properties" in schema:
            return "jsonschema"

        return "unknown"

    @staticmethod
    def load_and_convert(schema_path: Union[str, Path, dict]) -> Dict[str, Any]:
        """
        Load a schema from a file or dict and convert to ETLForge format.

        This method auto-detects the schema type and applies the appropriate
        conversion. Schemas detected as 'etlforge' or 'unknown' are returned
        unchanged (for a dict argument, the very same object).

        Args:
            schema_path: The path to a schema file (.yaml, .yml or .json)
                or a dictionary.

        Returns:
            A schema dictionary in ETLForge format.

        Raises:
            ETLForgeError: If the schema cannot be loaded or converted.
        """
        if isinstance(schema_path, dict):
            schema = schema_path
        else:
            schema_path_obj = Path(schema_path)
            if not schema_path_obj.exists():
                raise ETLForgeError(f"Schema file not found at: {schema_path}")

            # Reject unsupported extensions before touching the file.
            suffix = schema_path_obj.suffix.lower()
            if suffix in (".yaml", ".yml"):
                is_yaml = True
            elif suffix == ".json":
                is_yaml = False
            else:
                raise ETLForgeError(f"Unsupported schema file format: {suffix}")

            try:
                with open(schema_path_obj, "r", encoding="utf-8") as file:
                    loaded = yaml.safe_load(file) if is_yaml else json.load(file)
            except (
                IOError,
                UnicodeDecodeError,  # file is not valid UTF-8
                yaml.YAMLError,
                json.JSONDecodeError,
            ) as e:
                raise ETLForgeError(f"Failed to load or parse schema file: {e}") from e
            # An empty document loads as None; treat it as an empty schema.
            schema = loaded or {}

        schema_type = SchemaAdapter.detect_schema_type(schema)

        if schema_type == "frictionless":
            return FrictionlessAdapter.convert(schema)
        if schema_type == "jsonschema":
            return JsonSchemaAdapter.convert(schema)
        # 'etlforge' is already native; for 'unknown' try to use the schema
        # as-is, validation will catch issues.
        return schema
class FrictionlessAdapter:
    """
    Adapter for Frictionless Table Schema.

    Frictionless Table Schema is a standard for describing tabular data.
    Spec: https://specs.frictionlessdata.io/table-schema/

    Supported Frictionless types and their ETLForge mappings:
    - integer, year -> int
    - number -> float
    - string, yearmonth -> string
    - date/datetime/time -> date
    - boolean -> category (with values ["true", "false"])
    - array/object/geopoint/geojson/duration -> Not supported (raises error)
    - any other type name -> string

    Supported constraints:
    - required -> nullable (inverted)
    - unique -> unique
    - minimum/maximum -> range.min/range.max (range.start/range.end for dates)
    - minLength/maxLength -> length.min/length.max
    - enum -> values (for category type)
    - pattern -> Not supported (ignored)
    """

    # Type mapping from Frictionless to ETLForge
    TYPE_MAP = {
        "integer": "int",
        "number": "float",
        "string": "string",
        "date": "date",
        "datetime": "date",
        "time": "date",
        "boolean": "category",
        "year": "int",
        "yearmonth": "string",
    }

    # Frictionless types that have no tabular ETLForge equivalent.
    _UNSUPPORTED_TYPES = ("array", "object", "geopoint", "geojson", "duration")

    # Format used when a Frictionless format carries no explicit pattern.
    _DEFAULT_DATE_FORMAT = "%Y-%m-%d"

    @classmethod
    def convert(cls, schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a Frictionless Table Schema to ETLForge format.

        'missingValues' and 'primaryKey' are carried over under the private
        keys '_frictionless_missingValues' / '_frictionless_primaryKey'.

        Args:
            schema: A Frictionless Table Schema dictionary.

        Returns:
            An ETLForge-compatible schema dictionary.

        Raises:
            ETLForgeError: If the schema is malformed or contains
                unsupported types.
        """
        if not isinstance(schema, dict):
            raise ETLForgeError("Frictionless schema must be a dictionary")

        if "fields" not in schema:
            raise ETLForgeError("Frictionless schema must contain 'fields' key")

        etl_schema: Dict[str, Any] = {
            "fields": [cls._convert_field(field) for field in schema["fields"]]
        }

        # Preserve any metadata
        if "missingValues" in schema:
            etl_schema["_frictionless_missingValues"] = schema["missingValues"]
        if "primaryKey" in schema:
            etl_schema["_frictionless_primaryKey"] = schema["primaryKey"]

        return etl_schema

    @classmethod
    def _convert_field(cls, field: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a single Frictionless field to ETLForge format.

        Raises:
            ETLForgeError: If the field is not a dict, has no 'name', has a
                non-string or unsupported 'type', or has 'constraints' that
                is not a mapping.
        """
        if not isinstance(field, dict):
            raise ETLForgeError("Each field must be a dictionary")

        if "name" not in field:
            raise ETLForgeError("Each field must have a 'name' property")

        name = field["name"]
        frictionless_type = field.get("type", "string")

        if not isinstance(frictionless_type, str):
            raise ETLForgeError(
                f"Field '{name}' has an invalid 'type'; expected a string."
            )

        # Check for unsupported types
        if frictionless_type in cls._UNSUPPORTED_TYPES:
            raise ETLForgeError(
                f"Field '{name}' has unsupported Frictionless type '{frictionless_type}'. "
                f"ETLForge only supports tabular data types."
            )

        # Map the type; unrecognised type names fall back to string
        etl_type = cls.TYPE_MAP.get(frictionless_type, "string")

        etl_field: Dict[str, Any] = {
            "name": name,
            "type": etl_type,
        }

        # An explicit null (e.g. an empty YAML "constraints:" entry) is
        # treated like a missing key.
        constraints = field.get("constraints") or {}
        if not isinstance(constraints, dict):
            raise ETLForgeError(
                f"Field '{name}' has invalid 'constraints'; expected a mapping."
            )

        # Required/nullable (inverted logic); fields are nullable by default
        if "required" in constraints:
            etl_field["nullable"] = not constraints["required"]
        else:
            etl_field["nullable"] = True

        # Unique constraint
        if constraints.get("unique"):
            etl_field["unique"] = True

        # Numeric range constraints
        if etl_type in ("int", "float"):
            numeric_range: Dict[str, Any] = {}
            if "minimum" in constraints:
                numeric_range["min"] = constraints["minimum"]
            if "maximum" in constraints:
                numeric_range["max"] = constraints["maximum"]
            if numeric_range:
                etl_field["range"] = numeric_range

        # String length constraints
        if etl_type == "string":
            length_config: Dict[str, Any] = {}
            if "minLength" in constraints:
                length_config["min"] = constraints["minLength"]
            if "maxLength" in constraints:
                length_config["max"] = constraints["maxLength"]
            if length_config:
                etl_field["length"] = length_config

        # Enum values (converts to category type)
        if "enum" in constraints:
            etl_field["type"] = "category"
            etl_field["values"] = constraints["enum"]

        # Boolean type special handling (takes precedence over any enum)
        if frictionless_type == "boolean":
            etl_field["values"] = ["true", "false"]

        if etl_type == "date":
            # Date format handling
            if "format" in field:
                etl_field["format"] = cls._convert_date_format(field["format"])

            # Date bounds are stored as strings under start/end
            date_range: Dict[str, Any] = {}
            if "minimum" in constraints:
                date_range["start"] = str(constraints["minimum"])
            if "maximum" in constraints:
                date_range["end"] = str(constraints["maximum"])
            if date_range:
                etl_field["range"] = date_range

        # Preserve description as a comment
        if "description" in field:
            etl_field["_description"] = field["description"]

        return etl_field

    @staticmethod
    def _convert_date_format(frictionless_format: str) -> str:
        """
        Convert Frictionless date format to Python strftime format.

        'default' and 'any' map to '%Y-%m-%d'. Any value containing a '%'
        directive is taken to be a strptime-style pattern and is returned
        unchanged. Everything else falls back to '%Y-%m-%d'.
        """
        if not isinstance(frictionless_format, str):
            return FrictionlessAdapter._DEFAULT_DATE_FORMAT
        if frictionless_format in ("default", "any"):
            return FrictionlessAdapter._DEFAULT_DATE_FORMAT
        if "%" in frictionless_format:
            # Custom pattern: pass it through instead of discarding it.
            return frictionless_format
        return FrictionlessAdapter._DEFAULT_DATE_FORMAT

    @classmethod
    def to_frictionless(cls, etl_schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert an ETLForge schema to Frictionless Table Schema format.

        This is the inverse of :meth:`convert` for the pieces it can express:
        date ranges (start/end) become minimum/maximum, and the private
        '_frictionless_missingValues' / '_frictionless_primaryKey' keys are
        restored as 'missingValues' / 'primaryKey'.

        Args:
            etl_schema: An ETLForge schema dictionary.

        Returns:
            A Frictionless Table Schema dictionary.

        Raises:
            ETLForgeError: If the schema has no 'fields' key.
        """
        if "fields" not in etl_schema:
            raise ETLForgeError("ETLForge schema must contain 'fields' key")

        # Reverse type mapping
        reverse_type_map = {
            "int": "integer",
            "float": "number",
            "string": "string",
            "date": "date",
            "category": "string",
        }

        frictionless_fields = []
        for field in etl_schema["fields"]:
            etl_type = field.get("type", "string")

            fl_field: Dict[str, Any] = {
                "name": field["name"],
                "type": reverse_type_map.get(etl_type, "string"),
            }

            constraints: Dict[str, Any] = {}

            # Nullable -> required (inverted)
            if "nullable" in field:
                constraints["required"] = not field["nullable"]

            # Unique
            if field.get("unique"):
                constraints["unique"] = True

            # Range constraints (an explicit null range is treated as absent)
            range_config = field.get("range") or {}
            if "min" in range_config:
                constraints["minimum"] = range_config["min"]
            if "max" in range_config:
                constraints["maximum"] = range_config["max"]
            if etl_type == "date":
                # convert() stores date bounds under start/end, not min/max
                if "start" in range_config:
                    constraints["minimum"] = range_config["start"]
                if "end" in range_config:
                    constraints["maximum"] = range_config["end"]

            # Length constraints
            length_config = field.get("length") or {}
            if "min" in length_config:
                constraints["minLength"] = length_config["min"]
            if "max" in length_config:
                constraints["maxLength"] = length_config["max"]

            # Category values -> enum
            if etl_type == "category" and "values" in field:
                constraints["enum"] = field["values"]

            if constraints:
                fl_field["constraints"] = constraints

            # Date format
            if etl_type == "date" and "format" in field:
                fl_field["format"] = field["format"]

            # Description
            if "_description" in field:
                fl_field["description"] = field["_description"]

            frictionless_fields.append(fl_field)

        frictionless_schema: Dict[str, Any] = {"fields": frictionless_fields}

        # Restore metadata that convert() stashed under private keys
        if "_frictionless_missingValues" in etl_schema:
            frictionless_schema["missingValues"] = etl_schema[
                "_frictionless_missingValues"
            ]
        if "_frictionless_primaryKey" in etl_schema:
            frictionless_schema["primaryKey"] = etl_schema["_frictionless_primaryKey"]

        return frictionless_schema
class JsonSchemaAdapter:
    """
    Adapter for JSON Schema.

    JSON Schema is a widely-adopted standard for describing JSON data.
    Spec: https://json-schema.org/

    This adapter targets JSON Schema Draft-07 and later for describing
    tabular data where each row is an object with properties. The Draft-04
    boolean form of exclusiveMinimum/exclusiveMaximum is also tolerated.

    Supported JSON Schema types and their ETLForge mappings:
    - integer -> int
    - number -> float
    - string -> string (or date if format is date/date-time)
    - boolean -> category (with values ["true", "false"])
    - array/object -> Not supported as field types

    Supported keywords:
    - required -> nullable (inverted)
    - minimum/maximum -> range.min/range.max
    - exclusiveMinimum/exclusiveMaximum -> adjusted range
    - minLength/maxLength -> length.min/length.max
    - enum -> category type with values
    - format (date, date-time, email, etc.) -> type hints
    """

    TYPE_MAP = {
        "integer": "int",
        "number": "float",
        "string": "string",
        "boolean": "category",
    }

    # strftime directives whose presence means a format carries a time of day.
    _TIME_DIRECTIVES = ("%H", "%I", "%M", "%S")

    @classmethod
    def convert(cls, schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a JSON Schema to ETLForge format.

        Fields are emitted in 'propertyOrder' when present; properties that
        the order does not mention follow in their declaration order, and
        names listed more than once are emitted once.

        Args:
            schema: A JSON Schema dictionary describing a tabular row structure.

        Returns:
            An ETLForge-compatible schema dictionary.

        Raises:
            ETLForgeError: If the schema cannot be converted.
        """
        if not isinstance(schema, dict):
            raise ETLForgeError("JSON Schema must be a dictionary")

        # JSON Schema should have 'properties' for object schemas
        if "properties" not in schema:
            raise ETLForgeError(
                "JSON Schema must contain 'properties' key describing row fields"
            )

        properties = schema["properties"]
        if not isinstance(properties, dict):
            raise ETLForgeError("JSON Schema 'properties' must be a dictionary")

        required_fields = set(schema.get("required") or [])

        declared_order = schema.get("propertyOrder")
        if not isinstance(declared_order, (list, tuple)):
            declared_order = []

        # Explicitly ordered properties first, skipping unknown or repeated names
        ordered_names: List[Any] = []
        seen = set()
        for prop_name in declared_order:
            if (
                isinstance(prop_name, str)
                and prop_name in properties
                and prop_name not in seen
            ):
                ordered_names.append(prop_name)
                seen.add(prop_name)
        # ...then everything a partial propertyOrder left out, so no
        # property is silently dropped.
        for prop_name in properties:
            if prop_name not in seen:
                ordered_names.append(prop_name)
                seen.add(prop_name)

        etl_fields = [
            cls._convert_property(
                prop_name, properties[prop_name], prop_name in required_fields
            )
            for prop_name in ordered_names
        ]

        return {"fields": etl_fields}

    @staticmethod
    def _resolve_bound(
        prop_schema: Dict[str, Any],
        inclusive_key: str,
        exclusive_key: str,
        offset: Any,
        pick: Any,
    ) -> Any:
        """
        Combine an inclusive and an exclusive bound into one inclusive bound.

        Args:
            prop_schema: The property schema holding the keywords.
            inclusive_key: 'minimum' or 'maximum'.
            exclusive_key: 'exclusiveMinimum' or 'exclusiveMaximum'.
            offset: Signed amount added to an exclusive bound to make it
                inclusive (e.g. +1 / -1 for integers).
            pick: ``max`` for lower bounds, ``min`` for upper bounds; selects
                the tighter value when both keywords are present.

        Returns:
            The resolved inclusive bound, or None if no bound applies.
        """
        inclusive = prop_schema.get(inclusive_key)
        exclusive = prop_schema.get(exclusive_key)

        # Draft-04 style: the exclusive keyword is a boolean modifier of the
        # inclusive one. bool is a subclass of int, so test for it first.
        if isinstance(exclusive, bool):
            if inclusive is None:
                return None
            return inclusive + offset if exclusive else inclusive

        candidates = []
        if inclusive is not None:
            candidates.append(inclusive)
        if exclusive is not None:
            candidates.append(exclusive + offset)
        return pick(candidates) if candidates else None

    @classmethod
    def _convert_property(
        cls, name: str, prop_schema: Dict[str, Any], is_required: bool
    ) -> Dict[str, Any]:
        """
        Convert a single JSON Schema property to an ETLForge field.

        Args:
            name: The property name, used as the field name.
            prop_schema: The property's schema object.
            is_required: Whether the name appears in the parent's 'required'.

        Returns:
            The ETLForge field dictionary.

        Raises:
            ETLForgeError: If the property is not a schema object, uses
                $ref, or has type 'array' or 'object'.
        """
        if not isinstance(prop_schema, dict):
            raise ETLForgeError(f"Property '{name}' must have a schema object")

        # Handle $ref (not fully supported, just note it)
        if "$ref" in prop_schema:
            raise ETLForgeError(
                f"Property '{name}' uses $ref which is not supported. "
                f"Please dereference the schema first."
            )

        # Get the JSON Schema type
        json_type = prop_schema.get("type", "string")

        # Handle type arrays (e.g., ["string", "null"])
        if isinstance(json_type, list):
            # Filter out 'null' and use the first real type
            non_null_types = [t for t in json_type if t != "null"]
            is_nullable = "null" in json_type
            json_type = non_null_types[0] if non_null_types else "string"
        else:
            is_nullable = not is_required

        # Check for unsupported types
        if json_type in ("array", "object"):
            raise ETLForgeError(
                f"Property '{name}' has unsupported type '{json_type}'. "
                f"ETLForge only supports tabular data types."
            )

        # Map the type
        etl_type = cls.TYPE_MAP.get(json_type, "string")

        format_val = prop_schema.get("format") if json_type == "string" else None

        # Check for date format
        if format_val in ("date", "date-time"):
            etl_type = "date"

        etl_field: Dict[str, Any] = {
            "name": name,
            "type": etl_type,
            "nullable": is_nullable,
        }

        # Numeric range constraints
        if etl_type in ("int", "float"):
            # Step 1 for int, small amount for float
            step = 1 if etl_type == "int" else 0.0001
            range_config: Dict[str, Any] = {}
            lower = cls._resolve_bound(
                prop_schema, "minimum", "exclusiveMinimum", step, max
            )
            upper = cls._resolve_bound(
                prop_schema, "maximum", "exclusiveMaximum", -step, min
            )
            if lower is not None:
                range_config["min"] = lower
            if upper is not None:
                range_config["max"] = upper
            if range_config:
                etl_field["range"] = range_config

        # String length constraints
        if json_type == "string":
            length_config: Dict[str, Any] = {}
            if "minLength" in prop_schema:
                length_config["min"] = prop_schema["minLength"]
            if "maxLength" in prop_schema:
                length_config["max"] = prop_schema["maxLength"]
            if length_config:
                etl_field["length"] = length_config

        # Enum values
        if "enum" in prop_schema:
            etl_field["type"] = "category"
            etl_field["values"] = prop_schema["enum"]

        # Boolean handling (takes precedence over any enum)
        if json_type == "boolean":
            etl_field["values"] = ["true", "false"]

        # Date format handling
        if etl_type == "date":
            if format_val == "date-time":
                etl_field["format"] = "%Y-%m-%dT%H:%M:%S"
            else:
                etl_field["format"] = "%Y-%m-%d"

        # String format hints for faker
        if format_val is not None:
            faker_map = {
                "email": "email",
                "uri": "url",
                "hostname": "hostname",
                "ipv4": "ipv4",
                "ipv6": "ipv6",
                "uuid": "uuid4",
            }
            if format_val in faker_map:
                etl_field["faker_template"] = faker_map[format_val]

        # Description
        if "description" in prop_schema:
            etl_field["_description"] = prop_schema["description"]

        # Title
        if "title" in prop_schema:
            etl_field["_title"] = prop_schema["title"]

        return etl_field

    @classmethod
    def to_jsonschema(cls, etl_schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert an ETLForge schema to JSON Schema format.

        Nullable fields get a [type, "null"] type array; all other fields
        are listed under 'required'. Date fields whose strftime 'format'
        contains a time directive are emitted as 'date-time', otherwise as
        'date'.

        Args:
            etl_schema: An ETLForge schema dictionary.

        Returns:
            A JSON Schema dictionary.

        Raises:
            ETLForgeError: If the schema has no 'fields' key.
        """
        if "fields" not in etl_schema:
            raise ETLForgeError("ETLForge schema must contain 'fields' key")

        # Reverse type mapping
        reverse_type_map = {
            "int": "integer",
            "float": "number",
            "string": "string",
            "date": "string",
            "category": "string",
        }

        # faker template -> JSON Schema string format
        faker_format_map = {
            "email": "email",
            "url": "uri",
            "hostname": "hostname",
            "ipv4": "ipv4",
            "ipv6": "ipv6",
            "uuid4": "uuid",
        }

        properties: Dict[str, Any] = {}
        required: List[str] = []
        property_order: List[str] = []

        for field in etl_schema["fields"]:
            name = field["name"]
            etl_type = field.get("type", "string")
            json_type = reverse_type_map.get(etl_type, "string")

            prop: Dict[str, Any] = {
                "type": json_type,
            }

            # Nullable handling
            if field.get("nullable", False):
                prop["type"] = [json_type, "null"]
            else:
                required.append(name)

            # Range constraints
            if "range" in field:
                if "min" in field["range"]:
                    prop["minimum"] = field["range"]["min"]
                if "max" in field["range"]:
                    prop["maximum"] = field["range"]["max"]

            # Length constraints
            if "length" in field:
                if "min" in field["length"]:
                    prop["minLength"] = field["length"]["min"]
                if "max" in field["length"]:
                    prop["maxLength"] = field["length"]["max"]

            # Category values -> enum
            if etl_type == "category" and "values" in field:
                prop["enum"] = field["values"]

            # Date format: keep the date/date-time distinction that
            # convert() encodes in the strftime pattern.
            if etl_type == "date":
                date_format = field.get("format")
                has_time = isinstance(date_format, str) and any(
                    directive in date_format for directive in cls._TIME_DIRECTIVES
                )
                prop["format"] = "date-time" if has_time else "date"

            # Faker template hints (override the date format, if any)
            if "faker_template" in field:
                if field["faker_template"] in faker_format_map:
                    prop["format"] = faker_format_map[field["faker_template"]]

            # Description
            if "_description" in field:
                prop["description"] = field["_description"]

            # Title
            if "_title" in field:
                prop["title"] = field["_title"]

            properties[name] = prop
            property_order.append(name)

        json_schema: Dict[str, Any] = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "type": "object",
            "properties": properties,
            "propertyOrder": property_order,
        }

        if required:
            json_schema["required"] = required

        return json_schema