Examples

This section provides comprehensive examples of using TheStrat for various trading scenarios and asset classes.

Basic Examples

Simple 5-Minute Analysis

from pandas import DataFrame as PandasDataFrame, date_range
from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig
)

# Sample market data
data = PandasDataFrame({
    'timestamp': date_range('2024-01-01 09:30', periods=300, freq='1min'),
    'open': [100 + i*0.1 for i in range(300)],
    'high': [100.5 + i*0.1 for i in range(300)],
    'low': [99.5 + i*0.1 for i in range(300)],
    'close': [100.2 + i*0.1 for i in range(300)],
    'volume': [1000 + i*10 for i in range(300)]
})

# Configure for 5-minute equity analysis with Pydantic models
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["5m"],
        asset_class="equities",
        timezone="US/Eastern"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(window=5, threshold=2.0)
            )
        ]
    )
)

# Process the data
pipeline = Factory.create_all(config)
aggregated = pipeline["aggregation"].process(data)
analyzed = pipeline["indicators"].process(aggregated)

# Display results
print(f"Converted {len(data)} 1-minute bars to {len(aggregated)} 5-minute bars")
print(f"Found {analyzed['inside_bar'].sum()} inside bars")
print(f"Found {analyzed['outside_bar'].sum()} outside bars")

Multi-Timeframe Analysis

from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig
)

def analyze_multiple_timeframes(data, timeframes=['5m', '15m', '1h']):
    """Analyze data across multiple timeframes using Pydantic models."""
    # Single configuration for multiple timeframes using models
    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=timeframes,  # Process all timeframes together
            asset_class="equities",
            timezone="US/Eastern"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["5m"],
                    swing_points=SwingPointsConfig(window=3, threshold=1.5)  # Short-term settings
                ),
                TimeframeItemConfig(
                    timeframes=["15m", "1h"],
                    swing_points=SwingPointsConfig(window=7, threshold=2.5)  # Long-term settings
                )
            ]
        )
    )

    # Single pipeline processes all timeframes
    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(data)
    analyzed = pipeline["indicators"].process(aggregated)

    # Extract results by timeframe from normalized output
    results = {}
    for tf in timeframes:
        tf_data = analyzed[analyzed['timeframe'] == tf]
        results[tf] = {
            'data': tf_data,
            'inside_bars': tf_data['inside_bar'].sum(),
            'outside_bars': tf_data['outside_bar'].sum(),
            'pivot_highs': tf_data['pivot_high'].sum() if 'pivot_high' in tf_data.columns else 0,
            'pivot_lows': tf_data['pivot_low'].sum() if 'pivot_low' in tf_data.columns else 0
        }

    return results, analyzed  # Return both summary and full data

# Use with your data
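# ('sample_data' is assumed to be a 1-minute OHLCV DataFrame like 'data' in the first example)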
multi_tf_analysis, full_data = analyze_multiple_timeframes(sample_data)

# Display summary
for tf, result in multi_tf_analysis.items():
    print(f"{tf}: {result['inside_bars']} inside, {result['outside_bars']} outside")

print(f"Total processed: {len(full_data)} bars across {len(full_data['timeframe'].unique())} timeframes")

Asset Class Specific Examples

Cryptocurrency (24/7 Trading)

# Bitcoin/crypto configuration with Pydantic models
crypto_config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["1h"],
        asset_class="crypto",
        timezone="UTC"  # Always UTC for crypto
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(
                    window=6,  # Slightly larger window for hourly
                    threshold=3.0  # Higher threshold for crypto volatility
                )
            )
        ]
    )
)

crypto_pipeline = Factory.create_all(crypto_config)

# Process crypto data (note: no market hours restrictions)
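# ('btc_data' is assumed to be a 1-minute BTC OHLCV DataFrame with UTC timestamps)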
crypto_analyzed = crypto_pipeline["aggregation"].process(btc_data)
crypto_signals = crypto_pipeline["indicators"].process(crypto_analyzed)

print(f"Crypto analysis: 24/7 trading, {len(crypto_signals)} hourly bars")

Forex (24/5 Trading)

# EUR/USD analysis with Pydantic models
fx_config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["4h"],
        asset_class="fx",
        timezone="UTC"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(
                    window=4,
                    threshold=0.5  # Lower threshold for FX (measured in %)
                )
            )
        ]
    )
)

fx_pipeline = Factory.create_all(fx_config)

# FX data processing handles weekend gaps automatically
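# ('eurusd_1m_data' is assumed to be a 1-minute EUR/USD OHLCV DataFrame)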
eurusd_aggregated = fx_pipeline["aggregation"].process(eurusd_1m_data)
eurusd_analyzed = fx_pipeline["indicators"].process(eurusd_aggregated)

# Find major swing points (check for boolean columns indicating new pivots)
pivot_cols = [col for col in ('new_pivot_high', 'new_pivot_low') if col in eurusd_analyzed.columns]
if pivot_cols:
    major_swings = eurusd_analyzed[eurusd_analyzed[pivot_cols].any(axis=1)]
else:
    major_swings = eurusd_analyzed.iloc[0:0]  # no pivot columns -> empty result
print(f"Found {len(major_swings)} major swing points in EUR/USD")

Multi-Timeframe Examples

Single Request Multi-Timeframe Analysis

from thestrat import Factory

# Process multiple timeframes with different configurations using Pydantic models
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["5m", "15m", "1h", "1d"],  # All timeframes together
        asset_class="equities",
        timezone="US/Eastern"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["5m"],  # Short-term aggressive settings
                swing_points=SwingPointsConfig(window=3, threshold=1.0)
            ),
            TimeframeItemConfig(
                timeframes=["15m"],  # Medium-term balanced settings
                swing_points=SwingPointsConfig(window=5, threshold=2.0)
            ),
            TimeframeItemConfig(
                timeframes=["1h", "1d"],  # Long-term conservative settings
                swing_points=SwingPointsConfig(window=10, threshold=3.0)
            )
        ]
    )
)

pipeline = Factory.create_all(config)
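# ('market_data' is assumed to be a 1-minute OHLCV DataFrame like 'data' in the first example)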
aggregated = pipeline["aggregation"].process(market_data)
analyzed = pipeline["indicators"].process(aggregated)

# Extract results for each timeframe from normalized output
for tf in ["5m", "15m", "1h", "1d"]:
    tf_data = analyzed[analyzed['timeframe'] == tf]
    print(f"{tf}: {len(tf_data)} bars, {tf_data['inside_bar'].sum()} inside bars")

print(f"Total: {len(analyzed)} bars across {len(analyzed['timeframe'].unique())} timeframes")

Cross-Timeframe Signal Correlation

def analyze_cross_timeframe_signals(data):
    """Analyze signal correlation across multiple timeframes."""
    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5m", "15m", "1h"],
            asset_class="equities"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5, threshold=2.0)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(data)
    analyzed = pipeline["indicators"].process(aggregated)

    # Find synchronized signals across timeframes
    synchronized_signals = []

    # Get latest bar for each timeframe
    latest_by_tf = {}
    for tf in ["5m", "15m", "1h"]:
        tf_data = analyzed[analyzed['timeframe'] == tf]
        if len(tf_data) > 0:
            latest_by_tf[tf] = tf_data.iloc[-1]

    # Check for signal alignment (guard the empty case: all() over an empty dict is vacuously True)
    if latest_by_tf and all(bar.get('outside_bar', False) for bar in latest_by_tf.values()):
        synchronized_signals.append({
            'type': 'multi_timeframe_breakout',
            'timeframes': list(latest_by_tf.keys()),
            'timestamp': list(latest_by_tf.values())[0]['timestamp']
        })

    return synchronized_signals, analyzed

# Example usage
signals, full_analysis = analyze_cross_timeframe_signals(sample_data)
print(f"Found {len(signals)} synchronized signals across multiple timeframes")

Advanced Analysis Examples

Custom Signal Detection

def detect_strat_patterns(data):
    """Detect common TheStrat patterns."""
    patterns = []

    for i in range(2, len(data)):
        current = data.iloc[i]
        prev1 = data.iloc[i-1]
        prev2 = data.iloc[i-2]

        # Outside bar, then inside bar, then breakout above the outside bar's high (3-1-2 bullish in Strat numbering)
        if (prev2['outside_bar'] and
            prev1['inside_bar'] and
            current['close'] > prev2['high']):
            patterns.append({
                'timestamp': current['timestamp'],
                'pattern': '3-1-2_bullish',
                'entry_price': prev2['high'],
                'target': current['close'] + (current['close'] - prev2['low']) * 0.5
            })

        # Outside bar reversal
        if (current['outside_bar'] and
            prev1['close'] > prev1['open'] and  # Previous bar was bullish
            current['close'] < current['open']):  # Current bar is bearish
            patterns.append({
                'timestamp': current['timestamp'],
                'pattern': 'outside_bar_reversal',
                'entry_price': current['low'],
                'stop_loss': current['high']
            })

    return patterns

# Apply pattern detection to previously analyzed data (e.g., 'analyzed' from the first example)
patterns = detect_strat_patterns(analyzed)
print(f"Detected {len(patterns)} TheStrat patterns")

# Display recent patterns
for pattern in patterns[-5:]:
    print(f"{pattern['timestamp']}: {pattern['pattern']} @ {pattern['entry_price']}")

Risk Management Integration

def calculate_position_sizes(signals, account_balance, risk_percent=2.0):
    """Calculate position sizes based on TheStrat signals."""
    positions = []

    for signal in signals:
        if 'entry_price' in signal and 'stop_loss' in signal:
            # Calculate risk per share
            risk_per_share = abs(signal['entry_price'] - signal['stop_loss'])

            # Calculate position size
            risk_amount = account_balance * (risk_percent / 100)
            position_size = int(risk_amount / risk_per_share) if risk_per_share > 0 else 0

            positions.append({
                **signal,
                'position_size': position_size,
                'risk_amount': risk_amount,
                'risk_per_share': risk_per_share
            })

    return positions

# Example usage
account_balance = 100000  # $100k account
risk_per_trade = 2.0      # 2% risk per trade

sized_positions = calculate_position_sizes(patterns, account_balance, risk_per_trade)

for pos in sized_positions:
    if pos['position_size'] > 0:
        print(f"Signal: {pos['pattern']}")
        print(f"Entry: ${pos['entry_price']:.2f}")
        print(f"Size: {pos['position_size']} shares")
        print(f"Risk: ${pos['risk_amount']:.2f}")
        print("---")

Real-Time Analysis Simulation

import time
from datetime import datetime

def simulate_real_time_analysis(historical_data, interval_seconds=60):
    """Simulate real-time TheStrat analysis with Pydantic models."""

    config = FactoryConfig(
        aggregation=AggregationConfig(target_timeframes=["5m"], asset_class="equities"),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5, threshold=2.0)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)

    # Simulate streaming data
    for i in range(50, len(historical_data), 5):  # Start with 50 bars of history, then add 5 at a time
        current_data = historical_data.iloc[:i]

        # Process latest data
        aggregated = pipeline["aggregation"].process(current_data)
        analyzed = pipeline["indicators"].process(aggregated)

        # Check for new signals (last bar)
        if len(analyzed) > 0:
            latest = analyzed.iloc[-1]

            if latest['inside_bar']:
                print(f"{datetime.now()}: Inside bar detected @ {latest['close']:.2f}")
            elif latest['outside_bar']:
                print(f"{datetime.now()}: Outside bar detected @ {latest['close']:.2f}")

            # Check for pivot points
            if latest.get('new_pivot_high', False):
                print(f"{datetime.now()}: New Pivot HIGH @ {latest['high']:.2f}")
            elif latest.get('new_pivot_low', False):
                print(f"{datetime.now()}: New Pivot LOW @ {latest['low']:.2f}")

        time.sleep(interval_seconds)

# Run simulation (comment out for docs)
# simulate_real_time_analysis(sample_data, interval_seconds=2)

Performance Optimization Examples

Batch Processing

def batch_process_symbols(symbol_data_dict, config_template):
    """Process multiple symbols efficiently with new API."""
    results = {}

    # Create pipeline once - supports multiple timeframes per symbol
    pipeline = Factory.create_all(config_template)

    for symbol, data in symbol_data_dict.items():
        try:
            # Process each symbol - now handles multiple timeframes
            aggregated = pipeline["aggregation"].process(data)
            analyzed = pipeline["indicators"].process(aggregated)

            # Store results with timeframe breakdown
            results[symbol] = {
                'data': analyzed,
                'timeframes': analyzed['timeframe'].unique().tolist(),
                'inside_bars': analyzed['inside_bar'].sum(),
                'outside_bars': analyzed['outside_bar'].sum(),
                'last_price': analyzed.iloc[-1]['close'],
                'total_bars': len(analyzed)
            }

            print(f"Processed {symbol}: {len(analyzed)} bars across {len(analyzed['timeframe'].unique())} timeframes")

        except Exception as e:
            print(f"Error processing {symbol}: {e}")
            results[symbol] = None

    return results

# Example usage
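# (aapl_data, msft_data, and googl_data are assumed to be 1-minute OHLCV DataFrames)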
symbols_data = {
    'AAPL': aapl_data,
    'MSFT': msft_data,
    'GOOGL': googl_data
}

batch_results = batch_process_symbols(symbols_data, config)
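
The per-symbol summaries make screening straightforward. A sketch that ranks symbols by inside-bar count, skipping any symbol that failed to process:

ranked = sorted(
    ((sym, res['inside_bars']) for sym, res in batch_results.items() if res is not None),
    key=lambda item: item[1],
    reverse=True,
)
for sym, count in ranked:
    print(f"{sym}: {count} inside bars")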

Memory Efficient Processing

import pandas as pd

def process_large_dataset(data, chunk_size=1000):
    """Process large datasets in chunks to manage memory with new API."""

    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5m"],
            asset_class="equities"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)
    results = []

    # Process in chunks
    for start_idx in range(0, len(data), chunk_size):
        end_idx = min(start_idx + chunk_size, len(data))
        chunk = data.iloc[start_idx:end_idx]

        # Include overlap for continuity
        if start_idx > 0:
            overlap = data.iloc[max(0, start_idx-100):start_idx]
            chunk = pd.concat([overlap, chunk])

        # Process chunk
        aggregated = pipeline["aggregation"].process(chunk)
        analyzed = pipeline["indicators"].process(aggregated)

        # Store results (excluding overlap)
        if start_idx > 0:
            analyzed = analyzed.iloc[20:]  # Drop the overlap (100 one-minute bars aggregate to 20 five-minute bars)

        results.append(analyzed)
        print(f"Processed chunk {start_idx//chunk_size + 1}")

    # Combine results
    final_result = pd.concat(results, ignore_index=True)
    return final_result
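
A usage sketch, where 'large_1m_data' stands in for any large pandas DataFrame of 1-minute bars:

# Tune chunk_size to your memory budget
result = process_large_dataset(large_1m_data, chunk_size=5000)
print(f"Combined output: {len(result)} bars")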

Integration Examples

# Integration with backtrader
import backtrader as bt

class TheStratStrategy(bt.Strategy):
    def __init__(self):
        self.thestrat_config = FactoryConfig(
            aggregation=AggregationConfig(
                target_timeframes=["5m"],
                asset_class="equities"
            ),
            indicators=IndicatorsConfig(
                timeframe_configs=[
                    TimeframeItemConfig(
                        timeframes=["all"],
                        swing_points=SwingPointsConfig(window=5, threshold=2.0)
                    )
                ]
            )
        )
        self.pipeline = Factory.create_all(self.thestrat_config)

    def next(self):
        # Convert backtrader data to DataFrame
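        # (convert_bt_data() is a user-supplied helper that builds an OHLCV DataFrame; not part of TheStrat)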
        data = self.convert_bt_data()

        # Apply TheStrat analysis
        analyzed = self.pipeline["indicators"].process(
            self.pipeline["aggregation"].process(data)
        )

        # Trading logic based on TheStrat signals
        if analyzed.iloc[-1]['outside_bar'] and not self.position:
            self.buy()
        elif analyzed.iloc[-1]['inside_bar'] and self.position:
            self.close()
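
# Wiring the strategy into backtrader's engine follows the usual Cerebro pattern.
# A minimal sketch; 'bt_feed' is a hypothetical data feed built from your own bars.
cerebro = bt.Cerebro()
cerebro.addstrategy(TheStratStrategy)
cerebro.adddata(bt_feed)
cerebro.run()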

# Integration with zipline
from zipline.api import order, record, symbol

def thestrat_zipline_algo(context, data):
    # Get price data (volume included so the aggregation step has a full OHLCV set)
    prices = data.history(symbol('AAPL'), ['open', 'high', 'low', 'close', 'volume'], 100, '1d')

    # Apply TheStrat with new API
    config = FactoryConfig(
        aggregation=AggregationConfig(target_timeframes=["1d"]),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5)
                )
            ]
        )
    )
    pipeline = Factory.create_all(config)
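    # reset_index() exposes the datetime index as a column; rename it to 'timestamp' if your schema requires it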
    aggregated = pipeline["aggregation"].process(prices.reset_index())
    analyzed = pipeline["indicators"].process(aggregated)

    # Trading decisions
    if analyzed.iloc[-1]['outside_bar']:
        order(symbol('AAPL'), 100)

    record(inside_bars=analyzed['inside_bar'].sum())

These examples demonstrate the flexibility and power of TheStrat for various trading scenarios. Adapt the configurations and logic to match your specific trading strategy and requirements.