// project
A full-stack email platform with behavior-based triggers, A/B testing gated by statistical significance checks, and ML-powered personalization. It processes 100K+ emails per hour with 98% inbox deliverability and syncs bidirectionally with CRM systems.
The unusual thing about it is that statistical rigor lives at the marketing layer — the system doesn’t declare a test winner until p < 0.05.
The platform decomposes into four loosely coupled subsystems. Each can be operated and reasoned about independently.
User actions in the product publish events. The trigger engine subscribes, matches events against registered conditions, and fires the corresponding follow-up email — with cooldown logic so a single user doesn’t get hammered.
from queue import Queue


class BehaviorTriggerEngine:
    def __init__(self):
        self.triggers = {}
        self.event_queue = Queue()

    def register_trigger(self, event_type, conditions, action):
        """Register a behavior-based email trigger"""
        self.triggers[event_type] = {
            'conditions': conditions,
            'action': action,
            'cooldown': 24  # hours
        }

    def process_event(self, event):
        if event.type in self.triggers:
            trigger = self.triggers[event.type]
            if self.evaluate_conditions(event, trigger['conditions']):
                self.execute_action(trigger['action'], event.user)

    def evaluate_conditions(self, event, conditions):
        # Time-based windows, engagement-score thresholds,
        # segment membership, previous-action history
        return all(self.check_condition(event, c) for c in conditions)
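The engine above stores a `cooldown` value per trigger but does not show where it is enforced. A minimal sketch of one way to do it, assuming an in-memory map keyed by user and event type (`CooldownTracker` and `should_fire` are hypothetical names, not part of the original code):

```python
from datetime import datetime, timedelta, timezone


class CooldownTracker:
    """Remembers when each (user, trigger) pair last fired. Hypothetical helper."""

    def __init__(self, cooldown_hours=24):
        self.cooldown = timedelta(hours=cooldown_hours)
        self._last_fired = {}  # (user_id, event_type) -> datetime of last send

    def should_fire(self, user_id, event_type, now=None):
        """Return True and record the send if the cooldown has elapsed."""
        now = now or datetime.now(timezone.utc)
        key = (user_id, event_type)
        last = self._last_fired.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # still cooling down: suppress the email
        self._last_fired[key] = now
        return True
```

A check like `should_fire(event.user, event.type)` could sit in `process_event` just before `execute_action`. A production version would presumably keep this state in a shared store rather than process memory.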
Email content is template-driven, with merge tags resolved at send time. ML-powered recommendation blocks plug into the template language — collaborative filtering, content-based filtering, and a hybrid for cold-start users.
class PersonalizationEngine:
    def __init__(self):
        self.user_profiles = {}
        self.content_variants = {}

    def personalize_email(self, template, user_data):
        """Dynamic content personalization"""
        personalized = template
        # Merge tags use double braces, e.g. {{first_name}}
        for field, value in user_data.items():
            tag = '{{' + field + '}}'
            personalized = personalized.replace(tag, str(value))
        # Dynamic content blocks
        personalized = self.insert_dynamic_content(personalized, user_data)
        # Recommendation block
        if '{{recommendations}}' in personalized:
            recs = self.generate_recommendations(user_data)
            personalized = personalized.replace('{{recommendations}}', recs)
        return personalized

    def generate_recommendations(self, user_data):
        # Collaborative + content-based + hybrid for cold-start
        ...
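The recommendation logic itself is omitted above. As an illustration only, one common way to build a hybrid for cold-start users is to blend the two score sources and shift weight toward collaborative filtering as interaction history accumulates. The function names, the linear ramp, and the `ramp=20` threshold below are assumptions, not the platform's actual algorithm:

```python
def hybrid_scores(collab, content, n_interactions, ramp=20):
    """Blend two {item: score} dicts; weight shifts to collaborative as history grows."""
    # w is 0.0 for a brand-new user and reaches 1.0 at `ramp` interactions.
    w = min(n_interactions / ramp, 1.0)
    items = set(collab) | set(content)
    return {
        item: w * collab.get(item, 0.0) + (1 - w) * content.get(item, 0.0)
        for item in items
    }


def top_n(scores, n=3):
    """Return the n highest-scoring item ids, ties broken by id for stability."""
    return sorted(scores, key=lambda item: (-scores[item], item))[:n]
```

With no history the ranking is driven entirely by content similarity. Once a user passes the ramp threshold it is driven entirely by collaborative scores.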
This is where the statistics live. The engine assigns variants by deterministic hashing, so the same user always gets the same variant for a given test, and runs a chi-squared contingency test on conversion outcomes. No winner is declared until p < 0.05.
import hashlib

from scipy.stats import chi2_contingency


class ABTestingEngine:
    def __init__(self):
        self.active_tests = {}
        self.results = {}

    def create_test(self, test_config):
        test = {
            'id': generate_test_id(),  # helper defined elsewhere
            'variants': test_config['variants'],
            'sample_size': test_config['sample_size'],
            'success_metric': test_config['metric'],
            'statistical_significance': 0.95
        }
        self.active_tests[test['id']] = test
        return test['id']

    def assign_variant(self, user_id, test_id):
        """Deterministic hashing for stable variant assignment"""
        # The separator keeps (user 1, test 23) distinct from (user 12, test 3)
        hash_value = hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest()
        variants = self.active_tests[test_id]['variants']
        return variants[int(hash_value, 16) % len(variants)]

    def calculate_winner(self, test_id):
        """Statistical significance testing"""
        data = self.results[test_id]
        # Derive the threshold from the test config instead of hard-coding 0.05
        alpha = 1 - self.active_tests[test_id]['statistical_significance']
        chi2, p_value, dof, expected = chi2_contingency(data)
        if p_value < alpha:
            return self.identify_winner(data)
        return None  # Not significant yet
The integration between marketing and statistics is the point. Without significance checks, “winning variants” are noise. With them, the system stops calling small differences victories.
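To make the significance check concrete, here is a dependency-free sketch of the Pearson chi-squared test for the two-variant case. It assumes a 2x2 table of converted versus not-converted counts with no empty cells, and it omits the continuity correction that SciPy's `chi2_contingency` applies by default to 2x2 tables, so its p-values will differ slightly from the engine's. The function name `chi2_2x2` is hypothetical:

```python
import math


def chi2_2x2(conv_a, total_a, conv_b, total_b):
    """Pearson chi-squared test on a 2x2 conversion table (no continuity correction)."""
    observed = [
        [conv_a, total_a - conv_a],
        [conv_b, total_b - conv_b],
    ]
    row_totals = [sum(row) for row in observed]
    col_totals = [sum(col) for col in zip(*observed)]
    grand_total = sum(row_totals)

    chi2 = 0.0
    for i in range(2):
        for j in range(2):
            # Expected count under the null hypothesis of equal conversion rates
            expected = row_totals[i] * col_totals[j] / grand_total
            chi2 += (observed[i][j] - expected) ** 2 / expected

    # With 1 degree of freedom the chi-squared survival function is erfc(sqrt(x / 2)).
    p_value = math.erfc(math.sqrt(chi2 / 2))
    return chi2, p_value
```

For example, 100 conversions out of 1,000 against 150 out of 1,000 clears the p < 0.05 bar comfortably, while 100 against 105 does not. The second case is the kind of small difference the platform declines to call a win.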
The platform doesn’t replace the CRM — it amplifies it. Contacts flow in from the CRM, engagement signals flow back out. Real-time webhooks plus periodic polling cover both freshness and reliability.
class CRMIntegration:
    def __init__(self, crm_type):
        self.connector = self.setup_connector(crm_type)
        self.sync_interval = 300  # seconds

    def sync_contacts(self):
        """Bidirectional contact sync"""
        # Inbound: CRM -> platform
        new_contacts = self.connector.get_new_contacts()
        # Assumed helper: the fetched contacts were previously never stored
        self.import_contacts(new_contacts)
        # Outbound: platform -> CRM
        engagement_data = self.get_engagement_metrics()
        self.connector.update_contacts(engagement_data)
        self.resolve_conflicts()

    def setup_webhooks(self):
        """Real-time event sync"""
        for event in ['contact.created', 'contact.updated',
                      'deal.closed', 'task.completed']:
            self.connector.register_webhook(event, self.handle_webhook)
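The body of `resolve_conflicts` is not shown. One plausible policy for a bidirectional sync is field-level last-write-wins; the sketch below assumes each side stores a value and an update timestamp per field, and that the CRM wins ties as the system of record. The function name, record shape, and tie-break rule are all assumptions for illustration:

```python
def merge_contact(crm_record, platform_record):
    """Field-level last-write-wins merge of two versions of one contact.

    Each record maps field name -> (value, updated_at), where updated_at is
    any comparable timestamp (same-format ISO-8601 UTC strings compare correctly).
    """
    merged = {}
    for field in set(crm_record) | set(platform_record):
        crm = crm_record.get(field)
        ours = platform_record.get(field)
        if crm is None:
            merged[field] = ours
        elif ours is None:
            merged[field] = crm
        else:
            # Newer timestamp wins; the CRM wins ties as the system of record.
            merged[field] = ours if ours[1] > crm[1] else crm
    return merged
```

Merging per field rather than per record means a phone number edited in the CRM and an engagement score updated by the platform can both survive the same sync cycle.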
Every email event — open, click, conversion, unsubscribe — gets streamed to Redis on the fast path and batched into PostgreSQL (or ClickHouse, for the larger deployments) for long-term analytics.
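import json
from datetime import datetime, timezone

import redis


class AnalyticsCollector:
    def __init__(self):
        self.events = []
        self.redis_client = redis.Redis()

    def track_event(self, event_type, data):
        # Redis stream fields must be flat strings or numbers, so the
        # timestamp and the nested payload are serialized up front
        event = {
            'type': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data': json.dumps(data, default=str),
            'session_id': str(data.get('session_id', '')),
            'user_id': str(data.get('user_id', ''))
        }
        # Fast path
        self.redis_client.xadd('email_events', event)
        # Batch path
        self.events.append(event)
        if len(self.events) >= 1000:
            self.flush_to_database()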
The metrics that come out of that: open rates by segment, click-through rate per variant, conversion attribution, per-user engagement scores. Standard email-analytics surface, with the actual lift measured under significance tests instead of headline numbers.
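As a rough illustration of how such rates fall out of the event stream, the sketch below counts events per group and divides one count by another. It assumes flattened event dicts carrying a `type` and a grouping key, plus a `delivered` event type as the denominator; none of these shapes are confirmed by the collector above, and the function name is hypothetical:

```python
from collections import defaultdict


def rate_by_group(events, group_key, numerator, denominator):
    """Ratio of two event-type counts per group, e.g. opens per delivery by segment."""
    counts = defaultdict(lambda: defaultdict(int))
    for event in events:
        counts[event[group_key]][event["type"]] += 1
    return {
        group: (c[numerator] / c[denominator]) if c[denominator] else 0.0
        for group, c in counts.items()
    }
```

Calling it with `group_key="variant"`, `numerator="click"`, and `denominator="delivered"` would give click-through rate per variant. This counts raw events, so a real pipeline would likely deduplicate repeat opens per user first.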
Celery distributes the send queue across workers. A per-provider rate limiter keeps the platform under the sending limits of SendGrid, Mailchimp, or whichever ESP is in use; emails that hit the limit get re-queued for retry rather than dropped.
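from celery import Celery

# The app and limiter live at module level: a decorator inside the class
# body cannot see attributes that are only created in __init__
celery_app = Celery('email_dispatcher')
rate_limiter = RateLimiter()  # per-provider limiter, defined elsewhere


# Note: Celery's rate_limit applies per worker instance, not globally
@celery_app.task(rate_limit='1000/m')
def send_email_batch(batch):
    for email in batch:
        if rate_limiter.allow(email['provider']):
            send_email.delay(email)  # single-send task, defined elsewhere
        else:
            schedule_retry(email)  # re-queue instead of dropping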
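The `RateLimiter` used above is not shown. A minimal sketch of one possible implementation is a token bucket per provider; the `rate` and `capacity` defaults are placeholder assumptions, not real ESP limits, and the injectable `clock` exists only to make the class testable:

```python
import time


class RateLimiter:
    """Per-provider token bucket: `rate` sends per second, bursts up to `capacity`."""

    def __init__(self, rate=10.0, capacity=20, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self._buckets = {}  # provider -> (tokens, last_refill_time)

    def allow(self, provider):
        now = self.clock()
        tokens, last = self._buckets.get(provider, (self.capacity, now))
        # Refill in proportion to elapsed time, capped at the bucket size.
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        if tokens >= 1:
            self._buckets[provider] = (tokens - 1, now)
            return True
        self._buckets[provider] = (tokens, now)
        return False
```

This version keeps its state in process memory, so each worker would enforce its own budget. Sharing one limit across several Celery workers would need the bucket state in a shared store such as Redis.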
Connection pooling for SMTP, template caching, CDN for embedded assets. The optimizations are unglamorous and load-bearing.
Email regulation is a compliance burden, and getting it wrong has financial and reputational costs. The system bakes in: GDPR consent and right-to-erasure flows, CAN-SPAM unsubscribe handling, automated bounce processing, and reputation monitoring across the sending domain.
These aren’t optional features. They’re the difference between an email platform that scales and an email platform that gets blocklisted.
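To illustrate how bounce processing and unsubscribe handling can feed a single pre-send check, here is a small sketch of a suppression list. The class name, the soft-bounce threshold of three, and the in-memory storage are assumptions for illustration; the platform's actual rules are not described:

```python
class SuppressionList:
    """Tracks addresses that must not be emailed again. Hypothetical helper."""

    def __init__(self, soft_bounce_limit=3):
        self.soft_bounce_limit = soft_bounce_limit
        self._suppressed = set()
        self._soft_bounces = {}  # address -> count of soft bounces so far

    def record_bounce(self, address, hard):
        address = address.lower()
        if hard:
            self._suppressed.add(address)  # permanent failure: stop immediately
            return
        count = self._soft_bounces.get(address, 0) + 1
        self._soft_bounces[address] = count
        if count >= self.soft_bounce_limit:
            self._suppressed.add(address)  # repeated temporary failures

    def record_unsubscribe(self, address):
        self._suppressed.add(address.lower())

    def can_send(self, address):
        return address.lower() not in self._suppressed
```

Checking `can_send` in the dispatch path before every send keeps unsubscribes and dead addresses out of the queue, which is also what protects sender reputation.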
Specific client details and proprietary algorithms have been omitted.