
Schemas

Pydantic configuration schemas

Pydantic schema models for TheStrat configuration validation.

This module provides comprehensive validation schemas that replace all manual validation logic in the Factory class. Models use Pydantic v2 features for maximum performance, type safety, and detailed error reporting.

Classes:

Name                  Description
AggregationConfig     Complete configuration for aggregation component with market-aware settings.
AssetClassConfig      Configuration for a specific asset class with comprehensive market metadata.
FactoryConfig         Root configuration for Factory.create_all method with complete pipeline setup.
GapDetectionConfig    Configuration for gap detection with comprehensive threshold documentation.
IndicatorSchema       Complete Indicator Schema.
IndicatorsConfig      Complete configuration for indicators component with per-timeframe settings.
SwingPointsConfig     Configuration for swing point detection with comprehensive parameter documentation.
TargetConfig          Configuration for multi-target detection.
TimeframeConfig       Configuration and validation for timeframes with comprehensive metadata.
TimeframeItemConfig   Configuration for a single timeframe item with flexible timeframe targeting.

AggregationConfig

Bases: BaseModel

Complete configuration for aggregation component with market-aware settings.

Methods:

Name                                    Description
apply_asset_class_defaults              Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start.
validate_and_expand_target_timeframes   Validate target_timeframes field and expand 'all' keyword.

apply_asset_class_defaults classmethod

apply_asset_class_defaults(data: Any) -> Any

Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start.

Source code in thestrat/schemas.py
@model_validator(mode="before")
@classmethod
def apply_asset_class_defaults(cls, data: Any) -> Any:
    """Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start."""
    if not isinstance(data, dict):
        return data

    # Get asset class, default to "equities"
    asset_class = data.get("asset_class", "equities")
    asset_config = AssetClassConfig.get_config(asset_class)

    # Force UTC timezone for crypto and fx regardless of input
    if asset_class in ["crypto", "fx"]:
        data["timezone"] = "UTC"
    elif "timezone" not in data or data["timezone"] is None:
        data["timezone"] = asset_config.timezone

    # Apply other defaults only if fields are not provided or are None
    if "hour_boundary" not in data or data["hour_boundary"] is None:
        data["hour_boundary"] = asset_config.hour_boundary
    if "session_start" not in data or data["session_start"] is None:
        data["session_start"] = asset_config.session_start

    return data
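
As an illustration, a minimal usage sketch of this validator's effect. The "1h" timeframe string is an assumption (see TimeframeConfig below for supported values), and target_timeframes is assumed to be the only required field:

from thestrat.schemas import AggregationConfig

# Sketch: "crypto" forces UTC even when a timezone is supplied.
config = AggregationConfig(
    target_timeframes=["1h"],
    asset_class="crypto",
    timezone="America/New_York",  # overridden to "UTC" by this validator
)
print(config.timezone)  # "UTC"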

validate_and_expand_target_timeframes classmethod

validate_and_expand_target_timeframes(
    target_timeframes: list[str],
) -> list[str]

Validate target_timeframes field and expand 'all' keyword.

This Pydantic field validator is automatically called when AggregationConfig is instantiated. It validates individual timeframes and expands ['all'] to all supported timeframes.

Source code in thestrat/schemas.py
@field_validator("target_timeframes")
@classmethod
def validate_and_expand_target_timeframes(cls, target_timeframes: list[str]) -> list[str]:
    """
    Validate target_timeframes field and expand 'all' keyword.

    This Pydantic field validator is automatically called when AggregationConfig
    is instantiated. It validates individual timeframes and expands ['all'] to
    all supported timeframes.
    """
    if not target_timeframes:
        raise ValueError("target_timeframes cannot be empty")

    # Check for 'all' keyword
    if "all" in target_timeframes:
        if len(target_timeframes) > 1:
            raise ValueError("'all' cannot be combined with specific timeframes")
        # Expand to all supported timeframes
        return list(TimeframeConfig.TIMEFRAME_METADATA.keys())

    # Validate each specific timeframe
    for i, timeframe in enumerate(target_timeframes):
        if not isinstance(timeframe, str) or not timeframe.strip():
            raise ValueError(f"target_timeframes[{i}] must be a non-empty string")

        # Validate timeframe format using TimeframeConfig
        if not TimeframeConfig.validate_timeframe(timeframe):
            raise ValueError(f"Invalid timeframe '{timeframe}'")

    return target_timeframes
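
A short sketch of both behaviors (assuming thestrat is importable, "1h" is a supported timeframe, and no other fields are required):

from pydantic import ValidationError
from thestrat.schemas import AggregationConfig

# ["all"] expands to every key in TimeframeConfig.TIMEFRAME_METADATA.
config = AggregationConfig(target_timeframes=["all"])
print(config.target_timeframes)  # full list of supported timeframes

# Mixing "all" with a specific timeframe is rejected.
try:
    AggregationConfig(target_timeframes=["all", "1h"])
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # "... 'all' cannot be combined with specific timeframes"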

AssetClassConfig

Bases: BaseModel

Configuration for a specific asset class with comprehensive market metadata.

Methods:

Name        Description
get_config  Get configuration for specific asset class.

get_config classmethod

get_config(asset_class: str) -> AssetClassConfig

Get configuration for specific asset class.

Source code in thestrat/schemas.py
@classmethod
def get_config(cls, asset_class: str) -> "AssetClassConfig":
    """Get configuration for specific asset class."""
    return cls.REGISTRY.get(asset_class, cls.REGISTRY["equities"])
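
Usage sketch (the "crypto" registry key is assumed from the AggregationConfig validator above; the field names come from the same validator):

from thestrat.schemas import AssetClassConfig

crypto = AssetClassConfig.get_config("crypto")
print(crypto.timezone, crypto.hour_boundary, crypto.session_start)

# Unknown asset classes silently fall back to the "equities" configuration.
assert AssetClassConfig.get_config("not_a_real_class") is AssetClassConfig.get_config("equities")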

FactoryConfig

Bases: BaseModel

Root configuration for Factory.create_all method with complete pipeline setup.

Methods:

Name                                Description
validate_configuration_consistency  Validate consistency between aggregation and indicators configurations.

validate_configuration_consistency

validate_configuration_consistency() -> Self

Validate consistency between aggregation and indicators configurations.

Source code in thestrat/schemas.py
@model_validator(mode="after")
def validate_configuration_consistency(self) -> Self:
    """Validate consistency between aggregation and indicators configurations."""
    # Future enhancement: Could validate that indicator timeframes are compatible
    # with aggregation target timeframes, but this is not currently enforced
    # in the existing system
    return self

GapDetectionConfig

Bases: BaseModel

Configuration for gap detection with comprehensive threshold documentation.

IndicatorSchema

Bases: BaseModel

Complete Indicator Schema

Defines all columns that are created by the TheStrat processing pipeline. All columns are required, since the indicators component creates every one of them.

Methods:

Name                         Description
get_all_input_columns        Get list of all input columns (required + optional).
get_column_categories        Get columns organized by functional categories.
get_column_descriptions     Get descriptions for all possible DataFrame columns.
get_field_metadata           Get json_schema_extra metadata for a field, safely handling missing data.
get_optional_input_columns   Get list of optional input columns based on schema definition.
get_output_columns           Get list of all output columns based on schema definition.
get_polars_dtypes            Get Polars data types for all DataFrame columns.
get_precision_metadata       Get precision metadata for all fields.
get_required_input_columns   Get list of required input columns based on schema definition.
get_standard_column_order    Get the standard column ordering for aggregation output.
validate_dataframe           Validate input DataFrame columns and data types against IndicatorSchema input requirements.

get_all_input_columns classmethod

get_all_input_columns() -> list[str]

Get list of all input columns (required + optional).

Returns:

Type       Description
list[str]  List of all input column names.

Source code in thestrat/schemas.py
@classmethod
def get_all_input_columns(cls) -> list[str]:
    """
    Get list of all input columns (required + optional).

    Returns:
        List of all input column names
    """
    return sorted(cls.get_required_input_columns() + cls.get_optional_input_columns())

get_column_categories classmethod

get_column_categories() -> dict[str, list[str]]

Get columns organized by functional categories.

Dynamically extracts categories from the IndicatorSchema metadata.

Returns:

Type                  Description
dict[str, list[str]]  Dictionary mapping category names to lists of column names.

Source code in thestrat/schemas.py
@classmethod
def get_column_categories(cls) -> dict[str, list[str]]:
    """
    Get columns organized by functional categories.

    Dynamically extracts categories from the IndicatorSchema metadata.

    Returns:
        Dictionary mapping category names to lists of column names
    """
    categories: dict[str, list[str]] = {}

    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and "category" in json_extra:
            category = json_extra["category"]
            if category not in categories:
                categories[category] = []
            categories[category].append(field_name)

    # Sort columns within each category for consistent output
    for category in categories:
        categories[category].sort()

    return categories
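
The method relies on each field carrying a "category" key in json_schema_extra. A self-contained sketch of that pattern (the field and category names here are hypothetical, not the real IndicatorSchema columns):

from pydantic import BaseModel, Field

class MiniSchema(BaseModel):
    open: float = Field(description="Opening price", json_schema_extra={"category": "ohlc"})
    close: float = Field(description="Closing price", json_schema_extra={"category": "ohlc"})
    swing_high: bool = Field(description="Swing high flag", json_schema_extra={"category": "swing"})

categories: dict[str, list[str]] = {}
for name, info in MiniSchema.model_fields.items():
    extra = info.json_schema_extra or {}
    if isinstance(extra, dict) and "category" in extra:
        categories.setdefault(extra["category"], []).append(name)

print(categories)  # {'ohlc': ['open', 'close'], 'swing': ['swing_high']}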

get_column_descriptions classmethod

get_column_descriptions() -> dict[str, str]

Get descriptions for all possible DataFrame columns.

Returns:

Type            Description
dict[str, str]  Dictionary mapping column names to their descriptions.

Source code in thestrat/schemas.py
@classmethod
def get_column_descriptions(cls) -> dict[str, str]:
    """
    Get descriptions for all possible DataFrame columns.

    Returns:
        Dictionary mapping column names to their descriptions
    """
    descriptions = {}

    # Get descriptions from the schema
    for field_name, field_info in cls.model_fields.items():
        if field_info.description:
            descriptions[field_name] = field_info.description

    return descriptions

get_field_metadata classmethod

get_field_metadata(field_name: str) -> dict[str, Any]

Get json_schema_extra metadata for a field, safely handling missing data.

Parameters:

Name        Type  Description                             Default
field_name  str   Name of the field to get metadata for.  required

Returns:

Type            Description
dict[str, Any]  Dictionary of metadata from json_schema_extra; empty dict if not found.

Source code in thestrat/schemas.py
@classmethod
def get_field_metadata(cls, field_name: str) -> dict[str, Any]:
    """
    Get json_schema_extra metadata for a field, safely handling missing data.

    Args:
        field_name: Name of the field to get metadata for

    Returns:
        Dictionary of metadata from json_schema_extra, empty dict if not found
    """
    field_info = cls.model_fields.get(field_name)
    if not field_info:
        return {}
    return getattr(field_info, "json_schema_extra", {}) or {}

get_optional_input_columns classmethod

get_optional_input_columns() -> list[str]

Get list of optional input columns based on schema definition.

Returns:

Type       Description
list[str]  List of column names that are optional for input data.

Source code in thestrat/schemas.py
@classmethod
def get_optional_input_columns(cls) -> list[str]:
    """
    Get list of optional input columns based on schema definition.

    Returns:
        List of column names that are optional for input data
    """
    from pydantic_core import PydanticUndefined

    optional_columns = []
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if (
            json_extra.get("input") is True  # Is an input column
            and (
                json_extra.get("optional", False)  # Marked as optional
                or getattr(field_info, "default", PydanticUndefined) is not PydanticUndefined
            )  # Has default value
        ):
            optional_columns.append(field_name)
    return sorted(optional_columns)
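
A field counts as optional input in two ways: an explicit "optional": True marker, or a declared default. A self-contained sketch with hypothetical fields:

from pydantic import BaseModel, Field
from pydantic_core import PydanticUndefined

class MiniInput(BaseModel):
    close: float = Field(json_schema_extra={"input": True})                # required: no default, no marker
    volume: float = Field(default=0.0, json_schema_extra={"input": True})  # optional via default value
    timeframe: str = Field(default="", json_schema_extra={"input": True, "optional": True})  # optional via marker

optional = sorted(
    name
    for name, info in MiniInput.model_fields.items()
    if (info.json_schema_extra or {}).get("input") is True
    and ((info.json_schema_extra or {}).get("optional", False) or info.default is not PydanticUndefined)
)
print(optional)  # ['timeframe', 'volume']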

get_output_columns classmethod

get_output_columns() -> list[str]

Get list of all output columns based on schema definition.

Returns:

Type       Description
list[str]  List of column names marked as output in schema metadata.

Source code in thestrat/schemas.py
@classmethod
def get_output_columns(cls) -> list[str]:
    """
    Get list of all output columns based on schema definition.

    Returns:
        List of column names marked as output in schema metadata
    """
    output_columns = []
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if json_extra.get("output") is True:
            output_columns.append(field_name)
    return sorted(output_columns)

get_polars_dtypes classmethod

get_polars_dtypes() -> dict[str, Any]

Get Polars data types for all DataFrame columns.

Returns:

Type            Description
dict[str, Any]  Dictionary mapping column names to their Polars data types.

Source code in thestrat/schemas.py
@classmethod
def get_polars_dtypes(cls) -> dict[str, Any]:
    """
    Get Polars data types for all DataFrame columns.

    Returns:
        Dictionary mapping column names to their Polars data types
    """
    types = {}

    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and "polars_dtype" in json_extra:
            types[field_name] = json_extra["polars_dtype"]

    return types
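
The per-field "polars_dtype" metadata makes the schema usable as a casting table. A self-contained sketch (hypothetical columns; the real mapping covers every IndicatorSchema field):

from polars import DataFrame, Float64, Utf8
from pydantic import BaseModel, Field

class MiniDtypes(BaseModel):
    symbol: str = Field(json_schema_extra={"polars_dtype": Utf8})
    close: float = Field(json_schema_extra={"polars_dtype": Float64})

dtypes = {
    name: info.json_schema_extra["polars_dtype"]
    for name, info in MiniDtypes.model_fields.items()
    if isinstance(info.json_schema_extra, dict) and "polars_dtype" in info.json_schema_extra
}
df = DataFrame({"symbol": ["AAPL"], "close": [190.5]}).cast(dtypes)
print(df.schema)  # symbol: String, close: Float64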

get_precision_metadata classmethod

get_precision_metadata() -> dict[str, dict[str, Any]]

Get precision metadata for all fields.

Returns:

Type                       Description
dict[str, dict[str, Any]]  Dict mapping field_name → {'precision_type': str, 'decimal_places': int | None}.

Source code in thestrat/schemas.py
@classmethod
def get_precision_metadata(cls) -> dict[str, dict[str, Any]]:
    """
    Get precision metadata for all fields.

    Returns:
        Dict mapping field_name → {'precision_type': str, 'decimal_places': int | None}
    """
    metadata = {}

    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if "precision_type" in json_extra:
            metadata[field_name] = {
                "precision_type": json_extra["precision_type"],
                "decimal_places": json_extra.get("decimal_places"),
            }

    return metadata

get_required_input_columns classmethod

get_required_input_columns() -> list[str]

Get list of required input columns based on schema definition.

Returns:

Type       Description
list[str]  List of column names that are required for input data.

Source code in thestrat/schemas.py
@classmethod
def get_required_input_columns(cls) -> list[str]:
    """
    Get list of required input columns based on schema definition.

    Returns:
        List of column names that are required for input data
    """
    from pydantic_core import PydanticUndefined

    required_columns = []
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if (
            json_extra.get("input") is True  # Is an input column
            and not json_extra.get("optional", False)  # Not marked as optional
            and getattr(field_info, "default", PydanticUndefined)
            is PydanticUndefined  # No default value (required)
        ):
            required_columns.append(field_name)
    return sorted(required_columns)
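
The three input-column accessors compose directly; get_all_input_columns is defined above as the sorted union of the other two (assumes thestrat is importable):

from thestrat.schemas import IndicatorSchema

required = IndicatorSchema.get_required_input_columns()
optional = IndicatorSchema.get_optional_input_columns()
assert IndicatorSchema.get_all_input_columns() == sorted(required + optional)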

get_standard_column_order classmethod

get_standard_column_order() -> list[str]

Get the standard column ordering for aggregation output.

Source code in thestrat/schemas.py
@classmethod
def get_standard_column_order(cls) -> list[str]:
    """Get the standard column ordering for aggregation output."""
    return ["timestamp", "symbol", "timeframe", "open", "high", "low", "close", "volume"]

validate_dataframe classmethod

validate_dataframe(df) -> dict[str, Any]

Validate input DataFrame columns and data types against IndicatorSchema input requirements.

Automatically converts Pandas DataFrames to Polars for consistent validation.

Parameters:

Name  Type  Description                              Default
df          Polars or Pandas DataFrame to validate.  required

Returns:

Type            Description
dict[str, Any]  Dictionary with validation results including missing/extra columns, type issues, and the converted Polars DataFrame if conversion occurred.

Source code in thestrat/schemas.py
@classmethod
def validate_dataframe(cls, df) -> dict[str, Any]:
    """
    Validate input DataFrame columns and data types against IndicatorSchema input requirements.

    Automatically converts Pandas DataFrames to Polars for consistent validation.

    Args:
        df: Polars or Pandas DataFrame to validate

    Returns:
        Dictionary with validation results including missing/extra columns, type issues,
        and the converted Polars DataFrame if conversion occurred
    """
    from polars import from_pandas

    # Detect DataFrame type and convert if necessary
    df_type = "unknown"
    converted_df = None
    conversion_errors = []

    if hasattr(df, "columns"):
        # Check if it's a Pandas DataFrame
        if hasattr(df, "dtypes") and not hasattr(df, "schema"):
            df_type = "pandas"
            try:
                # Convert Pandas to Polars
                converted_df = from_pandas(df)
                df = converted_df  # Use converted DataFrame for validation
            except Exception as e:
                conversion_errors.append(f"Failed to convert Pandas to Polars: {str(e)}")
                # Fall back to column-only validation
                df_columns = list(df.columns)
                return {
                    "valid": False,
                    "conversion_error": conversion_errors[0],
                    "df_type": df_type,
                    "columns": df_columns,
                    "message": "Could not convert Pandas DataFrame to Polars for full validation",
                }
        # Check if it's already a Polars DataFrame
        elif hasattr(df, "schema"):
            df_type = "polars"
        else:
            raise ValueError("Unknown DataFrame type - must be Pandas or Polars DataFrame")

        df_columns = list(df.columns)
    else:
        raise ValueError("Input must be a DataFrame with .columns attribute")

    required_fields = []
    optional_fields = []
    expected_types = {}

    # Extract input field requirements from schema
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and json_extra.get("input"):
            if field_info.is_required():
                required_fields.append(field_name)
            else:
                optional_fields.append(field_name)

            # Store expected Polars type for validation
            if "polars_dtype" in json_extra:
                expected_types[field_name] = json_extra["polars_dtype"]

    # Check for missing and extra columns
    missing_required = [col for col in required_fields if col not in df_columns]
    missing_optional = [col for col in optional_fields if col not in df_columns]
    extra_columns = [col for col in df_columns if col not in required_fields + optional_fields]

    # Check data types for present columns (now guaranteed to be Polars)
    type_issues = []
    if hasattr(df, "schema"):  # Polars DataFrame
        for col_name, expected_type in expected_types.items():
            if col_name in df_columns:
                actual_type = df.schema[col_name]
                if actual_type != expected_type:
                    type_issues.append(
                        {
                            "column": col_name,
                            "expected": expected_type.__name__
                            if hasattr(expected_type, "__name__")
                            else str(expected_type),
                            "actual": str(actual_type),
                        }
                    )

    result = {
        "valid": len(missing_required) == 0 and len(type_issues) == 0,
        "missing_required": missing_required,
        "missing_optional": missing_optional,
        "extra_columns": extra_columns,
        "type_issues": type_issues,
        "required_fields": required_fields,
        "optional_fields": optional_fields,
        "expected_types": {k: v.__name__ if hasattr(v, "__name__") else str(v) for k, v in expected_types.items()},
        "df_type": df_type,
    }

    # Include converted DataFrame if conversion occurred
    if converted_df is not None:
        result["converted_df"] = converted_df
        result["conversion_performed"] = True
    else:
        result["conversion_performed"] = False

    return result
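
A usage sketch (the column names below are hypothetical; query get_required_input_columns() for the authoritative set):

from datetime import datetime
from polars import DataFrame
from thestrat.schemas import IndicatorSchema

df = DataFrame({
    "timestamp": [datetime(2024, 1, 2, 9, 30)],
    "open": [100.0],
    "high": [101.0],
    "low": [99.5],
    "close": [100.5],
})
report = IndicatorSchema.validate_dataframe(df)
print(report["valid"], report["missing_required"], report["type_issues"])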

IndicatorsConfig

Bases: BaseModel

Complete configuration for indicators component with per-timeframe settings.

SwingPointsConfig

Bases: BaseModel

Configuration for swing point detection with comprehensive parameter documentation.

TargetConfig

Bases: BaseModel

Configuration for multi-target detection.

TimeframeConfig

Bases: BaseModel

Configuration and validation for timeframes with comprehensive metadata.

Methods:

Name                          Description
get_optimal_source_timeframe  Get optimal source timeframe for aggregating to target.
get_polars_format             Get the Polars format for a timeframe.
validate_timeframe            Validate that the timeframe is supported (strict mode only).

get_optimal_source_timeframe classmethod

get_optimal_source_timeframe(
    target_timeframe: str, available_timeframes: list[str]
) -> str | None

Get optimal source timeframe for aggregating to target.

Parameters:

Name                  Type       Description                           Default
target_timeframe      str        Target timeframe to aggregate to.     required
available_timeframes  list[str]  List of available source timeframes.  required

Returns:

Type        Description
str | None  The target itself if it is already available (pass-through), otherwise the largest source timeframe that divides evenly into the target, or None if no valid source exists.

Source code in thestrat/schemas.py
@classmethod
def get_optimal_source_timeframe(cls, target_timeframe: str, available_timeframes: list[str]) -> str | None:
    """
    Get optimal source timeframe for aggregating to target.

    Args:
        target_timeframe: Target timeframe to aggregate to
        available_timeframes: List of available source timeframes

    Returns:
        The target itself if already available, otherwise the optimal source timeframe, or None if no valid source exists
    """
    # If target exists, use it directly (pass-through)
    if target_timeframe in available_timeframes:
        return target_timeframe

    target_metadata = cls.TIMEFRAME_METADATA.get(target_timeframe)
    if not target_metadata:
        return None

    target_seconds = target_metadata["seconds"]

    # Find all mathematically valid sources (those that divide evenly into target)
    valid_sources = []
    for source_tf in available_timeframes:
        source_metadata = cls.TIMEFRAME_METADATA.get(source_tf)
        if source_metadata:
            source_seconds = source_metadata["seconds"]
            if target_seconds % source_seconds == 0:
                valid_sources.append((source_tf, source_seconds))

    if not valid_sources:
        return None

    # Return the source with the largest duration (minimize aggregation operations)
    return max(valid_sources, key=lambda x: x[1])[0]
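
A worked sketch of the divisibility rule, assuming "5m" (300 s), "15m" (900 s), and "1h" (3600 s) are registered in TIMEFRAME_METADATA: 3600 % 300 == 0 and 3600 % 900 == 0, so both are valid sources, and the larger "15m" wins because it minimizes aggregation work.

from thestrat.schemas import TimeframeConfig

print(TimeframeConfig.get_optimal_source_timeframe("1h", ["5m", "15m"]))  # "15m"

# Pass-through: an already-available target is returned unchanged.
print(TimeframeConfig.get_optimal_source_timeframe("1h", ["1h", "5m"]))  # "1h"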

get_polars_format classmethod

get_polars_format(timeframe: str) -> str

Get the Polars format for a timeframe.

Source code in thestrat/schemas.py
@classmethod
def get_polars_format(cls, timeframe: str) -> str:
    """Get the Polars format for a timeframe."""
    metadata = cls.TIMEFRAME_METADATA.get(timeframe)
    if metadata:
        return metadata.get("polars_format", timeframe)
    return timeframe

validate_timeframe classmethod

validate_timeframe(timeframe: str) -> bool

Validate that the timeframe is supported (strict mode only).

Source code in thestrat/schemas.py
@classmethod
def validate_timeframe(cls, timeframe: str) -> bool:
    """Validate that the timeframe is supported (strict mode only)."""
    return timeframe in cls.TIMEFRAME_METADATA

TimeframeItemConfig

Bases: BaseModel

Configuration for a single timeframe item with flexible timeframe targeting.

Methods:

Name                             Description
validate_timeframe_combinations  Validate that 'all' is not mixed with specific timeframes.

validate_timeframe_combinations

validate_timeframe_combinations() -> Self

Validate that 'all' is not mixed with specific timeframes.

Source code in thestrat/schemas.py
@model_validator(mode="after")
def validate_timeframe_combinations(self) -> Self:
    """Validate that 'all' is not mixed with specific timeframes."""
    if "all" in self.timeframes and len(self.timeframes) > 1:
        raise ValueError("'all' cannot be combined with specific timeframes")
    return self
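
A sketch of the rejected case (assumes timeframes is the only required field; Pydantic wraps the ValueError above in a ValidationError):

from pydantic import ValidationError
from thestrat.schemas import TimeframeItemConfig

try:
    TimeframeItemConfig(timeframes=["all", "1h"])
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # "... 'all' cannot be combined with specific timeframes"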