
# Schemas

Pydantic schema models for TheStrat configuration validation.

This module provides comprehensive validation schemas that replace all manual validation logic in the Factory class. The models use Pydantic v2 features for maximum performance, type safety, and detailed error reporting.

Classes:

| Name | Description |
|------|-------------|
| `AggregationConfig` | Complete configuration for aggregation component with market-aware settings. |
| `AssetClassConfig` | Configuration for a specific asset class with comprehensive market metadata. |
| `FactoryConfig` | Root configuration for Factory.create_all method with complete pipeline setup. |
| `GapDetectionConfig` | Configuration for gap detection with comprehensive threshold documentation. |
| `IndicatorSchema` | Complete indicator schema for all columns created by the TheStrat processing pipeline. |
| `IndicatorsConfig` | Complete configuration for indicators component with per-timeframe settings. |
| `SchemaDocGenerator` | Generate comprehensive documentation from Pydantic schemas. |
| `SwingPointsConfig` | Configuration for swing point detection with comprehensive parameter documentation. |
| `TimeframeConfig` | Configuration and validation for timeframes with comprehensive metadata. |
| `TimeframeItemConfig` | Configuration for a single timeframe item with flexible timeframe targeting. |

## AggregationConfig

Bases: `BaseModel`

Complete configuration for aggregation component with market-aware settings.

Methods:

| Name | Description |
|------|-------------|
| `apply_asset_class_defaults` | Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start. |
| `validate_and_expand_target_timeframes` | Validate target_timeframes field and expand 'all' keyword. |

### apply_asset_class_defaults `classmethod`

```python
apply_asset_class_defaults(data: Any) -> Any
```

Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start.

Source code in `thestrat/schemas.py`:

```python
@model_validator(mode="before")
@classmethod
def apply_asset_class_defaults(cls, data: Any) -> Any:
    """Apply AssetClassConfig defaults for timezone, hour_boundary, and session_start."""
    if not isinstance(data, dict):
        return data

    # Get asset class, default to "equities"
    asset_class = data.get("asset_class", "equities")
    asset_config = AssetClassConfig.get_config(asset_class)

    # Force UTC timezone for crypto and fx regardless of input
    if asset_class in ["crypto", "fx"]:
        data["timezone"] = "UTC"
    elif "timezone" not in data or data["timezone"] is None:
        data["timezone"] = asset_config.timezone

    # Apply other defaults only if fields are not provided or are None
    if "hour_boundary" not in data or data["hour_boundary"] is None:
        data["hour_boundary"] = asset_config.hour_boundary
    if "session_start" not in data or data["session_start"] is None:
        data["session_start"] = asset_config.session_start

    return data
```

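A minimal usage sketch of the defaulting behaviour (field names follow the validator above; the specific timeframe string is an assumption):

```python
from thestrat.schemas import AggregationConfig

# Crypto and FX force UTC regardless of any timezone passed in.
crypto_cfg = AggregationConfig(asset_class="crypto", target_timeframes=["1h"])
assert crypto_cfg.timezone == "UTC"

# Other asset classes fall back to their AssetClassConfig defaults for
# timezone, hour_boundary, and session_start when those fields are omitted.
equity_cfg = AggregationConfig(target_timeframes=["1h"])  # asset_class defaults to "equities"
```
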

### validate_and_expand_target_timeframes `classmethod`

```python
validate_and_expand_target_timeframes(
    target_timeframes: list[str],
) -> list[str]
```

Validate the `target_timeframes` field and expand the 'all' keyword.

This Pydantic field validator is called automatically when AggregationConfig is instantiated. It validates individual timeframes and expands `['all']` to all supported timeframes.

Source code in `thestrat/schemas.py`:

```python
@field_validator("target_timeframes")
@classmethod
def validate_and_expand_target_timeframes(cls, target_timeframes: list[str]) -> list[str]:
    """
    Validate target_timeframes field and expand 'all' keyword.

    This Pydantic field validator is automatically called when AggregationConfig
    is instantiated. It validates individual timeframes and expands ['all'] to
    all supported timeframes.
    """
    if not target_timeframes:
        raise ValueError("target_timeframes cannot be empty")

    # Check for 'all' keyword
    if "all" in target_timeframes:
        if len(target_timeframes) > 1:
            raise ValueError("'all' cannot be combined with specific timeframes")
        # Expand to all supported timeframes
        return list(TimeframeConfig.TIMEFRAME_METADATA.keys())

    # Validate each specific timeframe
    for i, timeframe in enumerate(target_timeframes):
        if not isinstance(timeframe, str) or not timeframe.strip():
            raise ValueError(f"target_timeframes[{i}] must be a non-empty string")

        # Validate timeframe format using TimeframeConfig
        if not TimeframeConfig.validate_timeframe(timeframe):
            raise ValueError(f"Invalid timeframe '{timeframe}'")

    return target_timeframes
```

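A sketch of the expansion and the mixed-keyword failure mode (the "1h" string is assumed to be a key in `TimeframeConfig.TIMEFRAME_METADATA`):

```python
from thestrat.schemas import AggregationConfig

# ["all"] expands to every key in TimeframeConfig.TIMEFRAME_METADATA.
cfg = AggregationConfig(target_timeframes=["all"])
print(cfg.target_timeframes)  # full list of supported timeframes

# Combining "all" with specific timeframes fails validation.
try:
    AggregationConfig(target_timeframes=["all", "1h"])
except ValueError as exc:  # pydantic.ValidationError subclasses ValueError
    print(exc)
```
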

## AssetClassConfig

Bases: `BaseModel`

Configuration for a specific asset class with comprehensive market metadata.

Methods:

| Name | Description |
|------|-------------|
| `get_config` | Get configuration for specific asset class. |

### get_config `classmethod`

```python
get_config(asset_class: str) -> AssetClassConfig
```

Get configuration for specific asset class.

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_config(cls, asset_class: str) -> "AssetClassConfig":
    """Get configuration for specific asset class."""
    return cls.REGISTRY.get(asset_class, cls.REGISTRY["equities"])
```

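A short usage sketch (the "crypto" key is grounded in the AggregationConfig validator above; the fallback behaviour follows the source):

```python
from thestrat.schemas import AssetClassConfig

crypto = AssetClassConfig.get_config("crypto")
print(crypto.timezone, crypto.session_start, crypto.hour_boundary)

# Unrecognized asset classes return the "equities" configuration.
fallback = AssetClassConfig.get_config("unknown")
assert fallback is AssetClassConfig.REGISTRY["equities"]
```
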

## FactoryConfig

Bases: `BaseModel`

Root configuration for Factory.create_all method with complete pipeline setup.

Methods:

| Name | Description |
|------|-------------|
| `validate_configuration_consistency` | Validate consistency between aggregation and indicators configurations. |

### validate_configuration_consistency

```python
validate_configuration_consistency() -> Self
```

Validate consistency between aggregation and indicators configurations.

Source code in `thestrat/schemas.py`:

```python
@model_validator(mode="after")
def validate_configuration_consistency(self) -> Self:
    """Validate consistency between aggregation and indicators configurations."""
    # Future enhancement: Could validate that indicator timeframes are compatible
    # with aggregation target timeframes, but this is not currently enforced
    # in the existing system
    return self
```


## GapDetectionConfig

Bases: `BaseModel`

Configuration for gap detection with comprehensive threshold documentation.

## IndicatorSchema

Bases: `BaseModel`

Complete Indicator Schema

Defines all columns created by the TheStrat processing pipeline. All columns are required because the indicators component creates them all.

Methods:

| Name | Description |
|------|-------------|
| `get_all_input_columns` | Get list of all input columns (required + optional). |
| `get_column_categories` | Get columns organized by functional categories. |
| `get_column_descriptions` | Get descriptions for all possible DataFrame columns. |
| `get_optional_input_columns` | Get list of optional input columns based on schema definition. |
| `get_polars_dtypes` | Get Polars data types for all DataFrame columns. |
| `get_required_input_columns` | Get list of required input columns based on schema definition. |
| `validate_dataframe` | Validate input DataFrame columns and data types against IndicatorSchema input requirements. |

### get_all_input_columns `classmethod`

```python
get_all_input_columns() -> list[str]
```

Get list of all input columns (required + optional).

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of all input column names |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_all_input_columns(cls) -> list[str]:
    """
    Get list of all input columns (required + optional).

    Returns:
        List of all input column names
    """
    return sorted(cls.get_required_input_columns() + cls.get_optional_input_columns())
```

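A quick sketch of the three column-introspection helpers together (the concrete column names returned depend on the schema definition):

```python
from thestrat.schemas import IndicatorSchema

required = IndicatorSchema.get_required_input_columns()
optional = IndicatorSchema.get_optional_input_columns()

# get_all_input_columns is simply the sorted union of the two.
assert IndicatorSchema.get_all_input_columns() == sorted(required + optional)
```
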

### get_column_categories `classmethod`

```python
get_column_categories() -> dict[str, list[str]]
```

Get columns organized by functional categories.

Dynamically extracts categories from the IndicatorSchema metadata.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, list[str]]` | Dictionary mapping category names to lists of column names |
Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_column_categories(cls) -> dict[str, list[str]]:
    """
    Get columns organized by functional categories.

    Dynamically extracts categories from the IndicatorSchema metadata.

    Returns:
        Dictionary mapping category names to lists of column names
    """
    categories: dict[str, list[str]] = {}

    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and "category" in json_extra:
            category = json_extra["category"]
            if category not in categories:
                categories[category] = []
            categories[category].append(field_name)

    # Sort columns within each category for consistent output
    for category in categories:
        categories[category].sort()

    return categories
```

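A brief sketch (category names come from each field's `json_schema_extra` metadata and are not enumerated here):

```python
from thestrat.schemas import IndicatorSchema

for category, columns in IndicatorSchema.get_column_categories().items():
    print(f"{category}: {len(columns)} columns")
```
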

### get_column_descriptions `classmethod`

```python
get_column_descriptions() -> dict[str, str]
```

Get descriptions for all possible DataFrame columns.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, str]` | Dictionary mapping column names to their descriptions |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_column_descriptions(cls) -> dict[str, str]:
    """
    Get descriptions for all possible DataFrame columns.

    Returns:
        Dictionary mapping column names to their descriptions
    """
    descriptions = {}

    # Get descriptions from the schema
    for field_name, field_info in cls.model_fields.items():
        if field_info.description:
            descriptions[field_name] = field_info.description

    return descriptions
```


### get_optional_input_columns `classmethod`

```python
get_optional_input_columns() -> list[str]
```

Get list of optional input columns based on schema definition.

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of column names that are optional for input data |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_optional_input_columns(cls) -> list[str]:
    """
    Get list of optional input columns based on schema definition.

    Returns:
        List of column names that are optional for input data
    """
    from pydantic_core import PydanticUndefined

    optional_columns = []
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if (
            json_extra.get("input") is True  # Is an input column
            and (
                json_extra.get("optional", False)  # Marked as optional
                or getattr(field_info, "default", PydanticUndefined) is not PydanticUndefined
            )  # Has default value
        ):
            optional_columns.append(field_name)
    return sorted(optional_columns)
```


### get_polars_dtypes `classmethod`

```python
get_polars_dtypes() -> dict[str, Any]
```

Get Polars data types for all DataFrame columns.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Dictionary mapping column names to their Polars data types |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_polars_dtypes(cls) -> dict[str, Any]:
    """
    Get Polars data types for all DataFrame columns.

    Returns:
        Dictionary mapping column names to their Polars data types
    """
    types = {}

    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and "polars_dtype" in json_extra:
            types[field_name] = json_extra["polars_dtype"]

    return types
```

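One way to apply this mapping, as an illustrative sketch (the `cast_known_columns` helper is not part of the library):

```python
import polars as pl

from thestrat.schemas import IndicatorSchema

dtypes = IndicatorSchema.get_polars_dtypes()

def cast_known_columns(df: pl.DataFrame) -> pl.DataFrame:
    """Cast whichever schema columns are present to their expected Polars dtypes."""
    return df.with_columns(
        [pl.col(name).cast(dtype) for name, dtype in dtypes.items() if name in df.columns]
    )
```
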

### get_required_input_columns `classmethod`

```python
get_required_input_columns() -> list[str]
```

Get list of required input columns based on schema definition.

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of column names that are required for input data |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_required_input_columns(cls) -> list[str]:
    """
    Get list of required input columns based on schema definition.

    Returns:
        List of column names that are required for input data
    """
    from pydantic_core import PydanticUndefined

    required_columns = []
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {}) or {}
        if (
            json_extra.get("input") is True  # Is an input column
            and not json_extra.get("optional", False)  # Not marked as optional
            and getattr(field_info, "default", PydanticUndefined)
            is PydanticUndefined  # No default value (required)
        ):
            required_columns.append(field_name)
    return sorted(required_columns)
```


### validate_dataframe `classmethod`

```python
validate_dataframe(df) -> dict[str, Any]
```

Validate input DataFrame columns and data types against IndicatorSchema input requirements.

Automatically converts Pandas DataFrames to Polars for consistent validation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `df` | | Polars or Pandas DataFrame to validate | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Dictionary with validation results including missing/extra columns, type issues, and the converted Polars DataFrame if conversion occurred |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def validate_dataframe(cls, df) -> dict[str, Any]:
    """
    Validate input DataFrame columns and data types against IndicatorSchema input requirements.

    Automatically converts Pandas DataFrames to Polars for consistent validation.

    Args:
        df: Polars or Pandas DataFrame to validate

    Returns:
        Dictionary with validation results including missing/extra columns, type issues,
        and the converted Polars DataFrame if conversion occurred
    """
    from polars import from_pandas

    # Detect DataFrame type and convert if necessary
    df_type = "unknown"
    converted_df = None
    conversion_errors = []

    if hasattr(df, "columns"):
        # Check if it's a Pandas DataFrame
        if hasattr(df, "dtypes") and not hasattr(df, "schema"):
            df_type = "pandas"
            try:
                # Convert Pandas to Polars
                converted_df = from_pandas(df)
                df = converted_df  # Use converted DataFrame for validation
            except Exception as e:
                conversion_errors.append(f"Failed to convert Pandas to Polars: {str(e)}")
                # Fall back to column-only validation
                df_columns = list(df.columns)
                return {
                    "valid": False,
                    "conversion_error": conversion_errors[0],
                    "df_type": df_type,
                    "columns": df_columns,
                    "message": "Could not convert Pandas DataFrame to Polars for full validation",
                }
        # Check if it's already a Polars DataFrame
        elif hasattr(df, "schema"):
            df_type = "polars"
        else:
            raise ValueError("Unknown DataFrame type - must be Pandas or Polars DataFrame")

        df_columns = list(df.columns)
    else:
        raise ValueError("Input must be a DataFrame with .columns attribute")

    required_fields = []
    optional_fields = []
    expected_types = {}

    # Extract input field requirements from schema
    for field_name, field_info in cls.model_fields.items():
        json_extra = getattr(field_info, "json_schema_extra", {})
        if isinstance(json_extra, dict) and json_extra.get("input"):
            if field_info.is_required():
                required_fields.append(field_name)
            else:
                optional_fields.append(field_name)

            # Store expected Polars type for validation
            if "polars_dtype" in json_extra:
                expected_types[field_name] = json_extra["polars_dtype"]

    # Check for missing and extra columns
    missing_required = [col for col in required_fields if col not in df_columns]
    missing_optional = [col for col in optional_fields if col not in df_columns]
    extra_columns = [col for col in df_columns if col not in required_fields + optional_fields]

    # Check data types for present columns (now guaranteed to be Polars)
    type_issues = []
    if hasattr(df, "schema"):  # Polars DataFrame
        for col_name, expected_type in expected_types.items():
            if col_name in df_columns:
                actual_type = df.schema[col_name]
                if actual_type != expected_type:
                    type_issues.append(
                        {
                            "column": col_name,
                            "expected": expected_type.__name__
                            if hasattr(expected_type, "__name__")
                            else str(expected_type),
                            "actual": str(actual_type),
                        }
                    )

    result = {
        "valid": len(missing_required) == 0 and len(type_issues) == 0,
        "missing_required": missing_required,
        "missing_optional": missing_optional,
        "extra_columns": extra_columns,
        "type_issues": type_issues,
        "required_fields": required_fields,
        "optional_fields": optional_fields,
        "expected_types": {k: v.__name__ if hasattr(v, "__name__") else str(v) for k, v in expected_types.items()},
        "df_type": df_type,
    }

    # Include converted DataFrame if conversion occurred
    if converted_df is not None:
        result["converted_df"] = converted_df
        result["conversion_performed"] = True
    else:
        result["conversion_performed"] = False

    return result
```

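A hedged usage sketch (the OHLC column names are assumptions about the schema's input columns; the report keys follow the source above):

```python
import polars as pl

from thestrat.schemas import IndicatorSchema

df = pl.DataFrame({"open": [1.0], "high": [2.0], "low": [0.5], "close": [1.5]})
report = IndicatorSchema.validate_dataframe(df)

if not report["valid"]:
    print("missing required:", report["missing_required"])
    print("type issues:", report["type_issues"])

# Pandas input is converted automatically; use the Polars copy downstream.
if report["conversion_performed"]:
    df = report["converted_df"]
```
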

## IndicatorsConfig

Bases: `BaseModel`

Complete configuration for indicators component with per-timeframe settings.

## SchemaDocGenerator

Generate comprehensive documentation from Pydantic schemas.

Methods:

| Name | Description |
|------|-------------|
| `export_json_schemas` | Export JSON schemas for all models (useful for OpenAPI/AsyncAPI generation). |
| `generate_all_model_docs` | Generate documentation for all schema models. |
| `generate_complete_documentation` | Generate complete markdown documentation for all schema models. |
| `generate_field_docs` | Extract comprehensive field documentation from a Pydantic model. |
| `generate_json_schema` | Generate JSON Schema for a Pydantic model with all metadata. |
| `generate_markdown_table` | Generate a markdown table for a Pydantic model's fields. |

### export_json_schemas `staticmethod`

```python
export_json_schemas() -> dict[str, dict[str, Any]]
```

Export JSON schemas for all models (useful for OpenAPI/AsyncAPI generation).

Returns:

| Type | Description |
|------|-------------|
| `dict[str, dict[str, Any]]` | Dictionary mapping model names to their JSON schemas |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def export_json_schemas() -> dict[str, dict[str, Any]]:
    """
    Export JSON schemas for all models (useful for OpenAPI/AsyncAPI generation).

    Returns:
        Dictionary mapping model names to their JSON schemas
    """
    docs = SchemaDocGenerator.generate_all_model_docs()
    return {name: doc_info["json_schema"] for name, doc_info in docs.items()}
```


### generate_all_model_docs `staticmethod`

```python
generate_all_model_docs() -> dict[str, dict[str, Any]]
```

Generate documentation for all schema models.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, dict[str, Any]]` | Dictionary mapping model names to their complete documentation |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def generate_all_model_docs() -> dict[str, dict[str, Any]]:
    """
    Generate documentation for all schema models.

    Returns:
        Dictionary mapping model names to their complete documentation
    """
    models = [
        AssetClassConfig,
        TimeframeConfig,
        SwingPointsConfig,
        GapDetectionConfig,
        TimeframeItemConfig,
        IndicatorsConfig,
        AggregationConfig,
        FactoryConfig,
        IndicatorSchema,
    ]

    all_docs = {}
    for model in models:
        all_docs[model.__name__] = {
            "class_doc": model.__doc__,
            "field_docs": SchemaDocGenerator.generate_field_docs(model),
            "markdown": SchemaDocGenerator.generate_markdown_table(model),
            "json_schema": SchemaDocGenerator.generate_json_schema(model),
        }

    return all_docs
```


### generate_complete_documentation `staticmethod`

```python
generate_complete_documentation() -> str
```

Generate complete markdown documentation for all schema models.

Returns:

| Type | Description |
|------|-------------|
| `str` | Complete markdown documentation string |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def generate_complete_documentation() -> str:
    """
    Generate complete markdown documentation for all schema models.

    Returns:
        Complete markdown documentation string
    """
    docs = SchemaDocGenerator.generate_all_model_docs()

    markdown = "# TheStrat Schema Documentation\n\n"
    markdown += "Auto-generated documentation for all Pydantic configuration models.\n\n"
    markdown += "---\n\n"

    for model_docs in docs.values():
        markdown += model_docs["markdown"] + "\n\n"

    return markdown
```

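A sketch of driving the generator end to end (the output path is an illustrative choice):

```python
from pathlib import Path

from thestrat.schemas import SchemaDocGenerator

# Render every model to one markdown document.
Path("docs/schemas.md").write_text(SchemaDocGenerator.generate_complete_documentation())

# Or export JSON schemas for OpenAPI/AsyncAPI tooling.
schemas = SchemaDocGenerator.export_json_schemas()
print(sorted(schemas))  # model names: "AggregationConfig", "FactoryConfig", ...
```
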

### generate_field_docs `staticmethod`

```python
generate_field_docs(model: type[BaseModel]) -> dict[str, dict[str, Any]]
```

Extract comprehensive field documentation from a Pydantic model.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `model` | `type[BaseModel]` | Pydantic model class to document | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, dict[str, Any]]` | Dictionary mapping field names to their documentation metadata |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def generate_field_docs(model: type[BaseModel]) -> dict[str, dict[str, Any]]:
    """
    Extract comprehensive field documentation from a Pydantic model.

    Args:
        model: Pydantic model class to document

    Returns:
        Dictionary mapping field names to their documentation metadata
    """
    field_docs = {}

    for field_name, field_info in model.model_fields.items():
        field_doc = {
            "name": field_name,
            "type": SchemaDocGenerator._get_type_string(field_info.annotation),
            "description": field_info.description or "No description provided",
            "default": SchemaDocGenerator._format_default(field_info.default),
            "required": field_info.is_required(),
            "examples": getattr(field_info, "examples", []),
        }

        # Add validation constraints
        constraints = SchemaDocGenerator._extract_constraints(field_info)
        if constraints:
            field_doc["constraints"] = constraints

        # Add json_schema_extra information (convert Polars types to strings)
        if hasattr(field_info, "json_schema_extra") and field_info.json_schema_extra:
            metadata = field_info.json_schema_extra.copy()
            # Convert Polars data types to string representations for serialization
            if "polars_dtype" in metadata:
                polars_type = metadata["polars_dtype"]
                # Convert Polars type to string representation for JSON serialization
                try:
                    metadata["polars_dtype"] = getattr(polars_type, "__name__", str(polars_type))
                except (AttributeError, TypeError):
                    metadata["polars_dtype"] = str(polars_type)
            field_doc["metadata"] = metadata

        field_docs[field_name] = field_doc

    return field_docs
```


### generate_json_schema `staticmethod`

```python
generate_json_schema(model: type[BaseModel]) -> dict[str, Any]
```

Generate JSON Schema for a Pydantic model with all metadata.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `model` | `type[BaseModel]` | Pydantic model class | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | JSON Schema dictionary with enhanced metadata |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def generate_json_schema(model: type[BaseModel]) -> dict[str, Any]:
    """
    Generate JSON Schema for a Pydantic model with all metadata.

    Args:
        model: Pydantic model class

    Returns:
        JSON Schema dictionary with enhanced metadata
    """
    # Special handling for IndicatorSchema which contains non-serializable Polars types
    if hasattr(model, "__name__") and model.__name__ == "IndicatorSchema":
        # Generate a basic JSON schema structure manually for IndicatorSchema
        schema = {
            "type": "object",
            "title": model.__name__,
            "description": model.__doc__ or f"Schema for {model.__name__}",
            "properties": {},
            "required": [],
        }

        # Add properties from field documentation (which has serializable metadata)
        field_docs = SchemaDocGenerator.generate_field_docs(model)

        for field_name, field_doc in field_docs.items():
            prop_schema = {
                "type": SchemaDocGenerator._pydantic_type_to_json_type(field_doc["type"]),
                "description": field_doc["description"],
            }

            # Add metadata (already converted to strings)
            if "metadata" in field_doc:
                prop_schema.update(field_doc["metadata"])

            # Add examples if available
            if field_doc["examples"]:
                prop_schema["examples"] = field_doc["examples"]

            schema["properties"][field_name] = prop_schema

            # Add to required if field is required
            if field_doc["required"]:
                schema["required"].append(field_name)

        return schema
    else:
        # Use standard Pydantic JSON schema generation for other models
        schema = model.model_json_schema()

        # Enhance with field documentation
        field_docs = SchemaDocGenerator.generate_field_docs(model)

        if "properties" in schema:
            for field_name, field_schema in schema["properties"].items():
                if field_name in field_docs:
                    field_doc = field_docs[field_name]

                    # Add examples if available
                    if field_doc["examples"]:
                        field_schema["examples"] = field_doc["examples"]

                    # Add metadata (already converted to strings in generate_field_docs)
                    if "metadata" in field_doc:
                        field_schema.update(field_doc["metadata"])

        return schema
```


### generate_markdown_table `staticmethod`

```python
generate_markdown_table(model: type[BaseModel]) -> str
```

Generate a markdown table for a Pydantic model's fields.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `model` | `type[BaseModel]` | Pydantic model class to document | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | Markdown table string |

Source code in `thestrat/schemas.py`:

```python
@staticmethod
def generate_markdown_table(model: type[BaseModel]) -> str:
    """
    Generate a markdown table for a Pydantic model's fields.

    Args:
        model: Pydantic model class to document

    Returns:
        Markdown table string
    """
    field_docs = SchemaDocGenerator.generate_field_docs(model)

    # Table header
    markdown = f"## {model.__name__}\n\n"
    markdown += f"{model.__doc__ or 'Configuration model'}\n\n"
    markdown += "| Field | Type | Default | Description |\n"
    markdown += "|-------|------|---------|-------------|\n"

    # Table rows
    for field_name, field_doc in field_docs.items():
        type_str = field_doc["type"]
        default_str = field_doc["default"]
        desc_str = field_doc["description"]

        # Add constraints to description if present
        if "constraints" in field_doc:
            constraints = field_doc["constraints"]
            constraint_parts = []
            if "min_length" in constraints:
                constraint_parts.append(f"min length: {constraints['min_length']}")
            if "max_length" in constraints:
                constraint_parts.append(f"max length: {constraints['max_length']}")
            if "ge" in constraints:
                constraint_parts.append(f">= {constraints['ge']}")
            if "le" in constraints:
                constraint_parts.append(f"<= {constraints['le']}")
            if "pattern" in constraints:
                constraint_parts.append(f"pattern: `{constraints['pattern']}`")

            if constraint_parts:
                desc_str += f" ({', '.join(constraint_parts)})"

        markdown += f"| `{field_name}` | {type_str} | {default_str} | {desc_str} |\n"

    # Add ClassVar information if present
    classvars = SchemaDocGenerator._get_classvars(model)
    if classvars:
        markdown += "\n### Class Constants\n\n"
        for var_name, var_info in classvars.items():
            markdown += f"- **`{var_name}`**: {var_info['description']}\n"
            if var_info.get("keys"):
                markdown += f"  - Available keys: {', '.join(var_info['keys'])}\n"

            # Add detailed configuration tables for registries
            if var_name == "REGISTRY" and var_info.get("detailed_configs"):
                markdown += "\n#### Configured Asset Classes\n\n"
                for config_name, config_details in var_info["detailed_configs"].items():
                    markdown += f"**{config_name}**\n"
                    markdown += f"- Trading Hours: {config_details.get('trading_hours', 'N/A')}\n"
                    markdown += f"- Timezone: `{config_details.get('timezone', 'N/A')}`\n"
                    markdown += f"- Session Start: `{config_details.get('session_start', 'N/A')}`\n"
                    markdown += f"- Hour Boundary: `{config_details.get('hour_boundary', 'N/A')}`\n"
                    markdown += "\n"

            # Add detailed timeframe metadata tables
            elif var_name == "TIMEFRAME_METADATA" and var_info.get("detailed_metadata"):
                markdown += "\n#### Timeframe Details\n\n"
                markdown += "| Timeframe | Category | Duration | Description | Typical Use |\n"
                markdown += "|-----------|----------|----------|-------------|-------------|\n"

                for tf_name, tf_details in var_info["detailed_metadata"].items():
                    category = tf_details.get("category", "N/A")
                    seconds = tf_details.get("seconds", 0)
                    description = tf_details.get("description", "N/A")
                    typical_use = tf_details.get("typical_use", "N/A")

                    # Convert seconds to readable duration
                    if seconds < 3600:
                        duration = f"{seconds // 60}m"
                    elif seconds < 86400:
                        duration = f"{seconds // 3600}h"
                    elif seconds < 604800:
                        duration = f"{seconds // 86400}d"
                    else:
                        duration = f"{seconds // 604800}w"

                    markdown += f"| `{tf_name}` | {category} | {duration} | {description} | {typical_use} |\n"
                markdown += "\n"

    return markdown
```


## SwingPointsConfig

Bases: `BaseModel`

Configuration for swing point detection with comprehensive parameter documentation.

## TimeframeConfig

Bases: `BaseModel`

Configuration and validation for timeframes with comprehensive metadata.

Methods:

| Name | Description |
|------|-------------|
| `get_optimal_source_timeframe` | Get optimal source timeframe for aggregating to target. |
| `get_polars_format` | Get the Polars format for a timeframe. |
| `validate_timeframe` | Validate that the timeframe is supported (strict mode only). |

### get_optimal_source_timeframe `classmethod`

```python
get_optimal_source_timeframe(
    target_timeframe: str, available_timeframes: list[str]
) -> str | None
```

Get optimal source timeframe for aggregating to target.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `target_timeframe` | `str` | Target timeframe to aggregate to | *required* |
| `available_timeframes` | `list[str]` | List of available source timeframes | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str \| None` | Optimal source timeframe (the target itself if already available), or None if no valid source exists |

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_optimal_source_timeframe(cls, target_timeframe: str, available_timeframes: list[str]) -> str | None:
    """
    Get optimal source timeframe for aggregating to target.

    Args:
        target_timeframe: Target timeframe to aggregate to
        available_timeframes: List of available source timeframes

    Returns:
        Optimal source timeframe (the target itself if already available), or None if no valid source exists
    """
    # If target exists, use it directly (pass-through)
    if target_timeframe in available_timeframes:
        return target_timeframe

    target_metadata = cls.TIMEFRAME_METADATA.get(target_timeframe)
    if not target_metadata:
        return None

    target_seconds = target_metadata["seconds"]

    # Find all mathematically valid sources (those that divide evenly into target)
    valid_sources = []
    for source_tf in available_timeframes:
        source_metadata = cls.TIMEFRAME_METADATA.get(source_tf)
        if source_metadata:
            source_seconds = source_metadata["seconds"]
            if target_seconds % source_seconds == 0:
                valid_sources.append((source_tf, source_seconds))

    if not valid_sources:
        return None

    # Return the source with the largest duration (minimize aggregation operations)
    return max(valid_sources, key=lambda x: x[1])[0]
```

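A sketch of the selection behaviour (the specific timeframe strings are assumed to be keys in `TIMEFRAME_METADATA`):

```python
from thestrat.schemas import TimeframeConfig

# Target already available: passed through unchanged.
print(TimeframeConfig.get_optimal_source_timeframe("1h", ["1m", "1h"]))  # "1h"

# Otherwise the largest source that divides evenly into the target wins,
# minimizing the number of aggregation operations.
print(TimeframeConfig.get_optimal_source_timeframe("1h", ["5m", "15m"]))  # "15m"
```
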

### get_polars_format `classmethod`

```python
get_polars_format(timeframe: str) -> str
```

Get the Polars format for a timeframe.

Source code in `thestrat/schemas.py`:

```python
@classmethod
def get_polars_format(cls, timeframe: str) -> str:
    """Get the Polars format for a timeframe."""
    metadata = cls.TIMEFRAME_METADATA.get(timeframe)
    if metadata:
        return metadata.get("polars_format", timeframe)
    return timeframe
```


### validate_timeframe `classmethod`

```python
validate_timeframe(timeframe: str) -> bool
```

Validate that the timeframe is supported (strict mode only).

Source code in `thestrat/schemas.py`:

```python
@classmethod
def validate_timeframe(cls, timeframe: str) -> bool:
    """Validate that the timeframe is supported (strict mode only)."""
    return timeframe in cls.TIMEFRAME_METADATA
```

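A quick sketch combining the two lookups (again assuming "1h" is a supported key; unknown strings fail validation and are echoed back by `get_polars_format`, per the source above):

```python
from thestrat.schemas import TimeframeConfig

if TimeframeConfig.validate_timeframe("1h"):
    print(TimeframeConfig.get_polars_format("1h"))  # the Polars-native format string

assert TimeframeConfig.validate_timeframe("97x") is False
assert TimeframeConfig.get_polars_format("97x") == "97x"  # falls back to the input
```
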

## TimeframeItemConfig

Bases: `BaseModel`

Configuration for a single timeframe item with flexible timeframe targeting.

Methods:

| Name | Description |
|------|-------------|
| `validate_timeframe_combinations` | Validate that 'all' is not mixed with specific timeframes. |

### validate_timeframe_combinations

```python
validate_timeframe_combinations() -> Self
```

Validate that 'all' is not mixed with specific timeframes.

Source code in `thestrat/schemas.py`:

```python
@model_validator(mode="after")
def validate_timeframe_combinations(self) -> Self:
    """Validate that 'all' is not mixed with specific timeframes."""
    if "all" in self.timeframes and len(self.timeframes) > 1:
        raise ValueError("'all' cannot be combined with specific timeframes")
    return self
```