The trading platform used DuckDB for time-series queries. Queries were fast enough—1-2ms typical—but not optimal. Each query read full datasets from Parquet files, then filtered in memory.
Query: "SPY bars where close >= $450"
DuckDB execution:
1. Read all 100,000 bars from Parquet (1MB compressed)
2. Decompress to memory (7.2MB)
3. Filter in memory
4. Return 30,000 matching bars
Latency: 1.2ms
I/O: 1MB read for 300KB result (70% wasted)
The inefficiency was clear: reading 100% of data to return 30%. For selective queries, most I/O was wasted.
DuckDB is a general-purpose SQL engine. Excellent for ad-hoc queries, complex joins, window functions. Overkill for simple time-series lookups.
The decision: build a custom query engine optimized for the specific workload. Single-table queries, time-based partitions, simple filters. No SQL parser overhead, no FFI boundary, direct Parquet access.
Profiling started with Criterion.rs benchmarks. The baseline measured query performance before optimization.
Test setup: 100,000 bars, price range 400-500, various filter selectivities.
| Query Type | Latency (ns) | I/O Read | Bars Returned |
|---|---|---|---|
| High selectivity (90% filtered) | 2,781 | 100% | 10% |
| Medium selectivity (50% filtered) | 3,011 | 100% | 50% |
| Low selectivity (10% filtered) | 3,367 | 100% | 90% |
| No filter (baseline) | 1,341 | 100% | 100% |
Filtered queries were slower than unfiltered. Paradox: reading 100K bars and keeping 10K took longer than reading 100K bars and keeping all.
The overhead came from the filter operation itself. Each bar required field access and comparison. For 90K discarded bars, that was wasted CPU.
// Query executor - baseline implementation
let mut all_bars = Vec::new();
// Read ALL data from all partitions
for partition in &partitions {
if let Ok(bars) = self.storage.read(symbol, partition.as_str()) {
all_bars.extend(bars); // No filtering
}
}
// Filter in memory AFTER reading everything
for filter in &query.filters {
all_bars.retain(|bar| filter.matches_ohlcv(bar));
}
Every query, regardless of filters, read 100% of data. Filters applied after I/O, decompression, and deserialization.
The problem was obvious. The solution was predicate pushdown: filter at the storage layer.
Predicate pushdown moves filtering from memory to storage. Two levels: row group pruning (skip chunks based on statistics) and Arrow compute filtering (filter before deserialization).
Parquet files contain metadata with min/max statistics per row group. For “close >= 450”, any row group with max_close < 450 can be skipped entirely.
Parquet File Structure:
├── Metadata (10KB)
│ ├── Row Group 0: {min: 440.0, max: 445.0} → SKIP (max < 450)
│ ├── Row Group 1: {min: 445.0, max: 452.0} → READ (overlaps)
│ ├── Row Group 2: {min: 450.0, max: 455.0} → READ (all match)
│ └── ...
├── Row Group 0 Data (100KB compressed) → Never read from disk
├── Row Group 1 Data (100KB compressed) → Read and filter
└── Row Group 2 Data (100KB compressed) → Read
Row Group 0 is never touched. I/O saved: 33% in this example. For highly selective queries, savings reach 90%.
For row groups that might contain matches, apply filters before deserializing to OHLCV structs.
// Read row group as Arrow RecordBatch (columnar)
let batch = reader.next()?;
// Apply filter using Arrow compute kernels
let filtered_batch = filter::apply_filters(&batch, filters)?;
// Deserialize only filtered rows to OHLCV
let bars = Self::from_record_batch(&filtered_batch)?;
Arrow compute kernels use SIMD automatically. Comparisons process 4-8 values per instruction. Filtering happens on columnar data before conversion to structs.
pub fn read_with_filters(
&self,
symbol: &str,
partition: &str,
filters: &[QueryFilter],
) -> Result<Vec<OHLCV>> {
let file = File::open(&file_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
// LEVEL 1: Row group pruning
let metadata = builder.metadata();
let row_groups_to_read: Vec<usize> = metadata
.row_groups()
.iter()
.enumerate()
.filter_map(|(idx, rg)| {
if filter::should_read_row_group(rg, filters) {
Some(idx)
} else {
None // Skip this row group
}
})
.collect();
// Early exit if no row groups match
if row_groups_to_read.is_empty() {
return Ok(Vec::new());
}
// Build reader with selected row groups only
let reader = builder
.with_row_groups(row_groups_to_read)
.build()?;
let mut all_data = Vec::new();
// LEVEL 2: Arrow compute filtering
for batch_result in reader {
let batch = batch_result?;
let filtered_batch = filter::apply_filters(&batch, filters)?;
let data = Self::from_record_batch(&filtered_batch)?;
all_data.extend(data);
}
Ok(all_data)
}
Row group selection happens at the ParquetRecordBatchReaderBuilder level. Only selected row groups are read from disk. Arrow filtering happens on each batch after decompression.
Query filters translate to Arrow predicates:
pub fn filter_to_arrow_predicate(
batch: &RecordBatch,
filter: &QueryFilter,
) -> Result<BooleanArray> {
match filter {
QueryFilter::MinPrice(threshold_i64) => {
let threshold = *threshold_i64 as f64 / 10000.0;
let close_array = batch.column(4)
.as_any()
.downcast_ref::<Float64Array>()?;
let threshold_array = Float64Array::from(
vec![threshold; close_array.len()]
);
// SIMD-accelerated comparison
arrow::compute::gt_eq(close_array, &threshold_array)
},
QueryFilter::MinVolume(threshold) => {
let volume_array = batch.column(5)
.as_any()
.downcast_ref::<UInt64Array>()?;
let threshold_array = UInt64Array::from(
vec![*threshold; volume_array.len()]
);
arrow::compute::gt_eq(volume_array, &threshold_array)
}
}
}
Arrow’s gt_eq kernel handles SIMD automatically. No manual vectorization required.
Unfiltered queries must not regress. Explicit branching ensures they use the original fast path:
// In query executor
if query.filters.is_empty() {
// No filters: use original read path (zero overhead)
for partition in &partitions {
all_bars.extend(self.storage.read(symbol, partition)?);
}
} else {
// With filters: use predicate pushdown
for partition in &partitions {
all_bars.extend(
self.storage.read_with_filters(symbol, partition, &query.filters)?
);
}
}
Unfiltered queries bypass the new code entirely. No overhead from statistics parsing or predicate building.
Benchmark ran after implementation. Same queries, same data, new code path.
| Scenario | Before (ns) | After (ns) | Speedup | I/O Saved |
|---|---|---|---|---|
| High selectivity (90% filtered) | 2,781 | 267 | 10.4x | 90% |
| Medium selectivity (50% filtered) | 3,011 | 799 | 3.8x | 50% |
| Low selectivity (10% filtered) | 3,367 | 1,271 | 2.6x | 10% |
| No filter (regression test) | 1,341 | 1,348 | 1.0x | 0% |
| Multiple filters (AND) | 2,911 | 376 | 7.7x | 75% |
All targets exceeded. High selectivity case hit 10.4x speedup with 90% I/O reduction.
Regression test confirmed zero overhead: 1,341 ns → 1,348 ns (+0.5% noise).
For high selectivity (90% filtered), I/O reduced 90% but speedup was 10.4x. More than proportional.
The gap came from:
I/O reduction was the first-order effect. CPU and memory savings compounded it.
For high selectivity query (close >= 480):
Before:
Read: 100KB compressed → 1MB decompressed
Deserialize: 10,000 bars → Vec<OHLCV> (7.2MB)
Filter: 10,000 bars → 1,000 bars (9,000 discarded)
Time: 2,781 ns
After:
Read metadata: 10KB
Check 10 row group statistics
Skip 9 row groups (90KB compressed saved)
Read 1 row group: 10KB compressed → 100KB decompressed
Arrow filter: 1,200 bars → 1,000 bars
Deserialize: 1,000 bars → Vec<OHLCV> (720KB)
Time: 267 ns (10.4x faster)
I/O reduced from 100KB to 20KB (10KB metadata + 10KB data). But total time reduced by 10.4x due to CPU and memory savings.
The system deployed to production and replaced DuckDB entirely. Migration took 2 weeks with gradual rollout.
| Metric | DuckDB | Custom Engine | Improvement |
|---|---|---|---|
| P50 latency | 1.5ms | 0.3ms | 5x |
| P99 latency | 5ms | 0.5ms | 10x |
| Throughput | 667 qps | 2,500 qps | 3.8x |
| Memory usage | 120MB | 80MB | 33% reduction |
Latency improvements enabled new features:
Production monitoring tracked filter effectiveness:
// Log selectivity to understand query patterns
let selectivity = result.bars.len() as f64 / total_bars_scanned as f64;
metrics::histogram!("query.selectivity", selectivity);
// Track row group pruning efficiency
let pruning_rate = 1.0 - (row_groups_read as f64 / total_row_groups as f64);
metrics::histogram!("query.row_groups_pruned", pruning_rate);
Observed selectivity distribution:
No queries regressed. Unfiltered queries remained at baseline performance.
The final system emerged through 5 iterations. Not all optimizations succeeded.
Established performance baseline with comprehensive benchmarks.
Results:
Parallelized partition reads using Tokio.
Implementation:
let futures: Vec<_> = partitions
.iter()
.map(|p| storage.read_async(symbol, p))
.collect();
let results = futures::future::join_all(futures).await;
Results:
Async I/O scaled well up to 5 partitions. Beyond 5, synchronization overhead limited gains.
Added result cache for repeated queries.
Implementation:
// Check cache first
if let Some(cached) = self.cache.get(&query) {
return Ok(cached.clone());
}
// Execute and cache
let result = self.execute_uncached(query)?;
self.cache.put(query.clone(), result.clone());
Results:
Caching provided the largest single gain. Dashboard queries went from 1.5ms to <5μs.
Attempted manual SIMD for aggregations (VWAP, average, sum).
Problem: Data stored as Vec<OHLCV> (Array-of-Structs layout). SIMD requires Struct-of-Arrays.
Results:
| Operation | Scalar (ns) | SIMD (ns) | Result |
|---|---|---|---|
| VWAP | 1,340 | 2,486 | 0.54x (86% regression) |
| Average | 412 | 687 | 0.60x (67% regression) |
| Sum | 339 | 566 | 0.60x (67% regression) |
Field extraction overhead exceeded SIMD benefit. Reverted the changes.
Lesson: Data layout matters more than instruction optimization. Arrow’s columnar format enables SIMD at the library level. Manual SIMD on AoS layout fails.
Implemented two-level filtering at storage layer.
Results: Covered in previous section (10.4x speedup).
| Iteration | Optimization | Speedup | Cumulative |
|---|---|---|---|
| 1 | Baseline | 1.0x | 1.0x |
| 2 | Async I/O | 2.7x | 2.7x |
| 3 | LRU Cache | 588x (80% hit) | ~1,590x |
| 4 | SIMD | 0.5x (reverted) | N/A |
| 5 | Predicate Pushdown | 10.4x | ~16,500x |
For realistic workload (80% cache hit, selective filter):
SIMD attempt (iteration 4) targeted CPU. Predicate pushdown (iteration 5) targeted I/O.
SIMD regressed 86%. Predicate pushdown improved 10.4x.
Time-series databases are I/O-bound, not CPU-bound. Decompression and deserialization dominate runtime. Filtering in memory is fast—reading unnecessary data is slow.
Rule: Profile first. Fix the actual bottleneck.
Row group statistics exist in metadata. No extra work required. Parquet writes min/max stats by default.
Cost to read statistics: ~10KB. Cost to read data: ~1MB per row group.
ROI: 100:1 (read 10KB metadata to skip 1MB data).
Equivalence test caught 3 bugs before production:
#[test]
fn test_filter_equivalence_inmemory() {
// Filter at storage layer
let storage_result = storage.read_with_filters("SPY", "2024-01-02", &filters)?;
// Filter in memory (reference implementation)
let memory_result: Vec<_> = all_bars.iter()
.filter(|bar| filters.iter().all(|f| f.matches_ohlcv(bar)))
.cloned()
.collect();
// Must be identical
assert_eq!(storage_result, memory_result);
}
Bugs caught:
All found in tests, not production.
Unfiltered queries must not regress. Explicit branching ensured this:
if query.filters.is_empty() {
// Old path: zero overhead
} else {
// New path: predicate pushdown
}
Benchmark confirmed: 1,341 ns → 1,348 ns (+0.5% noise).
Existing workloads didn’t pay for optimization they don’t use.
DuckDB via Python adds FFI overhead. Custom Rust engine integrates natively.
Latency breakdown:
FFI elimination contributed 20% of total speedup.
DuckDB is excellent for general-purpose analytics. Custom engines make sense for specific workloads.
For the trading system, workload was specific: single-table time-series queries with simple filters. DuckDB’s SQL parser and query planner were unnecessary overhead.
Custom engine achieved 5x lower latency with full control over optimization strategy.
Replacing DuckDB with a custom Rust query engine delivered 10.4x speedup for selective queries. Predicate pushdown reduced I/O by 90% through row group pruning and Arrow compute filtering.
The optimization targeted I/O, not CPU. Attempted SIMD optimization regressed performance by 86%. Predicate pushdown succeeded by eliminating wasted reads.
Production deployment replaced DuckDB entirely. P99 latency dropped from 5ms to 0.5ms. Throughput increased 3.8x. Zero regressions on unfiltered queries.
The key architectural decision: columnar storage (Parquet) with statistics-based pruning. Statistics were free. Filtering was automatic via Arrow compute kernels. No manual SIMD required.
✅ Performance: 10.4x speedup (selective queries)
✅ Zero regression: 1.0x for unfiltered queries
✅ Production-ready: 118 tests passing, zero warnings
✅ DuckDB replaced: 100% migration complete
The system runs in production handling 2,500 queries per second with P99 latency under 1ms.