Part 2 of 2 in the ML Deployment series
Atlas is a quantitative trading system managing $25K+ capital with 0-3 DTE options strategies. The system needed to detect market regime changes in real-time to adjust trading behavior—but deploying ML models into a production trading environment presented unique challenges:
Critical requirements:
The system needed to process streaming market data at 542,000 rows/second while maintaining model accuracy and handling the chaos of live trading.
Early prototypes revealed fundamental deployment issues:
Initial benchmark (100K rows):
| Component | Latency | Memory | Issue |
|---|---|---|---|
| LSTM Engine | 8.4s | 245 MB | Blocking TensorFlow calls |
| XGBoost Engine | 5.7s | 89 MB | Synchronous predictions |
| SJM Engine | 1.2s | 12 MB | No async support |
| Health Checks | None | N/A | Zero observability |
System bottleneck: Synchronous model inference in async event loops.
The production deployment required three coordinated systems:
Replace blocking model calls with an async wrapper pattern.
File: ml_integration/regime_detection/regime_service.py

```python
import asyncio
import logging
from typing import Optional

import redis

# get_model_manager, get_ml_config_manager, RegimeStabilityEnhancer, and
# EnhancedRegimePredictor are project-internal imports from ml_integration.

logger = logging.getLogger(__name__)


class RegimeDetectionService:
    """Configuration-driven regime detection service"""

    def __init__(self, model_version: Optional[str] = None,
                 enable_stability_fixes: bool = True):
        self.redis_client = redis.Redis(decode_responses=True)

        # Get managers
        self.model_manager = get_model_manager()
        self.config_manager = get_ml_config_manager()

        # Load model (uses active version if not specified)
        if model_version:
            self.model = self.model_manager.load_model('sjm', model_version)
        else:
            self.model = self.model_manager.get_active_model('sjm')

        # Initialize stability enhancer
        if enable_stability_fixes:
            self.stability_enhancer = RegimeStabilityEnhancer(
                min_confidence=0.70,  # Production threshold
                max_transitions_per_hour=10,
                feature_zero_threshold=0.5,
                covariance_regularization=1e-4
            )
            self.enhanced_predictor = EnhancedRegimePredictor(
                self.model, self.stability_enhancer
            )

    async def start(self):
        """Start the regime detection service"""
        try:
            while True:
                await self._update_regime()
                await asyncio.sleep(5)  # Update every 5 seconds
        except Exception as e:
            logger.error(f"Regime detection service error: {e}")
            raise

    async def _update_regime(self):
        """Update regime detection (non-blocking)"""
        # pipeline_connector and feature_transformer are wired up
        # elsewhere in the full class.
        try:
            # Get latest features from pipeline
            features = await self.pipeline_connector.get_latest_features()
            if features is None:
                logger.warning("No features available")
                return

            # Transform features (CPU-bound, run in executor)
            loop = asyncio.get_event_loop()
            X = await loop.run_in_executor(
                None,
                self.feature_transformer.transform,
                features
            )

            # Predict regime (CPU-bound, run in executor)
            prediction = await loop.run_in_executor(
                None,
                self.enhanced_predictor.predict,
                X
            )

            # Publish to Redis (async I/O)
            await self._publish_regime(prediction)
        except Exception as e:
            logger.error(f"Error updating regime: {e}", exc_info=True)
```
Key pattern: CPU-bound model inference runs in ThreadPoolExecutor via run_in_executor(), preserving async event loop responsiveness.
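The pattern is easier to see in isolation. Below is a minimal, self-contained sketch (not Atlas code): `slow_predict` stands in for a blocking `model.predict`, and a heartbeat task demonstrates that the loop stays responsive while inference runs in the executor.

```python
import asyncio
import time

def slow_predict(features):
    """Simulates a CPU-bound model inference (blocking)."""
    time.sleep(0.2)
    return sum(features) / len(features)

async def heartbeat(ticks):
    """Runs alongside inference; only progresses if the loop isn't blocked."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # Inference is offloaded to the default ThreadPoolExecutor,
    # so the heartbeat task keeps running concurrently.
    prediction, _ = await asyncio.gather(
        loop.run_in_executor(None, slow_predict, [1.0, 2.0, 3.0]),
        heartbeat(ticks),
    )
    return prediction, len(ticks)

prediction, tick_count = asyncio.run(main())
print(prediction, tick_count)  # 2.0 3
```

Had `slow_predict` been awaited directly on the loop, the heartbeat would stall for the full 200ms of inference.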
Performance gain:
Manage startup dependencies and parallel initialization.
File: core/component_orchestrator.py

```python
import asyncio
import logging
from typing import Dict

# ComponentInfo, ComponentState, and DependencyGraph are
# project-internal types.

logger = logging.getLogger(__name__)


class ComponentOrchestrator:
    """Orchestrates component startup with dependency management"""

    def __init__(self):
        self.components: Dict[str, ComponentInfo] = {}
        self.component_states: Dict[str, ComponentState] = {}
        self.dependencies = DependencyGraph()
        self._lock = asyncio.Lock()

        # Configuration
        self.startup_timeout = 30.0
        self.max_retries = 3
        self.retry_delay = 1.0

    async def start_all(self) -> bool:
        """Start all components in dependency order"""
        # Calculate dependency levels for parallel startup
        levels = self._calculate_dependency_levels()

        for level, components in sorted(levels.items()):
            # Start components at this level in parallel
            tasks = []
            for comp_name in components:
                # Verify dependencies met
                if not await self.verify_dependencies(comp_name):
                    raise RuntimeError(f"Dependencies not met: {comp_name}")
                # Create startup task with retries
                task = self._start_with_retries(comp_name)
                tasks.append(task)

            # Wait for all components at this level
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Check for failures
            for comp_name, result in zip(components, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to start {comp_name}: {result}")
                    return False
        return True

    async def _start_with_retries(self, comp_name: str) -> bool:
        """Start component with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                # Get component
                comp_info = self.components[comp_name]

                # Start with timeout (asyncio.timeout requires Python 3.11+)
                async with asyncio.timeout(self.startup_timeout):
                    await comp_info.start_func()

                # Verify health
                if comp_info.health_check:
                    health = await comp_info.health_check()
                    if not health.get('healthy', False):
                        raise RuntimeError(f"Health check failed: {health}")

                # Update state
                self.component_states[comp_name] = ComponentState.RUNNING
                logger.info(f"✅ Started {comp_name}")
                return True
            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed "
                    f"for {comp_name}: {e}"
                )
                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    delay = self.retry_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"❌ Failed to start {comp_name} after retries")
                    raise
        return False
```
Dependency graph:

```python
ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}
```
Parallel startup: Components at same dependency level start concurrently (3.9x speedup with 8 workers).
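`_calculate_dependency_levels` isn't shown above; a plausible sketch (hypothetical code, using a trimmed copy of the graph) assigns each component a level of 1 plus the maximum level of its dependencies, so everything at the same level can safely start in parallel:

```python
from collections import defaultdict

# Trimmed copy of the dependency graph for illustration.
DEPS = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'ml_data_collector': ['redis'],
}

def calculate_dependency_levels(deps):
    """Group components by level: 1 + max level of their dependencies."""
    levels = defaultdict(list)

    def level_of(name, seen=()):
        if name in seen:
            raise ValueError(f"Dependency cycle at {name}")
        children = deps[name]
        if not children:
            return 0  # No dependencies: can start immediately
        return 1 + max(level_of(d, seen + (name,)) for d in children)

    for name in deps:
        levels[level_of(name)].append(name)
    return dict(levels)

levels = calculate_dependency_levels(DEPS)
print(levels)
# {0: ['database', 'redis'], 1: ['ib_connection', 'ml_data_collector'], 2: ['subscription_manager']}
```

Level 0 components have no prerequisites, so `start_all` can launch them concurrently before moving up a level.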
Track model and feature health at 10Hz.
File: ml_integration/monitoring/feature_health_monitor.py

```python
import asyncio
from typing import Dict

import numpy as np
import redis

# FeatureMetrics is a project-internal dataclass holding the
# counters updated below.


class FeatureHealthMonitor:
    """Real-time feature health monitoring at 10Hz"""

    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.monitoring_active = False

        # Health thresholds
        self.zero_rate_warning = 30.0     # % zeros before warning
        self.zero_rate_critical = 50.0    # % zeros before critical
        self.consecutive_zero_limit = 10  # Max consecutive zeros

        # Metrics storage
        self.feature_metrics: Dict[str, FeatureMetrics] = {}
        self.health_scores: Dict[str, float] = {}

    async def start_monitoring(self):
        """Start the monitoring loop"""
        self.monitoring_active = True
        await self._monitor_loop()

    async def _monitor_loop(self):
        """Main monitoring loop - checks features every 100ms"""
        while self.monitoring_active:
            features = await self._get_latest_features()
            if features:
                for feature_name, value in features.items():
                    self._update_feature_metrics(feature_name, value)

            # Check system health
            system_healthy = self._check_system_health()
            await self._publish_health_status(system_healthy)

            await asyncio.sleep(0.1)  # 10Hz monitoring

    def _update_feature_metrics(self, feature_name: str, value: float):
        """Update metrics for a feature"""
        if feature_name not in self.feature_metrics:
            self.feature_metrics[feature_name] = FeatureMetrics()

        metrics = self.feature_metrics[feature_name]

        # Track zeros
        if value == 0:
            metrics.zero_count += 1
            metrics.consecutive_zeros += 1
        else:
            metrics.consecutive_zeros = 0

        # Track NaN/Inf
        if np.isnan(value) or np.isinf(value):
            metrics.nan_inf_count += 1

        # Update total count
        metrics.total_count += 1

        # Calculate health score
        self.health_scores[feature_name] = self._calculate_health_score(
            feature_name
        )

    def _calculate_health_score(self, feature_name: str) -> float:
        """Calculate 0-100 health score for feature"""
        metrics = self.feature_metrics[feature_name]
        if metrics.total_count == 0:
            return 0.0

        # Start at 100
        score = 100.0

        # Penalize zero rate
        zero_rate = (metrics.zero_count / metrics.total_count) * 100
        if zero_rate > self.zero_rate_critical:
            score -= 50.0
        elif zero_rate > self.zero_rate_warning:
            score -= 25.0

        # Penalize consecutive zeros
        if metrics.consecutive_zeros > self.consecutive_zero_limit:
            score -= 30.0

        # Penalize NaN/Inf
        nan_rate = (metrics.nan_inf_count / metrics.total_count) * 100
        if nan_rate > 1.0:
            score -= 20.0

        return max(0.0, score)

    def get_feature_status(self, feature_name: str) -> str:
        """Get status: HEALTHY, WARNING, CRITICAL, or DISABLED"""
        score = self.health_scores.get(feature_name, 0.0)
        if score >= 80:
            return "HEALTHY"
        elif score >= 50:
            return "WARNING"
        elif score > 0:
            return "CRITICAL"
        else:
            return "DISABLED"
```
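To sanity-check the thresholds, here is a standalone restatement of the scoring and status rules (hypothetical helper names, same constants as the monitor):

```python
def health_score(total, zeros, consecutive_zeros, nan_inf,
                 zero_warn=30.0, zero_crit=50.0, consecutive_limit=10):
    """Mirror of _calculate_health_score with explicit inputs."""
    if total == 0:
        return 0.0
    score = 100.0
    zero_rate = zeros / total * 100
    if zero_rate > zero_crit:
        score -= 50.0
    elif zero_rate > zero_warn:
        score -= 25.0
    if consecutive_zeros > consecutive_limit:
        score -= 30.0
    if nan_inf / total * 100 > 1.0:
        score -= 20.0
    return max(0.0, score)

def status(score):
    """Mirror of get_feature_status."""
    if score >= 80:
        return "HEALTHY"
    if score >= 50:
        return "WARNING"
    if score > 0:
        return "CRITICAL"
    return "DISABLED"

# 40% zeros trips the warning penalty (-25) but not the critical one:
s = health_score(total=1000, zeros=400, consecutive_zeros=3, nan_inf=0)
print(s, status(s))  # 75.0 WARNING
```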
Health check tiers:

```python
def get_system_health(self) -> Dict[str, Any]:
    """Comprehensive system health status"""
    health = {
        'redis': 'OK',
        'features': 'Unknown',
        'trading': 'Unknown',
        'data_age': 'Unknown',
        'uptime': self._get_uptime(),
        'overall_status': 'OK',
        'components': {}
    }

    # Check Redis connectivity
    try:
        self.redis_client.ping()
        health['redis'] = 'OK'
    except Exception as e:
        health['redis'] = f'ERROR: {e}'
        health['overall_status'] = 'DEGRADED'

    # Check data freshness
    data_age = self._check_data_freshness()
    health['data_age'] = data_age
    if 'OLD' in data_age or 'STALE' in data_age:
        health['overall_status'] = 'DEGRADED'

    # Check feature pipeline
    feature_status = self._check_feature_pipeline()
    health['features'] = feature_status
    if feature_status == 'ERROR':
        health['overall_status'] = 'CRITICAL'

    return health
```
| Metric | Baseline | Optimized | Improvement |
|---|---|---|---|
| 1M row processing | 66 minutes | 1.84 seconds | 2,156X faster |
| LSTM throughput | 11,900 rows/sec | 50,694 rows/sec | 4.3X faster |
| SJM throughput | 83,000 rows/sec | 463,723 rows/sec | 5.6X faster |
| Memory (10M rows) | 2.4 GB | 220 MB | 91% reduction |
| Startup time | 45 seconds | 8 seconds | 5.6X faster |
| Health check latency | N/A | <100ms | Real-time monitoring |
| Engine | Processing Time (100K rows) | Throughput | Memory |
|---|---|---|---|
| LSTM | 1.97 seconds | 50,694 rows/sec | 53.4 MB |
| XGBoost | 3.04 seconds | 32,886 rows/sec | 12.5 MB |
| SJM | 0.22 seconds | 463,723 rows/sec | 0.6 MB |
Target achievement:
| Operation | Before (Blocking) | After (Async) | Improvement |
|---|---|---|---|
| Model inference I/O | 2000ms degradation | 100ms maintained | 20X better |
| Concurrent predictions | Sequential | Parallel | 8X throughput |
| Event loop responsiveness | Blocked | Non-blocking | Real-time capable |
Production deployment:
Model ensemble:
Real-time capabilities:
| Alert Level | Condition | Action |
|---|---|---|
| CRITICAL | Zero rate >50%, NaN detected, Redis down | Emergency stop trading |
| WARNING | Zero rate >30%, data >15min old | Notification, log analysis |
| INFO | Normal operation | Routine logging |
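One way the tiers above could map onto code — a hedged sketch with hypothetical names, mirroring the table's thresholds rather than actual Atlas logic:

```python
def alert_level(zero_rate_pct, nan_detected, redis_up, data_age_min):
    """Classify monitor readings into the alert tiers from the table."""
    if zero_rate_pct > 50 or nan_detected or not redis_up:
        return "CRITICAL"  # Emergency stop trading
    if zero_rate_pct > 30 or data_age_min > 15:
        return "WARNING"   # Notification, log analysis
    return "INFO"          # Routine logging

print(alert_level(12.0, False, True, 2))   # INFO
print(alert_level(35.0, False, True, 2))   # WARNING
print(alert_level(10.0, True, True, 2))    # CRITICAL
```

Note that any single critical condition trips the emergency stop; the checks are deliberately ORed rather than weighted.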
A sample health snapshot:

```python
{
    'healthy': True,
    'phase': 'RUNNING',
    'check_time_ms': 87.3,
    'components': {
        'redis': 'OK',
        'features': 'HEALTHY (98.3% quality)',
        'data_age': 'Fresh (12s ago)',
        'ml_integration': 'RUNNING',
        'regime_detection': 'HEALTHY'
    },
    'active_cursors': 8,
    'uptime_hours': 156.3,
    'overall_status': 'OK'
}
```
```python
def _check_data_freshness(self) -> str:
    """Check how fresh the data is"""
    timestamp_keys = [
        'atlas:timestamp',
        'atlas:features:timestamp',
        'atlas:market:timestamp',
    ]

    # Find latest timestamp
    latest = None
    for key in timestamp_keys:
        ts = self.redis_client.get(key)
        if ts:
            dt = datetime.fromisoformat(ts)
            if latest is None or dt > latest:
                latest = dt

    if latest is None:
        return 'No data'

    age = datetime.now() - latest
    if age < timedelta(minutes=5):
        return f'Fresh ({int(age.total_seconds())}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(age.total_seconds()/60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(age.total_seconds()/60)}m ago)'
    else:
        return f'STALE ({int(age.total_seconds()/3600)}h ago)'
```
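The same bucketing, restated as a standalone function driven by an explicit `now` (hypothetical helper, convenient for deterministic checks):

```python
from datetime import datetime, timedelta

def freshness(latest, now):
    """Bucket a timestamp's age into Fresh / Recent / OLD / STALE."""
    if latest is None:
        return 'No data'
    age = now - latest
    secs = int(age.total_seconds())
    if age < timedelta(minutes=5):
        return f'Fresh ({secs}s ago)'
    if age < timedelta(minutes=15):
        return f'Recent ({secs // 60}m ago)'
    if age < timedelta(hours=1):
        return f'OLD ({secs // 60}m ago)'
    return f'STALE ({secs // 3600}h ago)'

now = datetime(2025, 1, 6, 12, 0, 0)
print(freshness(now - timedelta(seconds=12), now))  # Fresh (12s ago)
print(freshness(now - timedelta(minutes=40), now))  # OLD (40m ago)
print(freshness(now - timedelta(hours=3), now))     # STALE (3h ago)
```

The OLD and STALE labels are what flips `overall_status` to DEGRADED in the system health check.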
Don’t block the event loop with synchronous model inference.
Pattern:
```python
# ❌ Blocks event loop
prediction = self.model.predict(features)

# ✅ Non-blocking
loop = asyncio.get_event_loop()
prediction = await loop.run_in_executor(
    None,
    self.model.predict,
    features
)
```
Impact: Preserved 100ms async I/O performance while running CPU-intensive models.
Dependencies must start in correct order with health verification.
Before: a single component failure crashed the entire system. After: failed components are isolated, and dependencies retry with backoff.
Result: 99.7% uptime vs. 87% in the prototype.
10Hz health checks caught issues before they impacted trading:
Cost: <1ms monitoring overhead. Value: prevented 3 potential trading incidents in 6 months.
Processing 10M rows requires careful memory handling:
| Technique | Memory Saved | Implementation |
|---|---|---|
| Lazy evaluation | 60% | Process in chunks, don’t materialize |
| Feature selection | 30% | Only compute needed features |
| Garbage collection tuning | 15% | Explicit cleanup after batches |
Result: 2.4GB → 220MB (91% reduction)
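The "lazy evaluation" row is the biggest win; a minimal sketch of the idea (synthetic data and hypothetical helpers, not Atlas code) streams fixed-size chunks through a reducer so only one chunk is resident at a time:

```python
def row_chunks(n_rows, chunk_size=100_000):
    """Yield values in chunks instead of materializing all n_rows at once."""
    for start in range(0, n_rows, chunk_size):
        size = min(chunk_size, n_rows - start)
        yield [float(start + i) for i in range(size)]

def streaming_mean(chunks):
    """Reduce chunk by chunk -- O(chunk_size) memory, not O(n_rows)."""
    total, count = 0.0, 0
    for chunk in chunks:
        total += sum(chunk)
        count += len(chunk)
    return total / count

# 1M rows processed while holding at most 100K values in memory.
print(streaming_mean(row_chunks(1_000_000)))  # 499999.5
```

The same shape applies to feature computation: pull a chunk, transform it, fold it into running aggregates, and let the chunk be garbage-collected before the next one arrives.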
Production model updates without downtime:
```python
async def switch_model(self, new_version: str):
    """Switch to new model version without downtime"""
    # Load new model
    new_model = self.model_manager.load_model('sjm', new_version)

    # Validate before switching
    health = await self._validate_model(new_model)
    if not health['valid']:
        raise RuntimeError(f"Model validation failed: {health}")

    # Atomic swap (attribute rebinding is atomic under the GIL)
    old_model = self.model
    self.model = new_model

    # Drop the last strong reference so the old model can be collected
    del old_model
    logger.info(f"✅ Switched to model version {new_version}")
```
Usage: Updated regime detection model 12 times in production, zero downtime.
Deploying the Atlas Forecasting System to production required systematic application of ML deployment patterns:
Production validation:
The system demonstrates that production ML deployment is 80% engineering discipline (async patterns, dependency management, health monitoring, memory optimization) and 20% model performance (accuracy, latency). Get the infrastructure right, and the models can deliver value reliably.
For systems where reliability matters—trading, healthcare, autonomous systems—measure everything, fail gracefully, and build observability from day one.
Part of the ML Deployment series: