Examples

This section provides comprehensive examples of using TheStrat for various trading scenarios and asset classes.

Basic Examples

Simple 5-Minute Analysis

import pandas as pd
import polars as pl
from pandas import DataFrame as PandasDataFrame

from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig
)

# Sample market data
data = PandasDataFrame({
    'timestamp': pd.date_range('2024-01-01 09:30', periods=300, freq='1min'),
    'open': [100 + i*0.1 for i in range(300)],
    'high': [100.5 + i*0.1 for i in range(300)],
    'low': [99.5 + i*0.1 for i in range(300)],
    'close': [100.2 + i*0.1 for i in range(300)],
    'volume': [1000 + i*10 for i in range(300)],
    'timeframe': ['1min'] * 300
})

# Configure for 5-minute equity analysis with Pydantic models
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["5min"],
        asset_class="equities",
        timezone="US/Eastern"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(window=5, threshold=2.0)
            )
        ]
    )
)

# Process the data
pipeline = Factory.create_all(config)
aggregated = pipeline["aggregation"].process(data)
analyzed = pipeline["indicators"].process(aggregated)

# Display results
print(f"Converted {len(data)} 1-minute bars to {len(aggregated)} 5-minute bars")
inside_bars = len(analyzed.filter(pl.col('scenario') == "1"))
outside_bars = len(analyzed.filter(pl.col('scenario') == "3"))
print(f"Found {inside_bars} inside bars")
print(f"Found {outside_bars} outside bars")
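The filters above use Polars expressions, so these examples treat the pipeline output as a Polars DataFrame. If you prefer pandas for downstream work, the result can be converted (a minimal sketch; to_pandas() requires pyarrow, and the column list simply follows the fields referenced in these examples):

# Convert the Polars output to pandas for downstream tools
analyzed_pd = analyzed.to_pandas()
print(analyzed_pd[['timestamp', 'open', 'high', 'low', 'close', 'scenario']].tail())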

Multi-Target Configuration

Configure multiple target detection for reversal signals with per-timeframe settings:

from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig, TargetConfig
)

# Configure with target detection
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["5min"],
        asset_class="equities",
        timezone="US/Eastern"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["5min"],
                swing_points=SwingPointsConfig(window=5, threshold=2.0),
                target_config=TargetConfig(
                    upper_bound="higher_high",    # For long signals - targets until next HH
                    lower_bound="lower_low",       # For short signals - targets until next LL
                    merge_threshold_pct=0.02,     # Merge targets within 2% of each other
                    max_targets=3                  # Limit to 3 targets per signal
                )
            )
        ]
    )
)

pipeline = Factory.create_all(config)
aggregated = pipeline["aggregation"].process(market_data)
analyzed = pipeline["indicators"].process(aggregated)

# Get signal objects with multiple targets
from thestrat.indicators import Indicators
signals_with_targets = analyzed.filter(pl.col("signal").is_not_null())
signal_objects = [
    Indicators.get_signal_object(signals_with_targets.slice(i, 1))
    for i in range(len(signals_with_targets))
]

for signal in signal_objects:
    print(f"Signal: {signal.pattern} - {signal.bias.value}")
    print(f"Entry: ${signal.entry_price:.2f}, Stop: ${signal.stop_price:.2f}")
    if signal.target_prices:
        for i, target in enumerate(signal.target_prices, 1):
            print(f"  Target {i}: ${target.price:.2f}")

Key Parameters:

- upper_bound: Boundary for long signal targets (higher_high, lower_high)
- lower_bound: Boundary for short signal targets (lower_low, higher_low)
- merge_threshold_pct: Merge middle targets within this threshold (0.02 = 2%); the first and last targets are never merged
- max_targets: Maximum targets per signal (None = unlimited)
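To make the merge behavior concrete, here is a minimal standalone sketch (illustrative only, not the library's implementation) of how middle targets sitting within merge_threshold_pct of the previous kept target could be collapsed while the first and last targets are always preserved:

def merge_targets(prices, merge_threshold_pct=0.02, max_targets=None):
    """Illustrative only: drop middle targets within the threshold of the last kept target."""
    if len(prices) <= 2:
        return prices if max_targets is None else prices[:max_targets]
    merged = [prices[0]]
    for price in prices[1:-1]:
        # Keep a middle target only if it is at least merge_threshold_pct away from the last kept one
        if abs(price - merged[-1]) / merged[-1] >= merge_threshold_pct:
            merged.append(price)
    merged.append(prices[-1])
    return merged if max_targets is None else merged[:max_targets]

print(merge_targets([100.0, 101.0, 103.5, 105.0], merge_threshold_pct=0.02, max_targets=3))
# -> [100.0, 103.5, 105.0]: 101.0 sits within 2% of 100.0 and is merged away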

Multi-Timeframe Analysis

import polars as pl

from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig
)

def analyze_multiple_timeframes(data, timeframes=['5min', '15min', '1h']):
    """Analyze data across multiple timeframes using Pydantic models."""
    # Single configuration for multiple timeframes using models
    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=timeframes,  # Process all timeframes together
            asset_class="equities",
            timezone="US/Eastern"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["5min"],
                    swing_points=SwingPointsConfig(window=3, threshold=1.5)  # Short-term settings
                ),
                TimeframeItemConfig(
                    timeframes=["15min", "1h"],
                    swing_points=SwingPointsConfig(window=7, threshold=2.5)  # Long-term settings
                )
            ]
        )
    )

    # Single pipeline processes all timeframes
    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(data)
    analyzed = pipeline["indicators"].process(aggregated)

    # Extract results by timeframe from normalized output
    results = {}
    for tf in timeframes:
        tf_data = analyzed.filter(pl.col('timeframe') == tf)
        results[tf] = {
            'data': tf_data,
            'inside_bars': len(tf_data.filter(pl.col('scenario') == "1")),
            'outside_bars': len(tf_data.filter(pl.col('scenario') == "3")),
            'higher_highs': len(tf_data['higher_high'].drop_nulls()),
            'lower_lows': len(tf_data['lower_low'].drop_nulls())
        }

    return results, analyzed  # Return both summary and full data

# Use with your data
multi_tf_analysis, full_data = analyze_multiple_timeframes(sample_data)

# Display summary
for tf, result in multi_tf_analysis.items():
    print(f"{tf}: {result['inside_bars']} inside, {result['outside_bars']} outside")

print(f"Total processed: {len(full_data)} bars across {len(full_data['timeframe'].unique())} timeframes")
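Because all timeframes come back in one normalized frame, any single timeframe can be pulled back out with a filter (assuming Polars output, as in the other snippets):

import polars as pl

# Pull out a single timeframe from the combined output
tf_15 = full_data.filter(pl.col("timeframe") == "15min")
print(f"15min subset: {len(tf_15)} bars, latest close {tf_15['close'][-1]:.2f}")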

Asset Class Specific Examples

Cryptocurrency (24/7 Trading)

Not Actively Tested

This configuration is illustrative only. Crypto/FX are not actively tested or used in production.

# Bitcoin/crypto configuration with Pydantic models
crypto_config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["1h"],
        asset_class="crypto",
        timezone="UTC"  # Always UTC for crypto
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(
                    window=6,  # Slightly larger window for hourly
                    threshold=3.0  # Higher threshold for crypto volatility
                )
            )
        ]
    )
)

crypto_pipeline = Factory.create_all(crypto_config)

# Process crypto data (note: no market hours restrictions)
crypto_analyzed = crypto_pipeline["aggregation"].process(btc_data)
crypto_signals = crypto_pipeline["indicators"].process(crypto_analyzed)

print(f"Crypto analysis: 24/7 trading, {len(crypto_signals)} hourly bars")

Forex (24/5 Trading)

Not Actively Tested

This configuration is illustrative only. Crypto/FX are not actively tested or used in production.

# EUR/USD analysis with Pydantic models
fx_config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["4h"],
        asset_class="fx",
        timezone="UTC"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["all"],
                swing_points=SwingPointsConfig(
                    window=4,
                    threshold=0.5  # Lower threshold for FX (measured in %)
                )
            )
        ]
    )
)

fx_pipeline = Factory.create_all(fx_config)

# FX data processing handles weekend gaps automatically
eurusd_aggregated = fx_pipeline["aggregation"].process(eurusd_1m_data)
eurusd_analyzed = fx_pipeline["indicators"].process(eurusd_aggregated)

# Find major market structure points
higher_highs = len(eurusd_analyzed['higher_high'].drop_nulls())
lower_lows = len(eurusd_analyzed['lower_low'].drop_nulls())
print(f"Found {higher_highs} HH and {lower_lows} LL in EUR/USD")
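As a rough sanity check on the threshold: 0.5% of EUR/USD trading near 1.1000 is about 0.0055 (roughly 55 pips), so a swing must move at least that far before it is confirmed.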

Multi-Timeframe Examples

Single Request Multi-Timeframe Analysis

from thestrat import Factory

# Process multiple timeframes with different configurations using Pydantic models
config = FactoryConfig(
    aggregation=AggregationConfig(
        target_timeframes=["5min", "15min", "1h", "1d"],  # All timeframes together
        asset_class="equities",
        timezone="US/Eastern"
    ),
    indicators=IndicatorsConfig(
        timeframe_configs=[
            TimeframeItemConfig(
                timeframes=["5min"],  # Short-term aggressive settings
                swing_points=SwingPointsConfig(window=3, threshold=1.0)
            ),
            TimeframeItemConfig(
                timeframes=["15min"],  # Medium-term balanced settings
                swing_points=SwingPointsConfig(window=5, threshold=2.0)
            ),
            TimeframeItemConfig(
                timeframes=["1h", "1d"],  # Long-term conservative settings
                swing_points=SwingPointsConfig(window=10, threshold=3.0)
            )
        ]
    )
)

pipeline = Factory.create_all(config)
aggregated = pipeline["aggregation"].process(market_data)
analyzed = pipeline["indicators"].process(aggregated)

# Extract results for each timeframe from normalized output
for tf in ["5min", "15min", "1h", "1d"]:
    tf_data = analyzed.filter(pl.col('timeframe') == tf)
    print(f"{tf}: {len(tf_data)} bars, {len(tf_data.filter(pl.col('scenario') == '1'))} inside bars")

print(f"Total: {len(analyzed)} bars across {len(analyzed['timeframe'].unique())} timeframes")

Swing Point Analysis

Understanding Swing Point Detection

Swing points are critical for identifying market structure in TheStrat methodology. The implementation uses precise peak/valley detection with configurable parameters.

import pandas as pd
from pandas import DataFrame as PandasDataFrame

from thestrat import Factory
from thestrat.schemas import (
    FactoryConfig, AggregationConfig, IndicatorsConfig,
    TimeframeItemConfig, SwingPointsConfig
)

def analyze_swing_points(data):
    """Demonstrate swing point detection with different configurations."""

    # Configuration with detailed swing point settings
    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5min"],
            asset_class="equities",
            timezone="US/Eastern"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(
                        window=5,        # Look 5 bars back and ahead
                        threshold=2.0    # Require 2% price change to confirm
                    )
                )
            ]
        )
    )

    # Process the data
    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(data)
    analyzed = pipeline["indicators"].process(aggregated)

    # Market structure analysis
    higher_highs = len(analyzed['higher_high'].drop_nulls())
    lower_highs = len(analyzed['lower_high'].drop_nulls())
    higher_lows = len(analyzed['higher_low'].drop_nulls())
    lower_lows = len(analyzed['lower_low'].drop_nulls())

    print(f"Higher highs: {higher_highs} (bullish structure)")
    print(f"Lower lows: {lower_lows} (bearish structure)")
    print(f"Lower lows: {len(lower_lows)} (bearish structure)")

    return analyzed

# Example usage with trending data
trending_data = PandasDataFrame({
    'timestamp': pd.date_range('2024-01-01 09:30', periods=100, freq='5min'),
    'open': [100 + i*0.5 + (i%10)*0.2 for i in range(100)],    # Trending up with oscillations
    'high': [101 + i*0.5 + (i%10)*0.3 for i in range(100)],
    'low': [99 + i*0.5 + (i%10)*0.1 for i in range(100)],
    'close': [100.5 + i*0.5 + (i%10)*0.25 for i in range(100)],
    'volume': [1000 + i*10 for i in range(100)],
    'timeframe': ['5min'] * 100
})

results = analyze_swing_points(trending_data)
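For intuition about what window and threshold control, the following standalone sketch (illustrative only, not the library's vectorized implementation) marks a bar as a swing high when its high is the local maximum within window bars on each side and sits at least threshold percent above the lowest high in that neighborhood:

def naive_swing_highs(highs, window=5, threshold=2.0):
    """Illustrative sketch of window/threshold swing-high detection."""
    swings = []
    for i in range(window, len(highs) - window):
        neighborhood = highs[i - window:i + window + 1]
        if highs[i] == max(neighborhood):
            # Confirm the peak only if the move from the lowest neighboring high is large enough
            lowest_neighbor = min(neighborhood)
            move_pct = (highs[i] - lowest_neighbor) / lowest_neighbor * 100
            if move_pct >= threshold:
                swings.append(i)
    return swings

# Index 5 is a local peak standing 10% above its neighborhood, so it is confirmed
print(naive_swing_highs([100, 101, 102, 103, 104, 110, 104, 103, 102, 101, 100], window=5, threshold=2.0))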

Swing Point Configuration Strategies

Different market conditions and trading styles require different swing point settings:

def compare_swing_configurations(data):
    """Compare different swing point configurations."""

    configurations = [
        ("Scalping", SwingPointsConfig(window=3, threshold=0.5)),     # Very sensitive
        ("Day Trading", SwingPointsConfig(window=5, threshold=1.5)),  # Balanced
        ("Swing Trading", SwingPointsConfig(window=10, threshold=3.0)), # Conservative
        ("Position Trading", SwingPointsConfig(window=20, threshold=5.0)) # Very conservative
    ]

    results = {}

    for strategy_name, swing_config in configurations:
        config = FactoryConfig(
            aggregation=AggregationConfig(
                target_timeframes=["5min"],
                asset_class="equities"
            ),
            indicators=IndicatorsConfig(
                timeframe_configs=[
                    TimeframeItemConfig(
                        timeframes=["all"],
                        swing_points=swing_config
                    )
                ]
            )
        )

        pipeline = Factory.create_all(config)
        aggregated = pipeline["aggregation"].process(data)
        analyzed = pipeline["indicators"].process(aggregated)

        # Count market structure points detected
        swing_count = (len(analyzed['higher_high'].drop_nulls()) +
                       len(analyzed['lower_high'].drop_nulls()) +
                       len(analyzed['higher_low'].drop_nulls()) +
                       len(analyzed['lower_low'].drop_nulls()))

        results[strategy_name] = {
            'config': swing_config,
            'swing_points': swing_count,
            'frequency': f"{swing_count/len(analyzed)*100:.1f}% of bars"
        }

        print(f"{strategy_name}: {swing_count} swing points ({results[strategy_name]['frequency']})")

    return results

# Compare configurations
config_results = compare_swing_configurations(trending_data)

Market Structure Trend Analysis

Understanding the relationship between swing highs and lows reveals market trends:

def analyze_market_structure_trend(analyzed_data):
    """Analyze trend direction using market structure."""

    # Get market structure data
    structure_data = analyzed_data.filter(
        (analyzed_data['higher_high'].is_not_null()) |
        (analyzed_data['lower_low'].is_not_null())
    ).sort('timestamp')

    if len(structure_data) < 4:
        return "Insufficient swing points for trend analysis"

    # Count recent structure patterns
    recent_data = analyzed_data.tail(50)  # Last 50 bars

    hh_count = len(recent_data['higher_high'].drop_nulls())
    hl_count = len(recent_data['higher_low'].drop_nulls())
    lh_count = len(recent_data['lower_high'].drop_nulls())
    ll_count = len(recent_data['lower_low'].drop_nulls())

    bullish_signals = hh_count + hl_count
    bearish_signals = lh_count + ll_count

    print(f"Recent Market Structure (last 50 bars):")
    print(f"  Higher Highs: {hh_count}")
    print(f"  Higher Lows: {hl_count}")
    print(f"  Lower Highs: {lh_count}")
    print(f"  Lower Lows: {ll_count}")
    print(f"  Bullish signals: {bullish_signals}")
    print(f"  Bearish signals: {bearish_signals}")

    if bullish_signals > bearish_signals * 1.5:
        trend = "Strong Uptrend"
    elif bearish_signals > bullish_signals * 1.5:
        trend = "Strong Downtrend"
    elif bullish_signals > bearish_signals:
        trend = "Weak Uptrend"
    elif bearish_signals > bullish_signals:
        trend = "Weak Downtrend"
    else:
        trend = "Sideways/Consolidation"

    print(f"  Trend Assessment: {trend}")
    return trend

# Analyze the trend
trend_assessment = analyze_market_structure_trend(results)
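For example, if the last 50 bars contain 4 higher highs and 3 higher lows against 2 lower highs and 2 lower lows, then bullish_signals = 7 and bearish_signals = 4; since 7 > 4 * 1.5, the function reports a Strong Uptrend.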

Performance Considerations

TheStrat's swing point detection is fully vectorized for optimal performance:

import time

def benchmark_swing_detection(data_size=10000):
    """Benchmark swing point detection performance."""

    # Generate large dataset
    large_data = PandasDataFrame({
        'timestamp': pd.date_range('2024-01-01', periods=data_size, freq='1min'),
        'open': [100 + i*0.01 + (i%100)*0.1 for i in range(data_size)],
        'high': [100.5 + i*0.01 + (i%100)*0.15 for i in range(data_size)],
        'low': [99.5 + i*0.01 + (i%100)*0.05 for i in range(data_size)],
        'close': [100.2 + i*0.01 + (i%100)*0.12 for i in range(data_size)],
        'volume': [1000 + i for i in range(data_size)],
        'timeframe': ['1min'] * data_size
    })

    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5min"],
            asset_class="equities"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5, threshold=2.0)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)

    # Benchmark aggregation
    start_time = time.time()
    aggregated = pipeline["aggregation"].process(large_data)
    agg_time = time.time() - start_time

    # Benchmark indicators (including swing points)
    start_time = time.time()
    analyzed = pipeline["indicators"].process(aggregated)
    indicator_time = time.time() - start_time

    print(f"Performance Benchmark ({data_size:,} input rows):")
    print(f"  Aggregation: {agg_time:.3f}s ({len(large_data)/agg_time:,.0f} rows/sec)")
    print(f"  Indicators: {indicator_time:.3f}s ({len(aggregated)/indicator_time:,.0f} rows/sec)")
    print(f"  Total: {agg_time + indicator_time:.3f}s")
    print(f"  Output: {len(analyzed)} bars with full indicator analysis")

# Run performance benchmark
benchmark_swing_detection(10000)

Cross-Timeframe Signal Correlation

def analyze_cross_timeframe_signals(data):
    """Analyze signal correlation across multiple timeframes."""
    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5min", "15min", "1h"],
            asset_class="equities"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5, threshold=2.0)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(data)
    analyzed = pipeline["indicators"].process(aggregated)

    # Find synchronized signals across timeframes
    synchronized_signals = []

    # Get latest bar for each timeframe
    latest_by_tf = {}
    for tf in ["5min", "15min", "1h"]:
        tf_data = analyzed.filter(pl.col('timeframe') == tf)
        if len(tf_data) > 0:
            latest_by_tf[tf] = tf_data.row(-1, named=True)

    # Check for signal alignment
    if all(bar.get('scenario') == "3" for bar in latest_by_tf.values()):
        synchronized_signals.append({
            'type': 'multi_timeframe_breakout',
            'timeframes': list(latest_by_tf.keys()),
            'timestamp': list(latest_by_tf.values())[0]['timestamp']
        })

    return synchronized_signals, analyzed

# Example usage
signals, full_analysis = analyze_cross_timeframe_signals(sample_data)
print(f"Found {len(signals)} synchronized signals across multiple timeframes")

Advanced Analysis Examples

Custom Signal Detection

def detect_strat_patterns(data):
    """Detect common TheStrat patterns."""
    patterns = []

    for i in range(2, len(data)):
        current = data.iloc[i]
        prev1 = data.iloc[i-1]
        prev2 = data.iloc[i-2]

        # Inside bar followed by breakout (2-1-2 Continuation)
        if (prev2['scenario'] == "3" and
            prev1['scenario'] == "1" and
            current['close'] > prev2['high']):
            patterns.append({
                'timestamp': current['timestamp'],
                'pattern': '2-1-2_bullish_continuation',
                'entry_price': prev2['high'],
                'target': current['close'] + (current['close'] - prev2['low']) * 0.5
            })

        # Outside bar reversal
        if (current['scenario'] == "3" and
            prev1['close'] > prev1['open'] and  # Previous bar was bullish
            current['close'] < current['open']):  # Current bar is bearish
            patterns.append({
                'timestamp': current['timestamp'],
                'pattern': 'outside_bar_reversal',
                'entry_price': current['low'],
                'stop_loss': current['high']
            })

    return patterns

# Apply pattern detection
patterns = detect_strat_patterns(analyzed_data)
print(f"Detected {len(patterns)} TheStrat patterns")

# Display recent patterns
for pattern in patterns[-5:]:
    print(f"{pattern['timestamp']}: {pattern['pattern']} @ {pattern['entry_price']}")

Risk Management Integration

def calculate_position_sizes(signals, account_balance, risk_percent=2.0):
    """Calculate position sizes based on TheStrat signals."""
    positions = []

    for signal in signals:
        if 'entry_price' in signal and 'stop_loss' in signal:
            # Calculate risk per share
            risk_per_share = abs(signal['entry_price'] - signal['stop_loss'])

            # Calculate position size
            risk_amount = account_balance * (risk_percent / 100)
            position_size = int(risk_amount / risk_per_share) if risk_per_share > 0 else 0

            positions.append({
                **signal,
                'position_size': position_size,
                'risk_amount': risk_amount,
                'risk_per_share': risk_per_share
            })

    return positions

# Example usage
account_balance = 100000  # $100k account
risk_per_trade = 2.0      # 2% risk per trade

sized_positions = calculate_position_sizes(patterns, account_balance, risk_per_trade)

for pos in sized_positions:
    if pos['position_size'] > 0:
        print(f"Signal: {pos['pattern']}")
        print(f"Entry: ${pos['entry_price']:.2f}")
        print(f"Size: {pos['position_size']} shares")
        print(f"Risk: ${pos['risk_amount']:.2f}")
        print("---")

Real-Time Analysis Simulation

import time
from datetime import datetime

def simulate_real_time_analysis(historical_data, interval_seconds=60):
    """Simulate real-time TheStrat analysis with Pydantic models."""

    config = FactoryConfig(
        aggregation=AggregationConfig(target_timeframes=["5min"], asset_class="equities"),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5, threshold=2.0)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)

    # Simulate streaming data
    for i in range(50, len(historical_data), 5):  # Add 5 bars at a time
        current_data = historical_data.iloc[:i]

        # Process latest data
        aggregated = pipeline["aggregation"].process(current_data)
        analyzed = pipeline["indicators"].process(aggregated)

        # Check for new signals (last bar)
        if len(analyzed) > 0:
            latest = analyzed.iloc[-1]

            if latest['scenario'] == "1":
                print(f"{datetime.now()}: Inside bar detected @ {latest['close']:.2f}")
            elif latest['scenario'] == "3":
                print(f"{datetime.now()}: Outside bar detected @ {latest['close']:.2f}")

            # Check for market structure changes
            if latest.get('higher_high') is not None:
                print(f"{datetime.now()}: Higher High @ {latest['higher_high']:.2f}")
            elif latest.get('lower_low') is not None:
                print(f"{datetime.now()}: Lower Low @ {latest['lower_low']:.2f}")

        time.sleep(interval_seconds)

# Run simulation (comment out for docs)
# simulate_real_time_analysis(sample_data, interval_seconds=2)

Performance Optimization Examples

Batch Processing

def batch_process_symbols(symbol_data_dict, config_template):
    """Process multiple symbols efficiently with new API."""
    results = {}

    # Create pipeline once - supports multiple timeframes per symbol
    pipeline = Factory.create_all(config_template)

    for symbol, data in symbol_data_dict.items():
        try:
            # Process each symbol - now handles multiple timeframes
            aggregated = pipeline["aggregation"].process(data)
            analyzed = pipeline["indicators"].process(aggregated)

            # Store results with timeframe breakdown
            results[symbol] = {
                'data': analyzed,
                'timeframes': analyzed['timeframe'].unique().to_list(),
                'inside_bars': len(analyzed.filter(pl.col('scenario') == "1")),
                'outside_bars': len(analyzed.filter(pl.col('scenario') == "3")),
                'last_price': analyzed['close'][-1],
                'total_bars': len(analyzed)
            }

            print(f"Processed {symbol}: {len(analyzed)} bars across {len(analyzed['timeframe'].unique())} timeframes")

        except Exception as e:
            print(f"Error processing {symbol}: {e}")
            results[symbol] = None

    return results

# Example usage
symbols_data = {
    'AAPL': aapl_data,
    'MSFT': msft_data,
    'GOOGL': googl_data
}

batch_results = batch_process_symbols(symbols_data, config)

Memory Efficient Processing

def process_large_dataset(data, chunk_size=1000):
    """Process large datasets in chunks to manage memory with new API."""

    config = FactoryConfig(
        aggregation=AggregationConfig(
            target_timeframes=["5min"],
            asset_class="equities"
        ),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5)
                )
            ]
        )
    )

    pipeline = Factory.create_all(config)
    results = []

    # Process in chunks
    for start_idx in range(0, len(data), chunk_size):
        end_idx = min(start_idx + chunk_size, len(data))
        chunk = data.iloc[start_idx:end_idx]

        # Include overlap for continuity
        if start_idx > 0:
            overlap = data.iloc[max(0, start_idx-100):start_idx]
            chunk = pd.concat([overlap, chunk])

        # Process chunk
        aggregated = pipeline["aggregation"].process(chunk)
        analyzed = pipeline["indicators"].process(aggregated)

        # Store results (excluding overlap)
        if start_idx > 0:
            analyzed = analyzed.iloc[20:]  # Remove overlap portion

        results.append(analyzed)
        print(f"Processed chunk {start_idx//chunk_size + 1}")

    # Combine results
    final_result = pd.concat(results, ignore_index=True)
    return final_result
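Note that the 100-row overlap of 1-minute input aggregates to roughly 20 five-minute bars, which is why the first 20 rows are dropped from every chunk after the first.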

Integration Examples

# Integration with backtrader
import backtrader as bt

class TheStratStrategy(bt.Strategy):
    def __init__(self):
        self.thestrat_config = FactoryConfig(
            aggregation=AggregationConfig(
                target_timeframes=["5min"],
                asset_class="equities"
            ),
            indicators=IndicatorsConfig(
                timeframe_configs=[
                    TimeframeItemConfig(
                        timeframes=["all"],
                        swing_points=SwingPointsConfig(window=5, threshold=2.0)
                    )
                ]
            )
        )
        self.pipeline = Factory.create_all(self.thestrat_config)

    def next(self):
        # Convert backtrader data to DataFrame
        data = self.convert_bt_data()

        # Apply TheStrat analysis
        analyzed = self.pipeline["indicators"].process(
            self.pipeline["aggregation"].process(data)
        )

        # Trading logic based on TheStrat signals
        if analyzed.iloc[-1]['scenario'] == "3" and not self.position:
            self.buy()
        elif analyzed.iloc[-1]['scenario'] == "1" and self.position:
            self.close()

# Integration with zipline
from zipline.api import order, record, symbol

def thestrat_zipline_algo(context, data):
    # Get price data
    prices = data.history(symbol('AAPL'), ['open', 'high', 'low', 'close'], 100, '1d')

    # Apply TheStrat with new API
    config = FactoryConfig(
        aggregation=AggregationConfig(target_timeframes=["1d"]),
        indicators=IndicatorsConfig(
            timeframe_configs=[
                TimeframeItemConfig(
                    timeframes=["all"],
                    swing_points=SwingPointsConfig(window=5)
                )
            ]
        )
    )
    pipeline = Factory.create_all(config)
    aggregated = pipeline["aggregation"].process(prices.reset_index())
    analyzed = pipeline["indicators"].process(aggregated)

    # Trading decisions
    if analyzed.iloc[-1]['scenario'] == "3":
        order(symbol('AAPL'), 100)

    record(inside_bars=len(analyzed.filter(pl.col('scenario') == "1")))

These examples demonstrate the flexibility and power of TheStrat for various trading scenarios. Adapt the configurations and logic to match your specific trading strategy and requirements.