Part 2 of 2 in the ML deployment series. Part 1: ML deployment reference guide.
Atlas trades 0–3 DTE SPY options with real capital. The forecasting layer has three jobs: detect what market regime we’re in, predict the next 5–15 minutes of price action, and read microstructure signals from the order book. None of that matters if the system can’t keep up with the data, can’t tolerate a single component failing, or can’t tell you it’s broken.
This is the engineering writeup of how the production deployment got there. The model work is the easy half. The harder half is the infrastructure around it.
The prototype had every problem you’d expect from a notebook hauled into production:
Synchronous inference inside the asyncio service degraded 100ms I/O tasks to 2+ seconds, because the event loop couldn't make progress while inference ran.
Baseline measurements on 100K rows:
| Component | Latency | Memory | Failure mode |
|---|---|---|---|
| LSTM engine | 8.4s | 245 MB | Blocking TensorFlow calls |
| XGBoost engine | 5.7s | 89 MB | Synchronous predictions |
| SJM (Statistical Jump Model) | 1.2s | 12 MB | No async support |
| Health checks | — | — | None |
Everything traces back to one bottleneck: synchronous model inference inside an async service.
The model itself is CPU-bound. The fix isn't to make the model async; it's to run inference in a thread pool and await the result from the async service.
```python
import asyncio
import logging
from typing import Optional

import redis

logger = logging.getLogger(__name__)

# get_model_manager, get_ml_config_manager, RegimeStabilityEnhancer and
# EnhancedRegimePredictor are Atlas-internal (not shown here).


class RegimeDetectionService:
    """Configuration-driven regime detection service"""

    def __init__(self, model_version: Optional[str] = None,
                 enable_stability_fixes: bool = True):
        self.redis_client = redis.Redis(decode_responses=True)
        self.model_manager = get_model_manager()
        self.config_manager = get_ml_config_manager()
        # self.pipeline_connector and self.feature_transformer are assumed
        # to be set up elsewhere (not shown here).

        if model_version:
            self.model = self.model_manager.load_model('sjm', model_version)
        else:
            self.model = self.model_manager.get_active_model('sjm')

        if enable_stability_fixes:
            self.stability_enhancer = RegimeStabilityEnhancer(
                min_confidence=0.70,
                max_transitions_per_hour=10,
                feature_zero_threshold=0.5,
                covariance_regularization=1e-4,
            )
            self.enhanced_predictor = EnhancedRegimePredictor(
                self.model, self.stability_enhancer
            )
        else:
            # Without the enhancer, fall back to the raw model (assumes it
            # exposes predict()); otherwise _update_regime has no predictor.
            self.enhanced_predictor = self.model

    async def start(self):
        try:
            while True:
                await self._update_regime()
                await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Regime detection service error: {e}")
            raise

    async def _update_regime(self):
        try:
            features = await self.pipeline_connector.get_latest_features()
            if features is None:
                logger.warning("No features available")
                return

            # CPU-bound: push into the default executor so the loop stays
            # free. get_running_loop() is the supported call in a coroutine.
            loop = asyncio.get_running_loop()
            X = await loop.run_in_executor(
                None, self.feature_transformer.transform, features
            )
            prediction = await loop.run_in_executor(
                None, self.enhanced_predictor.predict, X
            )

            # Async I/O is fine on the loop
            await self._publish_regime(prediction)
        except Exception as e:
            logger.error(f"Error updating regime: {e}", exc_info=True)
```
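run_in_executor() is doing the work. Passing None as the executor sends each call to the loop's default ThreadPoolExecutor, so the event loop stays responsive for I/O while inference runs on a worker thread. Threads are enough here because most of the heavy lifting in libraries like NumPy, TensorFlow and XGBoost runs in native code that releases the GIL; CPU-bound pure-Python work would keep holding the GIL and is better served by a ProcessPoolExecutor.
Result: I/O tasks that degraded from 100ms to 2,000ms while inference blocked the loop now hold at 100ms.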
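A minimal, self-contained way to see the effect: a heartbeat coroutine ticks every 10ms and records how late each tick fires, while a stand-in for inference runs either directly on the loop or through run_in_executor(). The names (fake_inference, heartbeat, measure_loop_lag) are hypothetical, and time.sleep() stands in for native-code inference because it releases the GIL in the same way.

```python
import asyncio
import time
from typing import List


def fake_inference(duration: float) -> str:
    # Hypothetical stand-in for model.predict(); time.sleep() releases the
    # GIL, much like native inference kernels do.
    time.sleep(duration)
    return "regime"


async def heartbeat(interval: float, stop: asyncio.Event, lags: List[float]) -> None:
    # Record how late each tick fires; a blocked loop shows up as large lag.
    loop = asyncio.get_running_loop()
    while not stop.is_set():
        expected = loop.time() + interval
        await asyncio.sleep(interval)
        lags.append(loop.time() - expected)


async def measure_loop_lag(offload: bool, duration: float = 0.5) -> float:
    """Return the worst heartbeat lag (seconds) observed while inference runs."""
    stop = asyncio.Event()
    lags: List[float] = []
    hb = asyncio.create_task(heartbeat(0.01, stop, lags))
    await asyncio.sleep(0)  # let the heartbeat take its first step
    if offload:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, fake_inference, duration)
    else:
        fake_inference(duration)  # runs on the loop thread and blocks it
    stop.set()
    await hb
    return max(lags, default=0.0)


if __name__ == "__main__":
    print(f"on the loop: worst lag {asyncio.run(measure_loop_lag(False)):.3f}s")
    print(f"in executor: worst lag {asyncio.run(measure_loop_lag(True)):.3f}s")
```

On the loop, the worst lag lands close to the full inference time; in the executor, it stays near the tick interval. That is the 2,000ms-versus-100ms pattern above in miniature.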
Components have dependencies. The database has to come up before anything that queries it. Redis has to come up before the things publishing to it. The orchestrator computes dependency levels and starts components level-by-level — within a level, in parallel.
```python
import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)

# ComponentInfo, ComponentState and DependencyGraph are Atlas-internal (not shown).


class ComponentOrchestrator:
    """Orchestrates component startup with dependency management"""

    def __init__(self):
        self.components: Dict[str, ComponentInfo] = {}
        self.component_states: Dict[str, ComponentState] = {}
        self.dependencies = DependencyGraph()
        self._lock = asyncio.Lock()
        self.startup_timeout = 30.0
        self.max_retries = 3
        self.retry_delay = 1.0

    async def start_all(self) -> bool:
        levels = self._calculate_dependency_levels()
        for level, components in sorted(levels.items()):
            # Verify every dependency before creating any start coroutines,
            # so a failure here leaves no never-awaited coroutines behind.
            for comp_name in components:
                if not await self.verify_dependencies(comp_name):
                    raise RuntimeError(f"Dependencies not met: {comp_name}")

            # Components within a level start concurrently
            results = await asyncio.gather(
                *(self._start_with_retries(name) for name in components),
                return_exceptions=True,
            )
            for comp_name, result in zip(components, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to start {comp_name}: {result}")
                    return False
        return True

    async def _start_with_retries(self, comp_name: str) -> bool:
        comp_info = self.components[comp_name]
        for attempt in range(self.max_retries):
            try:
                # asyncio.timeout() requires Python 3.11+
                async with asyncio.timeout(self.startup_timeout):
                    await comp_info.start_func()
                if comp_info.health_check:
                    health = await comp_info.health_check()
                    if not health.get('healthy', False):
                        raise RuntimeError(f"Health check failed: {health}")
                self.component_states[comp_name] = ComponentState.RUNNING
                logger.info(f"Started {comp_name}")
                return True
            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed "
                    f"for {comp_name}: {e}"
                )
                if attempt < self.max_retries - 1:
                    # Exponential backoff: retry_delay, 2x, 4x, ...
                    await asyncio.sleep(self.retry_delay * (2 ** attempt))
                else:
                    logger.error(f"Failed to start {comp_name} after retries")
                    raise
        return False
```
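The backoff schedule is easier to check in isolation. Below is a standalone sketch of the same retry pattern as a free function (start_with_retries is a hypothetical name). It swaps asyncio.timeout() for asyncio.wait_for() so it also runs on Python versions before 3.11, and it takes the sleep function as a parameter so a test can record delays instead of waiting.

```python
import asyncio
from typing import Awaitable, Callable


async def start_with_retries(
    start: Callable[[], Awaitable[None]],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    timeout: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Start a component with a per-attempt timeout and exponential backoff.

    Returns the 1-based attempt number that succeeded; re-raises the last
    error once max_retries attempts have all failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            # wait_for() bounds each attempt, like asyncio.timeout() above
            await asyncio.wait_for(start(), timeout)
            return attempt + 1
        except Exception:
            if attempt == max_retries - 1:
                raise
            await sleep(retry_delay * (2 ** attempt))  # 1x, 2x, 4x, ...
    raise AssertionError("unreachable")
```

With the defaults, a component that fails twice waits 1s, then 2s, and comes up on attempt 3; one that never starts costs 3s of backoff plus the time spent in each attempt before its last error is re-raised.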
The actual Atlas dependency graph:
```python
ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}
```
Parallel startup within levels gave a 3.9× speedup at 8 workers.
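_calculate_dependency_levels() isn't shown. One way to compute the levels, sketched under the assumption that the graph is a plain dict like ATLAS_COMPONENT_DEPENDENCIES: a component's level is one more than the deepest level among its dependencies, resolved depth-first with memoization and cycle detection. The function name is hypothetical.

```python
from typing import Dict, List, Set


def calculate_dependency_levels(deps: Dict[str, List[str]]) -> Dict[int, List[str]]:
    """Group components into startup levels.

    Level 0 holds components with no dependencies; every other component sits
    one level above its deepest dependency, so everything in a level can start
    concurrently once all earlier levels are up.
    """
    level_of: Dict[str, int] = {}
    visiting: Set[str] = set()

    def resolve(name: str) -> int:
        if name in level_of:
            return level_of[name]  # already resolved
        if name in visiting:
            raise ValueError(f"Dependency cycle involving {name!r}")
        if name not in deps:
            raise KeyError(f"Unknown dependency {name!r}")
        visiting.add(name)
        level = 1 + max((resolve(d) for d in deps[name]), default=-1)
        visiting.remove(name)
        level_of[name] = level
        return level

    levels: Dict[int, List[str]] = {}
    for name in deps:  # preserves declaration order within each level
        levels.setdefault(resolve(name), []).append(name)
    return levels
```

On ATLAS_COMPONENT_DEPENDENCIES this yields eight levels: database and redis at level 0, ib_connection and ml_data_collector at level 1, then a single chain up to paper_trader at level 7. start_all() walks them in order via sorted(levels.items()).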
The monitor checks every feature every 100ms. Each feature accumulates a metrics record; from that, a 0–100 health score; from that, a status (HEALTHY, WARNING, CRITICAL, DISABLED).
```python
import asyncio
from dataclasses import dataclass
from typing import Dict

import numpy as np
import redis


@dataclass
class FeatureMetrics:
    # Assumed shape, inferred from the counters the monitor updates below
    zero_count: int = 0
    consecutive_zeros: int = 0
    nan_inf_count: int = 0
    total_count: int = 0


class FeatureHealthMonitor:
    """Real-time feature health monitoring at 10Hz"""

    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.monitoring_active = False
        self.zero_rate_warning = 30.0   # percent of samples exactly zero
        self.zero_rate_critical = 50.0
        self.consecutive_zero_limit = 10
        self.feature_metrics: Dict[str, FeatureMetrics] = {}
        self.health_scores: Dict[str, float] = {}

    async def start_monitoring(self):
        self.monitoring_active = True
        await self._monitor_loop()

    async def _monitor_loop(self):
        # _get_latest_features, _check_system_health and
        # _publish_health_status are not shown here.
        while self.monitoring_active:
            features = await self._get_latest_features()
            if features:
                for feature_name, value in features.items():
                    self._update_feature_metrics(feature_name, value)
            system_healthy = self._check_system_health()
            await self._publish_health_status(system_healthy)
            await asyncio.sleep(0.1)  # ~10Hz: 100ms plus the loop's own work

    def _update_feature_metrics(self, feature_name: str, value: float):
        if feature_name not in self.feature_metrics:
            self.feature_metrics[feature_name] = FeatureMetrics()
        metrics = self.feature_metrics[feature_name]

        # Counts are cumulative since the monitor started
        if value == 0:
            metrics.zero_count += 1
            metrics.consecutive_zeros += 1
        else:
            metrics.consecutive_zeros = 0
        if np.isnan(value) or np.isinf(value):
            metrics.nan_inf_count += 1
        metrics.total_count += 1

        self.health_scores[feature_name] = self._calculate_health_score(
            feature_name
        )

    def _calculate_health_score(self, feature_name: str) -> float:
        metrics = self.feature_metrics[feature_name]
        if metrics.total_count == 0:
            return 0.0

        score = 100.0
        zero_rate = (metrics.zero_count / metrics.total_count) * 100
        if zero_rate > self.zero_rate_critical:
            score -= 50.0
        elif zero_rate > self.zero_rate_warning:
            score -= 25.0
        if metrics.consecutive_zeros > self.consecutive_zero_limit:
            score -= 30.0
        nan_rate = (metrics.nan_inf_count / metrics.total_count) * 100
        if nan_rate > 1.0:
            score -= 20.0
        return max(0.0, score)

    def get_feature_status(self, feature_name: str) -> str:
        score = self.health_scores.get(feature_name, 0.0)
        if score >= 80:
            return "HEALTHY"
        elif score >= 50:
            return "WARNING"
        elif score > 0:
            return "CRITICAL"
        else:
            return "DISABLED"
```
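The scoring rules are easier to reason about, and to unit-test, as a pure function over a feature's values. A sketch that replays a value sequence through the same thresholds without Redis or the monitor loop (health_score and feature_status are hypothetical names; the constants mirror FeatureHealthMonitor):

```python
import math
from typing import Iterable

# Thresholds copied from FeatureHealthMonitor (percentages)
ZERO_RATE_WARNING = 30.0
ZERO_RATE_CRITICAL = 50.0
CONSECUTIVE_ZERO_LIMIT = 10
NAN_INF_RATE_LIMIT = 1.0


def health_score(values: Iterable[float]) -> float:
    """Replay a feature's values through the monitor's scoring rules (0-100)."""
    total = zeros = consecutive = nan_inf = 0
    for v in values:
        total += 1
        if v == 0:
            zeros += 1
            consecutive += 1
        else:
            consecutive = 0  # NaN/inf also break a run of zeros
        if math.isnan(v) or math.isinf(v):
            nan_inf += 1
    if total == 0:
        return 0.0

    score = 100.0
    zero_rate = zeros / total * 100
    if zero_rate > ZERO_RATE_CRITICAL:
        score -= 50.0
    elif zero_rate > ZERO_RATE_WARNING:
        score -= 25.0
    if consecutive > CONSECUTIVE_ZERO_LIMIT:
        score -= 30.0
    if nan_inf / total * 100 > NAN_INF_RATE_LIMIT:
        score -= 20.0
    return max(0.0, score)


def feature_status(score: float) -> str:
    if score >= 80:
        return "HEALTHY"
    if score >= 50:
        return "WARNING"
    if score > 0:
        return "CRITICAL"
    return "DISABLED"
```

Replaying a few sequences shows how the penalties interact. Ten good values followed by 20 zeros trips both the zero-rate penalty (66.7% > 50%) and the consecutive-zero penalty, scoring 20 (CRITICAL). A feed that was healthy for 100 ticks and then sticks at zero for 11 scores 70 (WARNING) from the consecutive-zero check alone, because the cumulative zero rate is still under 10%; that check is what catches a freshly stuck feed.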
And a system-level rollup:
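```python
from typing import Any, Dict


# Method on the monitor class; _get_uptime and _check_feature_pipeline are not shown.
def get_system_health(self) -> Dict[str, Any]:
    health = {
        'redis': 'OK',
        'features': 'Unknown',
        'trading': 'Unknown',
        'data_age': 'Unknown',
        'uptime': self._get_uptime(),
        'overall_status': 'OK',
        'components': {}
    }

    try:
        self.redis_client.ping()
        health['redis'] = 'OK'
    except Exception as e:
        health['redis'] = f'ERROR: {e}'
        health['overall_status'] = 'DEGRADED'

    try:
        data_age = self._check_data_freshness()
    except Exception as e:
        # Freshness also reads Redis; report an outage instead of crashing
        data_age = f'ERROR: {e}'
    health['data_age'] = data_age
    # Missing timestamps are treated like stale data, not as healthy
    if any(m in data_age for m in ('OLD', 'STALE', 'ERROR', 'No data')):
        health['overall_status'] = 'DEGRADED'

    # Checked last, so CRITICAL is never downgraded to DEGRADED
    feature_status = self._check_feature_pipeline()
    health['features'] = feature_status
    if feature_status == 'ERROR':
        health['overall_status'] = 'CRITICAL'

    return health
```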
The combined optimization moved every metric that mattered:
| Metric | Before | After | Change |
|---|---|---|---|
| 1M-row processing | 66 min | 1.84s | 2,156× |
| LSTM throughput | 11,900 rows/s | 50,694 rows/s | 4.3× |
| SJM throughput | 83,000 rows/s | 463,723 rows/s | 5.6× |
| Memory at 10M rows | 2.4 GB | 220 MB | −91% |
| Startup time | 45s | 8s | 5.6× |
| Health check latency | — | <100ms | — |
Per-engine, on 100K rows:
| Engine | Time | Throughput | Memory |
|---|---|---|---|
| LSTM | 1.97s | 50,694 rows/s | 53.4 MB |
| XGBoost | 3.04s | 32,886 rows/s | 12.5 MB |
| SJM | 0.22s | 463,723 rows/s | 0.6 MB |
The original target was 1M rows in under 5 minutes. The measured 1.84 seconds is 163× faster than that target.
The async wrapping itself was responsible for most of the I/O recovery:
| Operation | Before | After | Change |
|---|---|---|---|
| Model inference I/O latency | 2,000ms degraded | 100ms maintained | 20× |
| Concurrent predictions | Sequential | Parallel | 8× throughput |
| Event-loop responsiveness | Blocked | Non-blocking | — |
Six months running real capital ($25K+). 542,000 rows/second sustained ingest. 99.7% uptime. Zero data-loss incidents. Sub-second regime detection.
The model ensemble:
Update cadence:
Alert levels:
| Level | Condition | Action |
|---|---|---|
| CRITICAL | Zero rate >50%, NaN detected, Redis down | Emergency stop trading |
| WARNING | Zero rate >30%, data >15min old | Notify, investigate |
| INFO | Normal | Log only |
A sample health-check response:
```python
{
    'healthy': True,
    'phase': 'RUNNING',
    'check_time_ms': 87.3,
    'components': {
        'redis': 'OK',
        'features': 'HEALTHY (98.3% quality)',
        'data_age': 'Fresh (12s ago)',
        'ml_integration': 'RUNNING',
        'regime_detection': 'HEALTHY'
    },
    'active_cursors': 8,
    'uptime_hours': 156.3,
    'overall_status': 'OK'
}
```
The data_age field comes from a freshness check across Atlas's timestamp keys in Redis:
```python
from datetime import datetime, timedelta


def _check_data_freshness(self) -> str:
    timestamp_keys = [
        'atlas:timestamp',
        'atlas:features:timestamp',
        'atlas:market:timestamp',
    ]
    latest = None
    for key in timestamp_keys:
        ts = self.redis_client.get(key)
        if ts:
            # Assumes naive ISO-8601 timestamps; mixing in offset-aware
            # values makes the comparison and subtraction raise TypeError.
            dt = datetime.fromisoformat(ts)
            if latest is None or dt > latest:
                latest = dt
    if latest is None:
        return 'No data'

    age = datetime.now() - latest
    if age < timedelta(minutes=5):
        return f'Fresh ({int(age.total_seconds())}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(age.total_seconds() / 60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(age.total_seconds() / 60)}m ago)'
    else:
        return f'STALE ({int(age.total_seconds() / 3600)}h ago)'
```
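One fragility in _check_data_freshness(): datetime.now() returns a naive local time, so if any writer stores an offset-bearing timestamp (for example 2024-01-02T14:30:00+00:00), the comparison or subtraction raises TypeError. A sketch of a variant that normalizes everything to UTC and takes now as a parameter so the buckets can be tested. Treating naive timestamps as UTC is an assumption to check against what the writers actually store, and classify_freshness is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def classify_freshness(timestamps: Iterable[Optional[str]],
                       now: Optional[datetime] = None) -> str:
    """Bucket the newest ISO-8601 timestamp by age, like _check_data_freshness.

    `now` must be timezone-aware if given. Naive timestamps are assumed UTC.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    latest: Optional[datetime] = None
    for ts in timestamps:
        if not ts:
            continue  # key missing in Redis
        dt = datetime.fromisoformat(ts)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        if latest is None or dt > latest:
            latest = dt
    if latest is None:
        return 'No data'

    age = now - latest
    seconds = age.total_seconds()
    if age < timedelta(minutes=5):
        return f'Fresh ({int(seconds)}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(seconds / 60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(seconds / 60)}m ago)'
    return f'STALE ({int(seconds / 3600)}h ago)'
```

In the service, the input would be the same Redis reads as before, e.g. [self.redis_client.get(k) for k in timestamp_keys].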
1. Async-wrap your CPU-bound inference. Don’t put model.predict() in the body of an async function — it blocks the entire event loop. loop.run_in_executor(None, model.predict, X) keeps the loop free.
2. Dependency-ordered startup prevents cascades. Before the orchestrator, one failed component took everything else with it. After, failures are isolated and retry with backoff. Uptime moved from 87% to 99.7%.
3. 10Hz monitoring catches degradation before P&L does. Three potential trading incidents in six months were avoided because the feature monitor flagged the issue before the model traded on bad data. The monitoring overhead is <1ms.
4. Memory at 10M rows requires effort. Three changes — lazy evaluation, feature selection, explicit garbage collection between batches — took resident memory from 2.4GB to 220MB.
5. Hot-swap models without downtime. Atlas has had 12 production model updates with zero downtime. The pattern is straightforward: load the new model, validate it, atomic-swap the reference.
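```python
# Method on RegimeDetectionService
async def switch_model(self, new_version: str):
    loop = asyncio.get_running_loop()
    # Loading weights blocks (disk I/O + deserialization): keep it off the loop
    new_model = await loop.run_in_executor(
        None, self.model_manager.load_model, 'sjm', new_version)
    health = await self._validate_model(new_model)
    if not health['valid']:
        raise RuntimeError(f"Model validation failed: {health}")
    # The predictor wraps the model, so swap both. With no await between the
    # assignments, no coroutine on the loop can observe a half-swapped pair;
    # executor calls already in flight finish on the old model.
    enhancer = getattr(self, 'stability_enhancer', None)
    new_predictor = (new_model if enhancer is None
                     else EnhancedRegimePredictor(new_model, enhancer))
    self.model, self.enhanced_predictor = new_model, new_predictor
    logger.info(f"Switched to model version {new_version}")
```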
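Lesson 4's three memory changes can be sketched in pure Python so the effect is measurable with tracemalloc: rows come from a generator (lazy evaluation), each row is cut down to the selected columns (feature selection), and gc.collect() runs between batches. The row shape, column names and helper names are hypothetical; this illustrates the pattern, not the Atlas pipeline itself.

```python
import gc
import tracemalloc
from typing import Callable, Dict, Iterator, List

Row = Dict[str, float]


def generate_rows(n: int, width: int = 20) -> Iterator[Row]:
    # Lazy source: each row exists only while it is being processed
    for i in range(n):
        yield {f"f{j}": float(i + j) for j in range(width)}


def select_features(row: Row, keep: List[str]) -> List[float]:
    # Feature selection: carry forward only the columns the model uses
    return [row[k] for k in keep]


def process_batched(n: int, keep: List[str], batch_size: int = 500) -> int:
    processed = 0
    batch: List[List[float]] = []
    for row in generate_rows(n):
        batch.append(select_features(row, keep))
        if len(batch) == batch_size:
            processed += len(batch)  # stand-in for model.predict(batch)
            batch = []               # drop the reference so the batch is freed
            gc.collect()             # also reclaims any reference cycles
    return processed + len(batch)


def process_eager(n: int, keep: List[str]) -> int:
    rows = list(generate_rows(n))  # every full row materialized at once
    features = [select_features(r, keep) for r in rows]
    return len(features)


def peak_bytes(fn: Callable[[], int]) -> int:
    # Peak Python-heap allocation while fn runs, as tracked by tracemalloc
    tracemalloc.start()
    try:
        fn()
        return tracemalloc.get_traced_memory()[1]
    finally:
        tracemalloc.stop()


if __name__ == "__main__":
    keep = ["f0", "f3", "f7"]
    print("eager peak:  ", peak_bytes(lambda: process_eager(5000, keep)))
    print("batched peak:", peak_bytes(lambda: process_batched(5000, keep)))
```

The batched version's peak stays roughly flat as n grows, because only one batch of reduced rows is alive at a time; the eager version's peak grows linearly with n.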
The model is the smaller half of this. Most of the work is infrastructure — async patterns, dependency-ordered startup, health monitoring, memory discipline. Get those right and a 7-day average trader becomes a 24/7 system that doesn’t lose money to itself.