Database Integration Guide
The IndicatorSchema class provides essential schema information for integrating TheStrat output with databases and validation systems. This guide shows how to use the schema to create database tables, validate data, and ensure consistent integration.
Quick Start
from thestrat import IndicatorSchema
from polars import DataFrame
from datetime import datetime

# Build a one-row input DataFrame with the required OHLC columns
data = {
    "timestamp": [datetime.now()],
    "open": [100.0], "high": [105.0], "low": [95.0], "close": [102.0],
    "symbol": ["AAPL"], "volume": [1000000.0], "timeframe": ["5min"]
}

# Apply the schema dtypes for the columns present in the input
dtypes = IndicatorSchema.get_polars_dtypes()
df = DataFrame(data, schema={col: dtypes[col] for col in data})

# Validate input DataFrame
result = IndicatorSchema.validate_dataframe(df)
print(f"Valid: {result['valid']}")
print(f"Missing columns: {result['missing_required']}")
Database Schema Generation
SQL Table Creation
# Get column types and descriptions
descriptions = IndicatorSchema.get_column_descriptions()
polars_types = IndicatorSchema.get_polars_dtypes()

# Map Polars types to SQL types
from polars import Boolean, Datetime, Float64, Int32, Int64, String

type_mapping = {
    Datetime: "TIMESTAMP",
    Float64: "DOUBLE PRECISION",
    String: "VARCHAR(50)",
    Boolean: "BOOLEAN",
    Int32: "INTEGER",
    Int64: "BIGINT",  # volume is Int64 after aggregation (see Volume Data Type Handling)
}

# Generate CREATE TABLE statement
def generate_sql_schema(table_name: str) -> str:
    lines = [f"CREATE TABLE {table_name} ("]
    for col, polars_type in polars_types.items():
        sql_type = type_mapping.get(polars_type, "TEXT")
        description = descriptions.get(col, "").replace("'", "''")
        lines.append(f"    {col} {sql_type}, -- {description}")
    lines.append("    PRIMARY KEY (timestamp, symbol, timeframe)")
    lines.append(");")
    return "\n".join(lines)

schema_sql = generate_sql_schema("thestrat_indicators")
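To sanity-check the generated DDL without a running database server, it can be executed against SQLite, which accepts these type names through type affinity. A minimal sketch; the file name below is an assumption:

```python
import sqlite3

# Create the table locally to confirm the generated statement parses
conn = sqlite3.connect("thestrat_schema_check.db")  # hypothetical scratch database
conn.execute(schema_sql)
conn.close()
```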
Column Categories
Organize columns by functionality for targeted database operations:
categories = IndicatorSchema.get_column_categories()

# Create separate tables by category
for category, columns in categories.items():
    if category == "base_ohlc":
        # Core market data table
        create_base_table(columns)
    elif category == "signals":
        # Trading signals table with indexes
        create_signals_table(columns)
    elif category == "market_structure":
        # Market structure analysis table
        create_analysis_table(columns)
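The create_*_table functions above are user-defined helpers. A minimal sketch of one of them, reusing type_mapping and polars_types from the SQL Table Creation example; the table name and key columns mirror that example and are assumptions:

```python
def create_signals_table(columns: list[str], table_name: str = "thestrat_signals") -> str:
    """Build DDL for a signals-only table keyed like the main indicators table."""
    lines = [f"CREATE TABLE {table_name} ("]
    # Keep the composite key so signal rows join back to the base OHLC table
    for col in ("timestamp", "symbol", "timeframe"):
        lines.append(f"    {col} {type_mapping.get(polars_types.get(col), 'TEXT')},")
    for col in columns:
        if col in ("timestamp", "symbol", "timeframe"):
            continue  # already added as key columns
        lines.append(f"    {col} {type_mapping.get(polars_types.get(col), 'TEXT')},")
    lines.append("    PRIMARY KEY (timestamp, symbol, timeframe)")
    lines.append(");")
    return "\n".join(lines)
```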
Input Validation
Required Columns Check
def validate_input_data(df):
    """Validate DataFrame before processing; returns the (possibly converted) frame."""
    result = IndicatorSchema.validate_dataframe(df)
    if not result['valid']:
        errors = []
        if result['missing_required']:
            errors.append(f"Missing: {result['missing_required']}")
        if result['type_issues']:
            errors.append(f"Type errors: {result['type_issues']}")
        raise ValueError(f"Invalid DataFrame: {'; '.join(errors)}")
    return result['converted_df'] if result['conversion_performed'] else df
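A short usage sketch, assuming df arrives from your ingestion layer and process() is a hypothetical downstream step:

```python
try:
    clean_df = validate_input_data(df)
except ValueError as exc:
    # Surfaces the combined missing-column / type-issue message
    print(f"Rejected input batch: {exc}")
else:
    process(clean_df)  # hypothetical downstream processing step
```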
Auto-conversion from Pandas
import pandas as pd

# Pandas DataFrames are automatically converted to Polars during validation
df_pandas = pd.DataFrame(data)
df_pandas['timestamp'] = pd.to_datetime(df_pandas['timestamp'])

result = IndicatorSchema.validate_dataframe(df_pandas)
# result['converted_df'] contains the equivalent Polars DataFrame
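When a conversion was performed, the Polars frame can be pulled out of the validation report, as in this short sketch:

```python
# Use the converted Polars frame for downstream processing when available
if result.get('conversion_performed'):
    df_converted = result['converted_df']
```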
Column Documentation
Get Field Information
# Column descriptions for documentation
descriptions = IndicatorSchema.get_column_descriptions()
polars_types = IndicatorSchema.get_polars_dtypes()

# Generate documentation
for col in ["swing_high", "continuity", "signal"]:
    print(f"**{col}**: {descriptions[col]}")
    print(f"Type: `{polars_types[col].__name__}`\n")
Category-based Operations
categories = IndicatorSchema.get_column_categories()
# Process only price analysis columns
price_cols = categories['price_analysis']
df_prices = df.select(price_cols)
# Extract signal columns for trading system
signal_cols = categories['signals']
df_signals = df.select(signal_cols)
Advanced SQL Schema Generation
Generate complete SQL DDL with nullable constraints by examining the schema metadata:
from thestrat.schemas import IndicatorSchema

# Get schema information
descriptions = IndicatorSchema.get_column_descriptions()
polars_types = IndicatorSchema.get_polars_dtypes()

def generate_sql_with_constraints(table_name: str) -> str:
    """Generate SQL schema with proper NULL/NOT NULL constraints."""
    lines = [f"CREATE TABLE {table_name} ("]

    # Map Polars types to SQL types
    type_mapping = {
        "Datetime": "TIMESTAMP",
        "Float64": "DOUBLE PRECISION",
        "String": "VARCHAR(255)",
        "Boolean": "BOOLEAN",
        "Int32": "INTEGER",
        "Int64": "BIGINT",  # volume is Int64 after aggregation
    }

    # Process each field defined on the schema model
    for field_name in IndicatorSchema.model_fields.keys():
        # Get SQL type from the Polars type name
        polars_type = polars_types.get(field_name)
        polars_type_name = polars_type.__name__ if polars_type and hasattr(polars_type, '__name__') else "String"
        sql_type = type_mapping.get(polars_type_name, "TEXT")

        # Get nullable constraint using the field metadata helper
        metadata = IndicatorSchema.get_field_metadata(field_name)
        nullable = metadata.get('nullable', True)
        constraint = "" if nullable else " NOT NULL"

        # Add description as a trailing comment
        description = descriptions.get(field_name, "").replace("'", "''")
        lines.append(f"    {field_name} {sql_type}{constraint}, -- {description}")

    lines.append("    PRIMARY KEY (timestamp, symbol, timeframe)")
    lines.append(");")
    return "\n".join(lines)

schema_sql = generate_sql_with_constraints("thestrat_indicators")
print(schema_sql)
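Since the Column Categories section calls for indexes on the signals table, the same metadata can also emit CREATE INDEX statements. A sketch; the index naming scheme is an assumption:

```python
def generate_signal_indexes(table_name: str = "thestrat_indicators") -> list[str]:
    """Emit one index per signal column to speed up signal-driven queries."""
    signal_cols = IndicatorSchema.get_column_categories().get("signals", [])
    return [
        f"CREATE INDEX idx_{table_name}_{col} ON {table_name} ({col});"
        for col in signal_cols
    ]

for statement in generate_signal_indexes():
    print(statement)
```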
Field Classification by Type
Organize columns by input/output type and nullable constraints:
# Classify fields by their purpose using the helper method
input_fields = []
output_fields = []
nullable_fields = []
required_fields = []

for field_name in IndicatorSchema.model_fields.keys():
    metadata = IndicatorSchema.get_field_metadata(field_name)

    # Classify by input/output
    if metadata.get('input', False):
        input_fields.append(field_name)
    if metadata.get('output', False):
        output_fields.append(field_name)

    # Classify by nullable constraint
    if metadata.get('nullable', True):
        nullable_fields.append(field_name)
    else:
        required_fields.append(field_name)

print(f"Input fields ({len(input_fields)}): {input_fields}")
print(f"Output fields ({len(output_fields)}): {output_fields}")
print(f"Nullable fields ({len(nullable_fields)}): {nullable_fields}")
print(f"Required (non-null) fields ({len(required_fields)}): {required_fields}")
Column Listing Methods
The schema provides convenient methods to programmatically retrieve different sets of columns:
from thestrat.schemas import IndicatorSchema
# Get all input columns (required + optional)
input_columns = IndicatorSchema.get_all_input_columns()
print(f"Input columns: {input_columns}")
# ['close', 'high', 'low', 'open', 'symbol', 'timeframe', 'timestamp', 'volume']
# Get only required input columns
required_inputs = IndicatorSchema.get_required_input_columns()
print(f"Required: {required_inputs}")
# ['close', 'high', 'low', 'open', 'timeframe', 'timestamp']
# Get only optional input columns
optional_inputs = IndicatorSchema.get_optional_input_columns()
print(f"Optional: {optional_inputs}")
# ['symbol', 'volume']
# Get all output columns
output_columns = IndicatorSchema.get_output_columns()
print(f"Output columns ({len(output_columns)}): {output_columns[:5]}...")
# Output columns (33): ['ath', 'atl', 'bias', 'continuity', 'entry_price']...
Use Cases:
Database Column Selection:
# Select only output columns for downstream processing
output_cols = IndicatorSchema.get_output_columns()
analysis_df = processed_df.select(output_cols)
# Or exclude output columns to get only input data
input_cols = IndicatorSchema.get_all_input_columns()
raw_data_df = processed_df.select(input_cols)
API Response Filtering:
def get_indicator_outputs(df):
    """Extract only indicator output columns for API response."""
    output_cols = IndicatorSchema.get_output_columns()
    # Include timestamp and symbol for context
    return df.select(['timestamp', 'symbol'] + output_cols)
Data Validation:
def validate_minimal_input(df):
    """Ensure DataFrame has all required input columns."""
    required = IndicatorSchema.get_required_input_columns()
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return True
Schema Evolution Tracking:
# Track schema changes over time
def document_schema_version():
    """Generate schema documentation for version control."""
    return {
        "version": "0.0.1a29",
        "input_columns": IndicatorSchema.get_all_input_columns(),
        "output_columns": IndicatorSchema.get_output_columns(),
        "total_columns": len(IndicatorSchema.model_fields)
    }
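Two snapshots produced by document_schema_version() can then be compared across releases. A sketch; where the snapshots are stored is up to you:

```python
def diff_schema_versions(old: dict, new: dict) -> dict:
    """Report output columns added or removed between two schema snapshots."""
    old_cols, new_cols = set(old["output_columns"]), set(new["output_columns"])
    return {
        "added": sorted(new_cols - old_cols),
        "removed": sorted(old_cols - new_cols),
    }
```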
Integration Patterns
Database Insert with Validation
def insert_thestrat_data(df, connection):
    """Insert validated DataFrame into database."""
    # Validate first
    validated_df = validate_input_data(df)

    # Get column info for proper insertion
    polars_types = IndicatorSchema.get_polars_dtypes()

    # Insert with proper type handling
    for row in validated_df.iter_rows(named=True):
        insert_row(connection, row, polars_types)

def insert_row(conn, row_data, type_info):
    """Insert single row with type conversion."""
    columns = list(row_data.keys())
    placeholders = ", ".join(["?" for _ in columns])

    # Convert values based on schema
    from polars import Datetime
    values = []
    for col, value in row_data.items():
        if col in type_info and type_info[col] == Datetime:
            values.append(value.isoformat() if value else None)
        else:
            values.append(value)

    query = f"INSERT INTO thestrat_indicators ({', '.join(columns)}) VALUES ({placeholders})"
    conn.execute(query, values)
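The "?" placeholders follow the DB-API paramstyle used by sqlite3, so the helpers above can be exercised end to end against a local SQLite file. A sketch; the file name and processed_df variable are assumptions, and PostgreSQL drivers expect %s placeholders instead:

```python
import sqlite3

conn = sqlite3.connect("thestrat_indicators.db")  # hypothetical local database file
conn.execute(schema_sql)                  # DDL generated in the sections above
insert_thestrat_data(processed_df, conn)  # processed_df: your indicator DataFrame
conn.commit()
conn.close()
```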
API Response Validation
from polars import DataFrame

def validate_api_response(json_data: list) -> DataFrame:
    """Convert and validate API data."""
    df = DataFrame(json_data)

    # Validate structure
    result = IndicatorSchema.validate_dataframe(df)
    if not result['valid']:
        raise ValueError(f"API data invalid: {result}")

    return result.get('converted_df', df)
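A usage sketch with the requests library; the endpoint URL is a placeholder:

```python
import requests

# Fetch indicator rows from a hypothetical internal endpoint and validate before use
payload = requests.get("https://example.internal/api/thestrat/latest", timeout=10).json()
df = validate_api_response(payload)
```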
Volume Data Type Handling
Volume Integer Precision
During timeframe aggregation, volume values are summed across bars. To prevent floating-point arithmetic from introducing precision errors (like 117289485.035470001399517059326171875), all volume values are explicitly cast to Int64 after aggregation.
Example:
from thestrat import Factory
from thestrat.schemas import FactoryConfig, AggregationConfig

# Aggregate 1-minute data to hourly
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["1h"],
        asset_class="crypto"
    )
)
pipeline = Factory.create_all(config)
result = pipeline["aggregation"].process(minute_data)

# Volume column is Int64, not Float64
print(result.schema["volume"])  # Int64
print(result["volume"][0])  # 23457897 (exact integer, no decimals)
Benefits:
- Exact values: Volume remains as precise integers through all aggregation levels
- Database compatibility: Integer volume fields work correctly with SQL INT/BIGINT columns
- No precision drift: Multi-level aggregation (1min → 1h → 1d → 1w) maintains exactness
Important Notes:
- Volume is cast to Int64 after aggregation but before returning results
- Input data can have volume as Int64 or Float64 - both work
- Multi-level aggregation maintains integer precision at each level
- Very large volumes (billions) are supported by Int64 type
Best Practices
- Always validate input data before processing
- Use column categories to organize database tables efficiently
- Leverage auto-conversion for Pandas compatibility
- Check type_issues for data quality problems
- Use descriptions for database comments and API documentation
- Expect Int64 volume: Design database schemas with INTEGER/BIGINT for volume columns