// project

Email Automation Platform

September 2025
Technologies: Python, Flask, Celery, Redis, PostgreSQL, SendGrid API, React, Docker, AWS

A full-stack email platform with behavior-based triggers, A/B tests gated on statistical significance, and ML-powered personalization. It processes 100K+ emails per hour at 98% inbox deliverability and syncs bidirectionally with CRM systems.

The unusual thing about it is that statistical rigor lives at the marketing layer — the system doesn’t declare a test winner until p < 0.05.

Four engines

The platform decomposes into four loosely-coupled subsystems. Each can be operated and reasoned about independently.

1. Behavior triggers

User actions in the product publish events. The trigger engine subscribes, matches events against registered conditions, and fires the corresponding follow-up email — with cooldown logic so a single user doesn’t get hammered.

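from datetime import datetime, timedelta
from queue import Queue


class BehaviorTriggerEngine:
    def __init__(self):
        self.triggers = {}
        self.event_queue = Queue()
        self.last_fired = {}  # (user, event_type) -> time of last send

    def register_trigger(self, event_type, conditions, action, cooldown=24):
        """Register a behavior-based email trigger (cooldown in hours)"""
        self.triggers[event_type] = {
            'conditions': conditions,
            'action': action,
            'cooldown': cooldown,
        }

    def process_event(self, event):
        trigger = self.triggers.get(event.type)
        if trigger is None:
            return
        # Enforce the cooldown so a single user is not emailed repeatedly.
        # Assumes event.user is hashable (for example, a user ID).
        key = (event.user, event.type)
        last = self.last_fired.get(key)
        now = datetime.now()
        if last and now - last < timedelta(hours=trigger['cooldown']):
            return
        if self.evaluate_conditions(event, trigger['conditions']):
            self.execute_action(trigger['action'], event.user)
            self.last_fired[key] = now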

    def evaluate_conditions(self, event, conditions):
        # Time-based windows, engagement-score thresholds,
        # segment membership, previous-action history
        return all(self.check_condition(event, c) for c in conditions)
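
One way to picture the registered conditions is as plain predicates over the event. The sketch below is an illustration under assumptions, not the platform's code: the `Event` fields, the `min_engagement` and `in_segment` helpers, and the threshold values are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    # Hypothetical event shape; the real schema is not shown in this write-up
    type: str
    user: str
    engagement_score: float = 0.0
    segments: set = field(default_factory=set)


def min_engagement(threshold):
    """Condition: the user's engagement score is at least `threshold`."""
    return lambda event: event.engagement_score >= threshold


def in_segment(name):
    """Condition: the user belongs to the named segment."""
    return lambda event: name in event.segments


def evaluate_conditions(event, conditions):
    # Same all-must-pass rule as BehaviorTriggerEngine.evaluate_conditions
    return all(condition(event) for condition in conditions)


conditions = [min_engagement(0.5), in_segment("trial")]
engaged = Event("cart.abandoned", "u1", engagement_score=0.8, segments={"trial"})
lapsed = Event("cart.abandoned", "u2", engagement_score=0.2, segments={"trial"})

print(evaluate_conditions(engaged, conditions))  # True
print(evaluate_conditions(lapsed, conditions))   # False
```

Keeping each condition a small closure means new condition types can be added without touching the engine itself.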

2. Personalization

Email content is template-driven, with merge tags resolved at send time. ML-powered recommendation blocks plug into the template language — collaborative filtering, content-based filtering, and a hybrid for cold-start users.

class PersonalizationEngine:
    def __init__(self):
        self.user_profiles = {}
        self.content_variants = {}

    def personalize_email(self, template, user_data):
        """Dynamic content personalization"""
        personalized = template

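        # Merge tags use the same double-brace syntax as {{recommendations}}.
        # (The f-string f'{{{field}}}' would only match single braces.)
        for field, value in user_data.items():
            personalized = personalized.replace('{{' + field + '}}', str(value))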

        # Dynamic content blocks
        personalized = self.insert_dynamic_content(personalized, user_data)

        # Recommendation block
        if '{{recommendations}}' in personalized:
            recs = self.generate_recommendations(user_data)
            personalized = personalized.replace('{{recommendations}}', recs)

        return personalized

    def generate_recommendations(self, user_data):
        # Collaborative + content-based + hybrid for cold-start
        ...
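
The recommendation algorithms themselves are not shown here, so the following is only a sketch of one common way to build a cold-start hybrid: blend collaborative and content-based scores, trusting the collaborative side more as a user accumulates interactions. The function names, the linear weighting, and the `full_trust_at` cutoff are assumptions for illustration.

```python
def hybrid_scores(collab, content, n_interactions, full_trust_at=20):
    """Blend two score dicts (item -> score in [0, 1]).

    The collaborative weight grows linearly with the user's interaction
    count and is capped at 1.0, so a brand-new user (0 interactions)
    is ranked purely by content similarity.
    """
    w = min(n_interactions / full_trust_at, 1.0)
    items = set(collab) | set(content)
    return {
        item: w * collab.get(item, 0.0) + (1 - w) * content.get(item, 0.0)
        for item in items
    }


def top_k(scores, k=3):
    # Sort by score descending, then by item name for a stable order
    ranked = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return [item for item, _ in ranked[:k]]


# Hypothetical scores for one user
collab = {"guide": 0.9, "webinar": 0.4}
content = {"webinar": 0.8, "case-study": 0.6}

print(top_k(hybrid_scores(collab, content, n_interactions=0)))
print(top_k(hybrid_scores(collab, content, n_interactions=40)))
```

With no history the content scores decide the order; with plenty of history the collaborative scores take over.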

3. A/B testing

This is where the statistics live. The engine assigns variants by hashing the user and test IDs together, so the same user always gets the same variant for a given test, and runs a chi-squared contingency test on conversion outcomes. No winner is declared until p < 0.05.

import hashlib


class ABTestingEngine:
    def __init__(self):
        self.active_tests = {}
        self.results = {}

    def create_test(self, test_config):
        test = {
            'id': generate_test_id(),
            'variants': test_config['variants'],
            'sample_size': test_config['sample_size'],
            'success_metric': test_config['metric'],
            'statistical_significance': 0.95
        }
        self.active_tests[test['id']] = test
        return test['id']

    def assign_variant(self, user_id, test_id):
        """Deterministic hash-based assignment: stable per (user, test)"""
        # The ':' separator keeps (user 1, test 23) distinct from (user 12, test 3)
        hash_value = hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest()
        variants = self.active_tests[test_id]['variants']
        return variants[int(hash_value, 16) % len(variants)]

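    def calculate_winner(self, test_id):
        """Statistical significance testing"""
        from scipy.stats import chi2_contingency

        # Observed counts as a contingency table (assumed: one row per variant)
        data = self.results[test_id]
        chi2, p_value, dof, expected = chi2_contingency(data)

        # Use the confidence level set in create_test (0.95 -> alpha of 0.05)
        alpha = 1 - self.active_tests[test_id]['statistical_significance']
        if p_value < alpha:
            return self.identify_winner(data)
        return None  # Not significant yet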

The integration between marketing and statistics is the point. Without significance checks, “winning variants” are noise. With them, the system stops calling small differences victories.
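
To make the noise point concrete, here is a small worked example using only the standard library. It is a sketch, not the engine's code: the conversion counts are hypothetical, and it runs a plain Pearson chi-squared test on a 2x2 table, whereas `scipy.stats.chi2_contingency` applies Yates' continuity correction to 2x2 tables by default, so its p-values come out slightly larger.

```python
import math


def chi2_2x2(conv_a, total_a, conv_b, total_b):
    """Pearson chi-squared test on a 2x2 table, no continuity correction.

    Returns (chi2, p_value). With one degree of freedom the survival
    function of the chi-squared distribution is erfc(sqrt(chi2 / 2)).
    """
    observed = [
        [conv_a, total_a - conv_a],
        [conv_b, total_b - conv_b],
    ]
    n = total_a + total_b
    row_totals = [total_a, total_b]
    col_totals = [conv_a + conv_b, n - conv_a - conv_b]

    chi2 = 0.0
    for i in range(2):
        for j in range(2):
            expected = row_totals[i] * col_totals[j] / n
            chi2 += (observed[i][j] - expected) ** 2 / expected
    return chi2, math.erfc(math.sqrt(chi2 / 2))


# Hypothetical counts: 12% vs 16% conversion on 1,000 sends per variant
chi2_large, p_large = chi2_2x2(120, 1000, 160, 1000)
print(f"1,000 sends each: chi2={chi2_large:.3f}, p={p_large:.4f}")

# The same 12% vs 16% rates on only 200 sends per variant
chi2_small, p_small = chi2_2x2(24, 200, 32, 200)
print(f"200 sends each:   chi2={chi2_small:.3f}, p={p_small:.4f}")
```

The four-point gap clears p < 0.05 at 1,000 sends per variant but not at 200, which is exactly the case where an ungated dashboard would have announced a winner.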

4. CRM sync

The platform doesn’t replace the CRM — it amplifies it. Contacts flow in from the CRM, engagement signals flow back out. Real-time webhooks plus periodic polling cover both freshness and reliability.

class CRMIntegration:
    def __init__(self, crm_type):
        self.connector = self.setup_connector(crm_type)
        self.sync_interval = 300  # seconds

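    def sync_contacts(self):
        """Bidirectional contact sync"""
        # Inbound: pull contacts created in the CRM since the last sync
        new_contacts = self.connector.get_new_contacts()
        self.import_contacts(new_contacts)  # hypothetical helper: store them locally

        # Outbound: push engagement signals back to the CRM
        engagement_data = self.get_engagement_metrics()
        self.connector.update_contacts(engagement_data)

        self.resolve_conflicts()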

    def setup_webhooks(self):
        """Real-time event sync"""
        for event in ['contact.created', 'contact.updated',
                      'deal.closed', 'task.completed']:
            self.connector.register_webhook(event, self.handle_webhook)
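
What `resolve_conflicts` does is not spelled out above. A minimal sketch of one plausible policy, field-level last-write-wins, is shown below; the record shape (field mapped to a value and its update time) and the policy itself are assumptions made for illustration.

```python
from datetime import datetime


def resolve_conflict(local, remote):
    """Merge two versions of a contact field by field, last write wins.

    Each record maps field -> (value, updated_at). On an exact timestamp
    tie the local (platform) value is kept.
    """
    merged = {}
    for name in set(local) | set(remote):
        candidates = [record[name] for record in (local, remote) if name in record]
        # Keep whichever side touched this field most recently
        merged[name] = max(candidates, key=lambda pair: pair[1])
    return merged


# Hypothetical versions of the same contact
platform = {
    "email": ("ana@example.com", datetime(2025, 9, 1, 10, 0)),
    "last_open": ("2025-09-03", datetime(2025, 9, 3, 8, 30)),
}
crm = {
    "email": ("ana@new-domain.example", datetime(2025, 9, 2, 14, 0)),
    "owner": ("sales-team-2", datetime(2025, 8, 20, 9, 0)),
}

merged = resolve_conflict(platform, crm)
print({name: value for name, (value, _) in sorted(merged.items())})
```

Merging per field rather than per record lets a CRM-side email change and a platform-side engagement update both survive the same sync.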

Analytics

Every email event — open, click, conversion, unsubscribe — gets streamed to Redis on the fast path and batched into PostgreSQL (or ClickHouse, for the larger deployments) for long-term analytics.

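import json
from datetime import datetime, timezone

import redis


class AnalyticsCollector:
    def __init__(self):
        self.events = []
        self.redis_client = redis.Redis()

    def track_event(self, event_type, data):
        # Redis stream fields must be str, bytes, int or float, so the
        # timestamp is stored as ISO 8601 and the payload as JSON
        event = {
            'type': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data': json.dumps(data),
            'session_id': data.get('session_id') or '',
            'user_id': data.get('user_id') or '',
        }

        # Fast path
        self.redis_client.xadd('email_events', event)

        # Batch path
        self.events.append(event)
        if len(self.events) >= 1000:
            self.flush_to_database()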

The metrics that come out of that: open rates by segment, click-through rate per variant, conversion attribution, per-user engagement scores. Standard email-analytics surface, with the actual lift measured under significance tests instead of headline numbers.
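
As a rough illustration of the per-variant numbers, the sketch below aggregates a list of events into open and click rates. The event keys (`type`, `variant`) and the event-type names are hypothetical, and the rates are taken relative to sends on the assumption that events are already deduplicated per recipient.

```python
from collections import Counter, defaultdict


def rates_by_variant(events):
    """Compute open rate and click rate per variant, relative to sends."""
    counts = defaultdict(Counter)
    for event in events:
        counts[event["variant"]][event["type"]] += 1

    report = {}
    for variant, c in counts.items():
        sent = c["sent"]
        report[variant] = {
            "open_rate": c["open"] / sent if sent else 0.0,
            "click_rate": c["click"] / sent if sent else 0.0,
        }
    return report


def make_events(variant, sent, opens, clicks):
    # Helper that builds a synthetic event list for the example
    return (
        [{"variant": variant, "type": "sent"}] * sent
        + [{"variant": variant, "type": "open"}] * opens
        + [{"variant": variant, "type": "click"}] * clicks
    )


events = make_events("A", sent=4, opens=2, clicks=1) + make_events("B", sent=4, opens=3, clicks=2)
report = rates_by_variant(events)
print(report["A"])
print(report["B"])
```

These raw rates are the headline numbers; whether the gap between variants counts as lift is still decided by the significance test.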

Sending at 100K/hour

Celery distributes the send queue across workers. A per-provider rate limiter keeps the platform under the sending limits of SendGrid, Mailchimp, or whichever ESP is in use; emails that hit the limit are re-queued for retry rather than dropped.

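from celery import Celery

# The task decorator needs the app at import time, so the app and the
# rate limiter live at module level rather than on an instance
celery_app = Celery('email_dispatcher')
rate_limiter = RateLimiter()


@celery_app.task(rate_limit='1000/m')
def send_email_batch(batch):
    for email in batch:
        if rate_limiter.allow(email['provider']):
            send_email.delay(email)  # per-email Celery task, defined elsewhere
        else:
            schedule_retry(email)    # re-queue instead of dropping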

Connection pooling for SMTP, template caching, CDN for embedded assets. The optimizations are unglamorous and load-bearing.
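
The `RateLimiter` used by the dispatcher is not shown. One common way to build an `allow(provider)` check is a token bucket per provider, sketched below. The constructor arguments, the example rate, and the injectable clock are assumptions for illustration; for scale, 100K emails per hour works out to roughly 28 sends per second across all providers.

```python
import time


class RateLimiter:
    """Per-provider token bucket (a sketch; the rates are placeholders)."""

    def __init__(self, rates, clock=time.monotonic):
        # rates: provider -> (tokens refilled per second, bucket capacity)
        self.rates = rates
        self.clock = clock
        self.buckets = {}  # provider -> (tokens available, last refill time)

    def allow(self, provider):
        per_second, capacity = self.rates[provider]
        now = self.clock()
        tokens, last = self.buckets.get(provider, (capacity, now))
        # Refill for the elapsed time, never above capacity
        tokens = min(capacity, tokens + (now - last) * per_second)
        if tokens >= 1:
            self.buckets[provider] = (tokens - 1, now)
            return True
        self.buckets[provider] = (tokens, now)
        return False


# A fake clock makes the behaviour easy to follow without sleeping
fake_now = [0.0]
limiter = RateLimiter({"sendgrid": (2.0, 2)}, clock=lambda: fake_now[0])

burst = [limiter.allow("sendgrid") for _ in range(3)]
fake_now[0] = 0.5  # half a second later, one token has been refilled
after_wait = limiter.allow("sendgrid")
print(burst, after_wait)
```

A bucket held in process memory only limits a single worker; sharing the limit across Celery workers would need the counters in a shared store such as Redis.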

Stack

  • Backend: Python, FastAPI/Django, Celery for async work
  • Data: PostgreSQL for transactional; ClickHouse for analytics at scale; Redis for caching and rate limiting
  • Message bus: RabbitMQ or Kafka, depending on the deployment
  • Email infrastructure: MJML for responsive templates, Litmus for cross-client testing, SpamAssassin checks pre-send
  • Monitoring: Prometheus + Grafana, Sentry for errors

Numbers

  • 100K+ emails/hour throughput
  • 98% inbox deliverability
  • <500ms trigger-to-send latency
  • 99.9% uptime
  • 40% open-rate improvement, 25% click-rate improvement against baseline campaigns
  • 3× faster campaign deployment
  • ~50% reduction in manual work on the marketing side

Compliance

Email regulation is a compliance burden, and getting it wrong has financial and reputational costs. The system bakes in: GDPR consent and right-to-erasure flows, CAN-SPAM unsubscribe handling, automated bounce processing, and reputation monitoring across the sending domain.

These aren’t optional features. They’re the difference between an email platform that scales and an email platform that gets blocklisted.


Specific client details and proprietary algorithms have been omitted.