Atlas in production: putting a forecasting system in front of real capital

October 3, 2025 Article

Part 2 of 2 in the ML deployment series. Part 1: ML deployment reference guide.

Atlas trades 0–3 DTE SPY options with real capital. The forecasting layer has three jobs: detect what market regime we’re in, predict the next 5–15 minutes of price action, and read microstructure signals from the order book. None of that matters if the system can’t keep up with the data, can’t tolerate a single component failing, or can’t tell you it’s broken.

This is the engineering writeup of how the production deployment got there. The model work is the easy half. The harder half is the infrastructure around it.

What was actually slow

The prototype had every problem you’d expect from a notebook hauled into production:

  • Blocking I/O in async loops. Synchronous model calls inside an asyncio service degraded 100ms tasks to 2+ seconds because the event loop couldn’t make progress while inference ran.
  • Cold start of 8–12 seconds loading the 6-state HMM, which made restarts during market hours unusable.
  • Memory leaks in the feature pipeline — 4GB+ accumulation over 24 hours.
  • No observability. Models would silently degrade, and nobody knew until the P&L told us.
  • Cascading failures. One component going down took the whole system with it.

Baseline measurements on 100K rows:

| Component | Latency | Memory | Failure mode |
|---|---|---|---|
| LSTM engine | 8.4s | 245 MB | Blocking TensorFlow calls |
| XGBoost engine | 5.7s | 89 MB | Synchronous predictions |
| SJM (Statistical Jump Model) | 1.2s | 12 MB | No async support |
| Health checks | None | | |

The latency numbers all trace back to one bottleneck: synchronous model inference inside an async service.
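A minimal sketch of that bottleneck, not taken from the Atlas codebase. `fake_inference` is a hypothetical stand-in for a blocking `model.predict()` call (it only sleeps), and a heartbeat task records how long the event loop goes between wake-ups. Calling the function directly stalls the heartbeat for the full inference time; sending it through `run_in_executor()` does not.

```python
import asyncio
import time


def fake_inference(seconds: float) -> str:
    """Hypothetical stand-in for a blocking model.predict() call."""
    time.sleep(seconds)
    return "done"


async def max_heartbeat_gap(offload: bool, work_seconds: float = 0.2) -> float:
    """Return the longest gap between heartbeat wake-ups while inference runs."""
    gaps = []

    async def heartbeat() -> None:
        last = time.perf_counter()
        while True:
            await asyncio.sleep(0.01)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)  # let the heartbeat settle

    if offload:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, fake_inference, work_seconds)
    else:
        fake_inference(work_seconds)  # blocks the event loop

    await asyncio.sleep(0.05)  # give the heartbeat time to record the stall
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(gaps)


blocked_gap = asyncio.run(max_heartbeat_gap(offload=False))
offloaded_gap = asyncio.run(max_heartbeat_gap(offload=True))
print(f"direct call: {blocked_gap * 1000:.0f} ms between heartbeats")
print(f"executor:    {offloaded_gap * 1000:.0f} ms between heartbeats")
```

The absolute numbers depend on the machine; the point is only that the direct call's gap is at least as long as the inference itself.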

The fix, in three parts

  1. Async-wrap CPU-bound inference so the event loop stays responsive.
  2. Orchestrate component startup by dependency, with retries and health verification.
  3. Monitor everything at 10Hz so degradation is detected before it shows up in trades.

Async service pattern

The model itself is CPU-bound. The fix isn’t to make the model async — it’s to push the model into a thread pool and await it from the async service.

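import asyncio
import logging
from typing import Optional

import redis

logger = logging.getLogger(__name__)

# get_model_manager, get_ml_config_manager, RegimeStabilityEnhancer and
# EnhancedRegimePredictor are Atlas-internal and imported elsewhere.

class RegimeDetectionService: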
    """Configuration-driven regime detection service"""

    def __init__(self, model_version: Optional[str] = None,
                 enable_stability_fixes: bool = True):
        self.redis_client = redis.Redis(decode_responses=True)

        self.model_manager = get_model_manager()
        self.config_manager = get_ml_config_manager()

        if model_version:
            self.model = self.model_manager.load_model('sjm', model_version)
        else:
            self.model = self.model_manager.get_active_model('sjm')

        if enable_stability_fixes:
            self.stability_enhancer = RegimeStabilityEnhancer(
                min_confidence=0.70,
                max_transitions_per_hour=10,
                feature_zero_threshold=0.5,
                covariance_regularization=1e-4
            )
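            self.enhanced_predictor = EnhancedRegimePredictor(
                self.model, self.stability_enhancer
            )
        else:
            # Without the stability layer, predict on the raw model so
            # _update_regime() always has a predictor to call.
            self.enhanced_predictor = self.model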

    async def start(self):
        try:
            while True:
                await self._update_regime()
                await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Regime detection service error: {e}")
            raise

    async def _update_regime(self):
        try:
            features = await self.pipeline_connector.get_latest_features()
            if features is None:
                logger.warning("No features available")
                return

            # CPU-bound work goes to the executor. get_running_loop() is the
            # supported way to fetch the loop from inside a coroutine.
            loop = asyncio.get_running_loop()
            X = await loop.run_in_executor(
                None,
                self.feature_transformer.transform,
                features
            )

            prediction = await loop.run_in_executor(
                None,
                self.enhanced_predictor.predict,
                X
            )

            # Async I/O is fine on the loop
            await self._publish_regime(prediction)

        except Exception as e:
            logger.error(f"Error updating regime: {e}", exc_info=True)

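run_in_executor() is doing the work. CPU-bound functions go into the default ThreadPoolExecutor; the async loop stays responsive for I/O. Threads are enough here because the heavy lifting happens in native code (NumPy, TensorFlow, XGBoost), which largely releases the GIL while it runs; pure-Python CPU work would contend with the event loop for the GIL and is better sent to a ProcessPoolExecutor.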

Result: async I/O that used to degrade from 100ms to 2,000ms whenever inference ran now stays at 100ms.

Component orchestrator

Components have dependencies. The database has to come up before anything that queries it. Redis has to come up before the things publishing to it. The orchestrator computes dependency levels and starts components level-by-level — within a level, in parallel.

class ComponentOrchestrator:
    """Orchestrates component startup with dependency management"""

    def __init__(self):
        self.components: Dict[str, ComponentInfo] = {}
        self.component_states: Dict[str, ComponentState] = {}
        self.dependencies = DependencyGraph()
        self._lock = asyncio.Lock()

        self.startup_timeout = 30.0
        self.max_retries = 3
        self.retry_delay = 1.0

    async def start_all(self) -> bool:
        levels = self._calculate_dependency_levels()

        for level, components in sorted(levels.items()):
            tasks = []
            for comp_name in components:
                if not await self.verify_dependencies(comp_name):
                    raise RuntimeError(f"Dependencies not met: {comp_name}")

                task = self._start_with_retries(comp_name)
                tasks.append(task)

            results = await asyncio.gather(*tasks, return_exceptions=True)

            for comp_name, result in zip(components, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to start {comp_name}: {result}")
                    return False

        return True

    async def _start_with_retries(self, comp_name: str) -> bool:
        for attempt in range(self.max_retries):
            try:
                comp_info = self.components[comp_name]

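                # asyncio.timeout() needs Python 3.11+; on older versions
                # wrap the call in asyncio.wait_for() instead.
                async with asyncio.timeout(self.startup_timeout):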
                    await comp_info.start_func()

                if comp_info.health_check:
                    health = await comp_info.health_check()
                    if not health.get('healthy', False):
                        raise RuntimeError(f"Health check failed: {health}")

                self.component_states[comp_name] = ComponentState.RUNNING
                logger.info(f"Started {comp_name}")
                return True

            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed "
                    f"for {comp_name}: {e}"
                )

                if attempt < self.max_retries - 1:
                    delay = self.retry_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"Failed to start {comp_name} after retries")
                    raise

        return False
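To make the retry arithmetic concrete, here is a small sketch using the orchestrator's defaults (3 retries, 1.0s base delay, 30s startup timeout). The helper names are hypothetical. The bound covers only the timed start call and the backoff sleeps; the health check runs outside the timeout in the code above, so its duration is not included.

```python
from typing import List


def backoff_delays(max_retries: int = 3, retry_delay: float = 1.0) -> List[float]:
    """Sleep durations between attempts; there is no sleep after the last one."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries - 1)]


def worst_case_seconds(max_retries: int = 3, retry_delay: float = 1.0,
                       startup_timeout: float = 30.0) -> float:
    """Upper bound if every attempt runs into the startup timeout."""
    return max_retries * startup_timeout + sum(backoff_delays(max_retries, retry_delay))


print(backoff_delays())      # [1.0, 2.0]
print(worst_case_seconds())  # 93.0
```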

The actual Atlas dependency graph:

ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}

Parallel startup within levels gave a 3.9× speedup at 8 workers.
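The orchestrator's `_calculate_dependency_levels()` is not shown above. One plausible way to compute the levels, offered as a sketch rather than the Atlas implementation: a component with no dependencies sits at level 0, and every other component sits one level above its deepest dependency. The function name `dependency_levels` and the cycle check are my additions.

```python
from collections import defaultdict
from typing import Dict, List

ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}


def dependency_levels(deps: Dict[str, List[str]]) -> Dict[int, List[str]]:
    """Group components by level: 0 for no dependencies, else 1 + deepest dependency."""
    level_of: Dict[str, int] = {}
    visiting = set()

    def resolve(name: str) -> int:
        if name in level_of:
            return level_of[name]
        if name in visiting:
            raise ValueError(f"Dependency cycle at {name}")
        if name not in deps:
            raise KeyError(f"Unknown component: {name}")
        visiting.add(name)
        level = 1 + max((resolve(d) for d in deps[name]), default=-1)
        visiting.discard(name)
        level_of[name] = level
        return level

    levels: Dict[int, List[str]] = defaultdict(list)
    for name in deps:
        levels[resolve(name)].append(name)
    return dict(levels)


levels = dependency_levels(ATLAS_COMPONENT_DEPENDENCIES)
for level in sorted(levels):
    print(level, levels[level])
```

Components that share a level have no dependency on each other, which is what makes it safe to start them in parallel with `asyncio.gather()`.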

Health monitoring at 10Hz

The monitor checks every feature every 100ms. Each feature accumulates a metrics record; from that, a 0–100 health score; from that, a status (HEALTHY, WARNING, CRITICAL, DISABLED).

class FeatureHealthMonitor:
    """Real-time feature health monitoring at 10Hz"""

    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.monitoring_active = False

        self.zero_rate_warning = 30.0
        self.zero_rate_critical = 50.0
        self.consecutive_zero_limit = 10

        self.feature_metrics: Dict[str, FeatureMetrics] = {}
        self.health_scores: Dict[str, float] = {}

    async def start_monitoring(self):
        self.monitoring_active = True
        await self._monitor_loop()

    async def _monitor_loop(self):
        while self.monitoring_active:
            features = await self._get_latest_features()

            if features:
                for feature_name, value in features.items():
                    self._update_feature_metrics(feature_name, value)

                system_healthy = self._check_system_health()
                await self._publish_health_status(system_healthy)

            await asyncio.sleep(0.1)  # 10Hz

    def _update_feature_metrics(self, feature_name: str, value: float):
        if feature_name not in self.feature_metrics:
            self.feature_metrics[feature_name] = FeatureMetrics()

        metrics = self.feature_metrics[feature_name]

        if value == 0:
            metrics.zero_count += 1
            metrics.consecutive_zeros += 1
        else:
            metrics.consecutive_zeros = 0

        if np.isnan(value) or np.isinf(value):
            metrics.nan_inf_count += 1

        metrics.total_count += 1

        self.health_scores[feature_name] = self._calculate_health_score(
            feature_name
        )

    def _calculate_health_score(self, feature_name: str) -> float:
        metrics = self.feature_metrics[feature_name]

        if metrics.total_count == 0:
            return 0.0

        score = 100.0

        zero_rate = (metrics.zero_count / metrics.total_count) * 100
        if zero_rate > self.zero_rate_critical:
            score -= 50.0
        elif zero_rate > self.zero_rate_warning:
            score -= 25.0

        if metrics.consecutive_zeros > self.consecutive_zero_limit:
            score -= 30.0

        nan_rate = (metrics.nan_inf_count / metrics.total_count) * 100
        if nan_rate > 1.0:
            score -= 20.0

        return max(0.0, score)

    def get_feature_status(self, feature_name: str) -> str:
        score = self.health_scores.get(feature_name, 0.0)

        if score >= 80:
            return "HEALTHY"
        elif score >= 50:
            return "WARNING"
        elif score > 0:
            return "CRITICAL"
        else:
            return "DISABLED"
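A worked example of the scoring, as a standalone sketch. The penalties and thresholds are copied from the class above; the `FeatureMetrics` dataclass is an assumed shape, since the article does not show it, and the sample counters are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class FeatureMetrics:
    """Assumed shape of the per-feature counters."""
    total_count: int = 0
    zero_count: int = 0
    consecutive_zeros: int = 0
    nan_inf_count: int = 0


def health_score(m: FeatureMetrics) -> float:
    """Same penalties as _calculate_health_score(), as a plain function."""
    if m.total_count == 0:
        return 0.0
    score = 100.0
    zero_rate = m.zero_count / m.total_count * 100
    if zero_rate > 50.0:
        score -= 50.0
    elif zero_rate > 30.0:
        score -= 25.0
    if m.consecutive_zeros > 10:
        score -= 30.0
    if m.nan_inf_count / m.total_count * 100 > 1.0:
        score -= 20.0
    return max(0.0, score)


def status(score: float) -> str:
    if score >= 80:
        return "HEALTHY"
    if score >= 50:
        return "WARNING"
    if score > 0:
        return "CRITICAL"
    return "DISABLED"


examples = {
    "clean": FeatureMetrics(total_count=1000, zero_count=50),
    "sparse": FeatureMetrics(total_count=1000, zero_count=400),
    "stuck_at_zero": FeatureMetrics(total_count=1000, zero_count=600,
                                    consecutive_zeros=12),
    "dead": FeatureMetrics(total_count=1000, zero_count=900,
                           consecutive_zeros=200, nan_inf_count=50),
    "all_nan": FeatureMetrics(total_count=1000, nan_inf_count=1000),
}
for name, metrics in examples.items():
    s = health_score(metrics)
    print(f"{name:<14} {s:5.1f}  {status(s)}")
```

With these thresholds the scores come out as 100, 75, 20, 0 and 80. The last case is worth noticing: a feature that is entirely NaN but never zero only loses the 20-point NaN penalty, so it still lands on the HEALTHY boundary by score alone.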

And a system-level rollup:

def get_system_health(self) -> Dict[str, Any]:
    health = {
        'redis': 'OK',
        'features': 'Unknown',
        'trading': 'Unknown',
        'data_age': 'Unknown',
        'uptime': self._get_uptime(),
        'overall_status': 'OK',
        'components': {}
    }

    try:
        self.redis_client.ping()
        health['redis'] = 'OK'
    except Exception as e:
        health['redis'] = f'ERROR: {e}'
        health['overall_status'] = 'DEGRADED'

    data_age = self._check_data_freshness()
    health['data_age'] = data_age
    if 'OLD' in data_age or 'STALE' in data_age:
        health['overall_status'] = 'DEGRADED'

    feature_status = self._check_feature_pipeline()
    health['features'] = feature_status
    if feature_status == 'ERROR':
        health['overall_status'] = 'CRITICAL'

    return health
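In the rollup above the checks happen to run in order of increasing severity, so a later assignment never overwrites a worse status. That guarantee disappears as soon as checks are reordered or added. A small sketch of one way to make it explicit; `SEVERITY` and `escalate` are hypothetical names, not part of Atlas.

```python
SEVERITY = {"OK": 0, "DEGRADED": 1, "CRITICAL": 2}


def escalate(current: str, candidate: str) -> str:
    """Return the more severe of two statuses; never downgrade."""
    return candidate if SEVERITY[candidate] > SEVERITY[current] else current


overall = "OK"
for finding in ["CRITICAL", "DEGRADED", "OK"]:
    overall = escalate(overall, finding)
print(overall)  # CRITICAL
```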

Numbers

The combined optimization moved every metric that mattered:

| Metric | Before | After | Change |
|---|---|---|---|
| 1M-row processing | 66 min | 1.84s | 2,156× |
| LSTM throughput | 11,900 rows/s | 50,694 rows/s | 4.3× |
| SJM throughput | 83,000 rows/s | 463,723 rows/s | 5.6× |
| Memory at 10M rows | 2.4 GB | 220 MB | −91% |
| Startup time | 45s | 8s | 5.6× |
| Health check latency | | <100ms | |

Per-engine, on 100K rows:

| Engine | Time | Throughput | Memory |
|---|---|---|---|
| LSTM | 1.97s | 50,694 rows/s | 53.4 MB |
| XGBoost | 3.04s | 32,886 rows/s | 12.5 MB |
| SJM | 0.22s | 463,723 rows/s | 0.6 MB |

The original target was 1M rows in under 5 minutes. Measured 1.84 seconds — 163× faster than target.

The async wrapping itself was responsible for most of the I/O recovery:

| Operation | Before | After | Change |
|---|---|---|---|
| Model inference I/O latency | 2,000ms degraded | 100ms maintained | 20× |
| Concurrent predictions | Sequential | Parallel | 8× throughput |
| Event-loop responsiveness | Blocked | Non-blocking | |
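The sequential-to-parallel row can be sketched as a fan-out over an explicit thread pool. This is an illustration under assumptions, not the Atlas code: `predict` is a hypothetical blocking predictor that sleeps for 50ms, and the pool size of 8 is chosen only to mirror the table. A sleep parallelises perfectly; a real model will only scale this way to the extent that its native code releases the GIL.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def predict(batch_id: int) -> int:
    """Hypothetical blocking predictor; sleeps to mimic native inference."""
    time.sleep(0.05)
    return batch_id * 2


async def predict_all(batch_ids: List[int], max_workers: int = 8) -> List[int]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, predict, b) for b in batch_ids]
        # gather() returns results in input order, not completion order
        return await asyncio.gather(*futures)


start = time.perf_counter()
results = asyncio.run(predict_all(list(range(8))))
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Run sequentially, the eight calls would take about 0.4s; fanned out across eight threads they finish in roughly the time of one call.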

In production

Six months running real capital ($25K+). 542,000 rows/second sustained ingest. 99.7% uptime. Zero data-loss incidents. Sub-second regime detection.

The model ensemble:

  • 6-state Statistical Jump Model — primary regime detection
  • Prophet — 15-minute price forecasting
  • LSTM — 5-minute microstructure prediction
  • 17 features across multiple timeframes

Update cadence:

  • Regime detection: 5-second cycle
  • Feature health monitoring: 100ms (10Hz)
  • Prediction latency: sub-second
  • Component startup: parallel within dependency levels, 3.9× speedup
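One detail about the 100ms cadence: a loop that does its work and then sleeps 0.1s runs slightly slower than 10Hz, because the work time is added to every period. If the exact rate matters, the sleep can be computed against a fixed schedule instead. A minimal sketch with hypothetical names (`run_at_fixed_rate`, `sample_tick`), using a 20ms period so it runs quickly:

```python
import asyncio
import time
from typing import Awaitable, Callable, List


async def run_at_fixed_rate(tick: Callable[[], Awaitable[None]],
                            period: float, max_ticks: int) -> None:
    """Call tick() every `period` seconds, measured from a fixed schedule."""
    loop = asyncio.get_running_loop()
    deadline = loop.time()
    for _ in range(max_ticks):
        await tick()
        deadline += period
        delay = deadline - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        else:
            deadline = loop.time()  # tick overran the period; resynchronise


tick_times: List[float] = []


async def sample_tick() -> None:
    tick_times.append(time.perf_counter())
    await asyncio.sleep(0.01)  # pretend the check takes 10 ms


asyncio.run(run_at_fixed_rate(sample_tick, period=0.02, max_ticks=5))
intervals = [b - a for a, b in zip(tick_times, tick_times[1:])]
print([f"{i * 1000:.0f} ms" for i in intervals])
```

With work-then-sleep the intervals would be about 30ms (10ms of work plus a 20ms sleep); with the scheduled version they stay close to the 20ms period.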

Alerts

| Level | Condition | Action |
|---|---|---|
| CRITICAL | Zero rate >50%, NaN detected, Redis down | Emergency stop trading |
| WARNING | Zero rate >30%, data >15min old | Notify, investigate |
| INFO | Normal | Log only |
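The table reads naturally as a function where the most severe matching rule wins. A sketch of that mapping; the function name and parameters are hypothetical, and only the thresholds come from the table.

```python
def classify_alert(zero_rate: float, nan_detected: bool,
                   redis_up: bool, data_age_minutes: float) -> str:
    """Map the alert table's conditions to a level; CRITICAL wins over WARNING."""
    if zero_rate > 50.0 or nan_detected or not redis_up:
        return "CRITICAL"
    if zero_rate > 30.0 or data_age_minutes > 15.0:
        return "WARNING"
    return "INFO"


print(classify_alert(12.0, False, True, 0.2))    # INFO
print(classify_alert(35.0, False, True, 0.2))    # WARNING
print(classify_alert(12.0, False, True, 20.0))   # WARNING
print(classify_alert(12.0, True, True, 0.2))     # CRITICAL
print(classify_alert(12.0, False, False, 0.2))   # CRITICAL
```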

Health-check shape

{
    'healthy': True,
    'phase': 'RUNNING',
    'check_time_ms': 87.3,
    'components': {
        'redis': 'OK',
        'features': 'HEALTHY (98.3% quality)',
        'data_age': 'Fresh (12s ago)',
        'ml_integration': 'RUNNING',
        'regime_detection': 'HEALTHY'
    },
    'active_cursors': 8,
    'uptime_hours': 156.3,
    'overall_status': 'OK'
}

Data freshness

def _check_data_freshness(self) -> str:
    timestamp_keys = [
        'atlas:timestamp',
        'atlas:features:timestamp',
        'atlas:market:timestamp',
    ]

    latest = None
    for key in timestamp_keys:
        ts = self.redis_client.get(key)
        if ts:
            dt = datetime.fromisoformat(ts)
            if latest is None or dt > latest:
                latest = dt

    if latest is None:
        return 'No data'

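    # Match the timestamp's tzinfo so naive and timezone-aware values
    # both subtract cleanly instead of raising TypeError.
    age = datetime.now(latest.tzinfo) - latest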

    if age < timedelta(minutes=5):
        return f'Fresh ({int(age.total_seconds())}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(age.total_seconds()/60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(age.total_seconds()/60)}m ago)'
    else:
        return f'STALE ({int(age.total_seconds()/3600)}h ago)'

Things this taught me

1. Async-wrap your CPU-bound inference. Don’t put model.predict() in the body of an async function — it blocks the entire event loop. loop.run_in_executor(None, model.predict, X) keeps the loop free.

2. Dependency-ordered startup prevents cascades. Before the orchestrator, one failed component took everything else with it. With it, a failure stays isolated to that component and is retried with backoff. Uptime moved from 87% to 99.7%.

3. 10Hz monitoring catches degradation before P&L does. Three potential trading incidents in six months were avoided because the feature monitor flagged the issue before the model traded on bad data. The monitoring overhead is <1ms.

4. Memory at 10M rows requires effort. Three changes — lazy evaluation, feature selection, explicit garbage collection between batches — took resident memory from 2.4GB to 220MB.

5. Hot-swap models without downtime. Atlas has had 12 production model updates with zero downtime. The pattern is straightforward: load the new model, validate it, atomic-swap the reference.

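async def switch_model(self, new_version: str):
    # Model loading is blocking work, so keep it off the event loop
    loop = asyncio.get_running_loop()
    new_model = await loop.run_in_executor(
        None, self.model_manager.load_model, 'sjm', new_version
    )

    health = await self._validate_model(new_model)
    if not health['valid']:
        raise RuntimeError(f"Model validation failed: {health}")

    # A single attribute assignment: readers see the old model or the new
    # one, never a half-swapped state. The old model is freed once no
    # in-flight prediction still references it.
    self.model = new_model
    # Anything that captured the old model (e.g. a predictor wrapper built
    # in __init__) must be rebuilt here too, or it keeps serving the old one.

    logger.info(f"Switched to model version {new_version}")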
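Going back to lesson 4, the three memory changes can be sketched in plain Python. This is an illustration under assumptions, not the Atlas pipeline: rows are dicts, `iter_batches` and `process_stream` are hypothetical names, and the "model" is just a sum. The real pipeline presumably works on arrays or dataframes, but the shape is the same: pull rows lazily, keep only the selected features, and release each batch before the next one arrives.

```python
import gc
from typing import Dict, Iterable, Iterator, List, Sequence


def iter_batches(rows: Iterable[Dict[str, float]],
                 batch_size: int) -> Iterator[List[Dict[str, float]]]:
    """Lazy evaluation: yield one batch at a time instead of materialising all rows."""
    batch: List[Dict[str, float]] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def process_stream(rows: Iterable[Dict[str, float]], keep: Sequence[str],
                   batch_size: int = 1000) -> float:
    total = 0.0
    for batch in iter_batches(rows, batch_size):
        # Feature selection: carry only the columns the model needs
        selected = [[row[name] for name in keep] for row in batch]
        total += sum(sum(values) for values in selected)
        # Drop references, then collect anything held alive by reference cycles
        del batch, selected
        gc.collect()
    return total


rows = ({"a": 1.0, "b": 2.0, "c": 3.0} for _ in range(2500))
print(process_stream(rows, keep=("a", "c")))  # 10000.0
```

In CPython most objects are freed by reference counting as soon as the last reference goes away, so the explicit `gc.collect()` mainly helps when batches leave reference cycles behind.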

The model is the smaller half of this. Most of the work is infrastructure — async patterns, dependency-ordered startup, health monitoring, memory discipline. Get those right and a 7-day average trader becomes a 24/7 system that doesn’t lose money to itself.
