DynamoDB Streams: Phase 4

Processor Lambda

763 LOC. Triggered by DynamoDB Streams on every new event (stream INSERT record). Handles billing, GA4 forwarding, webhook delivery, and advanced fraud detection.

Processing Pipeline

DynamoDB Stream (INSERT event)
  │
  ├── 1. dynamodb_to_python()     → Convert DDB format to dict
  │
  ├── 2. increment_events_usage() → Billing counter (Supabase RPC)
  │     ├── Primary: RPC atomic increment
  │     └── Fallback: GET + PATCH (optimistic)
  │
  ├── 3. get_org_config()         → Fetch org integrations
  │     └── GA4 credentials + webhook URL/secret
  │
  ├── 4. send_to_ga4()            → GA4 Measurement Protocol
  │     └── client_id: ga4_client_id || fingerprint
  │
  ├── 5. send_to_webhook()        → Custom webhook (HMAC-SHA256)
  │     └── X-Impressio-Signature header
  │
  ├── 6. detect_impossible_travel()
  │     ├── Query previous event (DynamoDB)
  │     ├── Haversine distance calculation
  │     └── Speed > 900 km/h → alert
  │
  ├── 7. detect_account_sharing()
  │     ├── Query GSI_USER (last 24h events)
  │     ├── Concurrent sessions (<15 min, diff FP)
  │     ├── Geo diversity (3+ countries)
  │     └── Device diversity (4+ fingerprints)
  │
  └── 8. send_push_alert()        → Dashboard notification
        └── Triggered if risk ≥ 60 OR impossible travel
            OR account sharing score ≥ 60
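
The entry point of the pipeline above can be sketched as follows. This is a minimal illustration, not the processor's actual handler: `iter_new_images` is a hypothetical helper name, and the sketch assumes the stream is configured with a view type that includes `NewImage` (NEW_IMAGE or NEW_AND_OLD_IMAGES). The record layout (`Records`, `eventName`, `dynamodb.NewImage`) is the standard DynamoDB Streams event shape delivered to Lambda.

```python
from typing import Any, Dict, Iterator


def iter_new_images(event: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield the raw NewImage of every INSERT record in a stream batch."""
    for record in event.get("Records", []):
        # MODIFY and REMOVE records are skipped: only new events are processed.
        if record.get("eventName") != "INSERT":
            continue
        image = record.get("dynamodb", {}).get("NewImage")
        if image:
            yield image


# A batch with one INSERT and one MODIFY record (sample data for illustration).
sample_event = {
    "Records": [
        {"eventName": "INSERT",
         "dynamodb": {"NewImage": {"fingerprint": {"S": "abc123"}}}},
        {"eventName": "MODIFY",
         "dynamodb": {"NewImage": {"fingerprint": {"S": "zzz"}}}},
    ]
}
images = list(iter_new_images(sample_event))
```

Each yielded image is still in DynamoDB wire format and would go through step 1 (dynamodb_to_python) before the remaining stages run.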

6 Processing Stages

Billing Counter

Atomic increment via Supabase RPC. If the RPC fails, the processor falls back to GET current value + PATCH, which is optimistic rather than atomic. 2s timeout.

POST /rpc/increment_events_this_month
{ "org_uuid": "org_abc..." }
// 200/204 = success
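
The primary/fallback flow might look like the sketch below. To keep it runnable without network access, the three HTTP operations are passed in as callables; `rpc_increment`, `get_count`, and `patch_count` are hypothetical names, not part of the processor or of any Supabase client. In a real deployment each would wrap an HTTP call with the 2s timeout.

```python
from typing import Callable, Optional

SUCCESS_CODES = {200, 204}


def increment_events_usage(
    org_uuid: str,
    rpc_increment: Callable[[str], int],
    get_count: Callable[[str], Optional[int]],
    patch_count: Callable[[str, int], int],
) -> str:
    """Return 'rpc', 'fallback', or 'failed' depending on which path succeeded."""
    try:
        # Primary path: a single atomic increment on the server.
        if rpc_increment(org_uuid) in SUCCESS_CODES:
            return "rpc"
    except Exception:
        pass  # Timeout or transport error: fall through to the fallback.

    try:
        # Fallback path: read-then-write. Concurrent invocations can lose
        # an increment here, which is why this path is only a fallback.
        current = get_count(org_uuid)
        if current is None:
            return "failed"
        status = patch_count(org_uuid, current + 1)
        return "fallback" if status in SUCCESS_CODES else "failed"
    except Exception:
        return "failed"


# In-memory stand-ins for the HTTP calls (illustration only).
store = {"org_abc": 41}


def failing_rpc(org_uuid: str) -> int:
    raise TimeoutError("simulated 2s timeout")


def fake_get(org_uuid: str) -> Optional[int]:
    return store.get(org_uuid)


def fake_patch(org_uuid: str, value: int) -> int:
    store[org_uuid] = value
    return 204


result = increment_events_usage("org_abc", failing_rpc, fake_get, fake_patch)
```

Returning a label instead of raising lets the caller log which path was taken without letting a billing failure abort the rest of the pipeline; whether the real processor behaves this way is an assumption.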

GA4 Measurement Protocol

Server-side event tracking. Uses ga4_client_id (taken from the _ga cookie) for session stitching, and falls back to the fingerprint when no client ID is available.

POST /mp/collect?measurement_id=G-XXX
{
  "client_id": "ga4_client_id || fp",
  "events": [{
    "name": "fingerprint_event",
    "params": { fingerprint, site, risk_score }
  }]
}
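
A sketch of how the request above could be assembled. `build_ga4_request` is a hypothetical helper, and the event field names (`ga4_client_id`, `fingerprint`, `site`, `risk_score`) are taken from the payload shown above. Note that GA4's Measurement Protocol also expects an `api_secret` query parameter, which the snippet above does not show; it is included here on that basis.

```python
import json
from typing import Any, Dict, Tuple
from urllib.parse import urlencode

GA4_ENDPOINT = "https://www.google-analytics.com/mp/collect"


def build_ga4_request(
    event: Dict[str, Any], measurement_id: str, api_secret: str
) -> Tuple[str, bytes]:
    """Return (url, json_body) for one fingerprint event."""
    # Prefer the GA4 client ID so server-side hits join the browser session;
    # fall back to the fingerprint when the cookie value is missing.
    client_id = event.get("ga4_client_id") or event["fingerprint"]
    payload = {
        "client_id": client_id,
        "events": [{
            "name": "fingerprint_event",
            "params": {
                "fingerprint": event["fingerprint"],
                "site": event.get("site"),
                "risk_score": event.get("risk_score"),
            },
        }],
    }
    query = urlencode({"measurement_id": measurement_id, "api_secret": api_secret})
    return f"{GA4_ENDPOINT}?{query}", json.dumps(payload).encode("utf-8")


url, body = build_ga4_request(
    {"fingerprint": "abc123", "site": "example.com", "risk_score": 75},
    measurement_id="G-XXX",
    api_secret="placeholder-secret",
)
```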

Custom Webhooks

HMAC-SHA256 signed payloads. 10s timeout. Responses with status 200/201/202/204 are treated as successful delivery.

POST {webhook_url}
Headers:
  X-Impressio-Signature: hmac_sha256(body, secret)
  X-Impressio-Signature-Version: v1
  User-Agent: Impressio-Webhook/1.0
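
Signing and verification could be sketched as below, using Python's standard `hmac` and `hashlib` modules. The source does not state how the digest is encoded, so the hex encoding here is an assumption, as are the helper names and the `Content-Type` header. The receiver-side check uses `hmac.compare_digest` to avoid timing leaks.

```python
import hashlib
import hmac
from typing import Dict


def sign_body(body: bytes, secret: str) -> str:
    """HMAC-SHA256 over the raw request body (hex encoding is an assumption)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def build_webhook_headers(body: bytes, secret: str) -> Dict[str, str]:
    """Headers sent with the webhook POST."""
    return {
        "Content-Type": "application/json",  # assumed, not stated in the source
        "X-Impressio-Signature": sign_body(body, secret),
        "X-Impressio-Signature-Version": "v1",
        "User-Agent": "Impressio-Webhook/1.0",
    }


def verify_signature(body: bytes, secret: str, received: str) -> bool:
    """Receiver side: recompute the signature and compare in constant time."""
    return hmac.compare_digest(sign_body(body, secret), received)


body = b'{"fingerprint": "abc123", "risk_score": 75}'
headers = build_webhook_headers(body, "whsec_example")
is_valid = verify_signature(body, "whsec_example", headers["X-Impressio-Signature"])
```

The signature must be computed over the exact bytes that are sent; re-serializing the JSON on either side can change whitespace or key order and invalidate it.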

Impossible Travel

Applies the Haversine formula to the coordinates of consecutive events and divides the distance by the elapsed time. Alert if speed > 900 km/h.

haversine_km(lat1, lon1, lat2, lon2)
# R = 6371 km (Earth radius)
# Skip if distance < 50 km

speed = distance_km / delta_hours
# > 900 km/h → risk 50-90
# Risk scales: 50 + (speed-900)/200 * 10, capped at 90
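
The calculation above can be written out as a small runnable sketch. The constants come from the text (R = 6371 km, 50 km minimum distance, 900 km/h threshold, risk range 50-90). Two details are assumptions: the cap at 90 is implemented with `min`, and non-positive time deltas are skipped to avoid division by zero. `impossible_travel_risk` is a hypothetical name.

```python
import math
from typing import Optional

EARTH_RADIUS_KM = 6371.0
MIN_DISTANCE_KM = 50.0
MAX_SPEED_KMH = 900.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def impossible_travel_risk(distance_km: float, delta_hours: float) -> Optional[float]:
    """Return a risk in [50, 90] when the implied speed exceeds 900 km/h, else None."""
    # Short hops are ignored; a zero or negative time delta is skipped (assumption).
    if distance_km < MIN_DISTANCE_KM or delta_hours <= 0:
        return None
    speed = distance_km / delta_hours
    if speed <= MAX_SPEED_KMH:
        return None
    return min(90.0, 50.0 + (speed - MAX_SPEED_KMH) / 200.0 * 10.0)


# London to New York, one hour apart (coordinates are illustrative).
distance = haversine_km(51.5074, -0.1278, 40.7128, -74.0060)
risk = impossible_travel_risk(distance, delta_hours=1.0)
```

With this formula the risk reaches the 90 cap at 1700 km/h, since 50 + (1700 - 900) / 200 * 10 = 90.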

Account Sharing

Detects shared accounts via concurrent sessions and geo/device diversity within a 24h window.

Scoring:
+40  concurrent_session (diff FP, <15 min)
+20/country  geo_diversity (3+ countries)
+10/fp  device_diversity (4+ fingerprints)
Cap: 100
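
One possible reading of the scoring table, as a sketch. The source leaves some details open, so these are assumptions: the +40 is applied once if any concurrent pair exists, "+20/country" and "+10/fp" are counted per distinct value once the 3+ or 4+ threshold is met, and each event is a dict with hypothetical keys `ts` (epoch seconds), `fingerprint`, and `country`.

```python
from typing import Any, Dict, List

CONCURRENT_WINDOW_S = 15 * 60


def account_sharing_score(events: List[Dict[str, Any]]) -> int:
    """Score one user's events from the last 24h; result is capped at 100."""
    ordered = sorted(events, key=lambda e: e["ts"])
    score = 0

    # Concurrent sessions: two different fingerprints less than 15 minutes apart.
    # Checking adjacent events is enough, because any such pair implies an
    # adjacent pair with differing fingerprints inside the same window.
    if any(
        b["ts"] - a["ts"] < CONCURRENT_WINDOW_S and a["fingerprint"] != b["fingerprint"]
        for a, b in zip(ordered, ordered[1:])
    ):
        score += 40

    # Geo diversity: counted only from 3 distinct countries upward.
    countries = {e["country"] for e in events if e.get("country")}
    if len(countries) >= 3:
        score += 20 * len(countries)

    # Device diversity: counted only from 4 distinct fingerprints upward.
    fingerprints = {e["fingerprint"] for e in events}
    if len(fingerprints) >= 4:
        score += 10 * len(fingerprints)

    return min(score, 100)


same_moment = [
    {"ts": 0, "fingerprint": "fp1", "country": "BR"},
    {"ts": 300, "fingerprint": "fp2", "country": "BR"},
]
spread_out = [
    {"ts": 0, "fingerprint": "fp1", "country": "BR"},
    {"ts": 7200, "fingerprint": "fp1", "country": "US"},
    {"ts": 14400, "fingerprint": "fp1", "country": "DE"},
]
score_concurrent = account_sharing_score(same_moment)
score_geo = account_sharing_score(spread_out)
```

Under this reading, three countries alone already reach the alert threshold of 60, while a single concurrent session (40) does not.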

Push Notifications

Dashboard alert triggered on high-risk events.

Trigger conditions:
risk_score >= 60
OR impossible_travel detected
OR account_sharing.score >= 60

→ POST {PUSH_API_URL}
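
The trigger conditions reduce to a single predicate. In this sketch, `should_send_push` is a hypothetical name, a detected impossible-travel result is represented by any non-None value, and the account-sharing result is assumed to be a dict with a `score` key.

```python
from typing import Any, Dict, Optional

ALERT_THRESHOLD = 60


def should_send_push(
    risk_score: float,
    impossible_travel: Optional[Dict[str, Any]],
    account_sharing: Optional[Dict[str, Any]],
) -> bool:
    """True if any one of the three trigger conditions holds."""
    return (
        risk_score >= ALERT_THRESHOLD
        or impossible_travel is not None
        or (account_sharing or {}).get("score", 0) >= ALERT_THRESHOLD
    )


# Low base risk, but an account-sharing score at the threshold still alerts.
alert = should_send_push(10, None, {"score": 60})
```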

Environment Variables

Variable               Purpose                                              Required
SUPABASE_URL           Supabase project URL
SUPABASE_SERVICE_KEY   Service role key (server-side only)
DYNAMODB_TABLE         DynamoDB table name (default: fingerprint-events)
PUSH_API_URL           Dashboard push notification endpoint
PUSH_API_KEY           Shared secret for push auth
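
Reading these variables at cold start might look like the sketch below. `ProcessorConfig` and `load_config` are hypothetical names. Because the Required column is not available here, the sketch does not enforce any variable as mandatory; it only applies the documented default for DYNAMODB_TABLE.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ProcessorConfig:
    supabase_url: Optional[str]
    supabase_service_key: Optional[str]
    dynamodb_table: str
    push_api_url: Optional[str]
    push_api_key: Optional[str]


def load_config(env: Mapping[str, str] = os.environ) -> ProcessorConfig:
    """Build the config from an environment mapping (os.environ by default)."""
    return ProcessorConfig(
        supabase_url=env.get("SUPABASE_URL"),
        supabase_service_key=env.get("SUPABASE_SERVICE_KEY"),
        # Documented default table name.
        dynamodb_table=env.get("DYNAMODB_TABLE", "fingerprint-events"),
        push_api_url=env.get("PUSH_API_URL"),
        push_api_key=env.get("PUSH_API_KEY"),
    )


config = load_config({"SUPABASE_URL": "https://example.supabase.co"})
```

Accepting the mapping as a parameter keeps the function testable without mutating the process environment.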

DynamoDB Format Converter

DynamoDB Streams delivers items in their native wire format. The dynamodb_to_python() converter handles recursive type mapping:

# DynamoDB Stream format        →  Python dict
{'fingerprint': {'S': 'abc123'}} →  {'fingerprint': 'abc123'}
{'risk_score':  {'N': '75'}}     →  {'risk_score': 75}
{'is_bot':      {'BOOL': True}}  →  {'is_bot': True}
{'context':     {'M': {...}}}    →  {'context': {...}}  # recursive
{'signals':     {'L': [...]}}    →  {'signals': [...]}  # recursive
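
A minimal sketch of such a converter, written from the mapping above rather than from the processor's source. Numbers become `int` when the string is integral and `float` otherwise, which is an assumption; the handling of NULL, SS, and NS is likewise added for completeness. For comparison, boto3 ships `boto3.dynamodb.types.TypeDeserializer`, which returns `Decimal` for numeric values.

```python
from typing import Any, Dict, Union


def _to_number(raw: str) -> Union[int, float]:
    """DynamoDB sends numbers as strings: '75' -> 75, '0.5' -> 0.5."""
    try:
        return int(raw)
    except ValueError:
        return float(raw)


def _convert_value(value: Dict[str, Any]) -> Any:
    """Convert one typed attribute value, e.g. {'S': 'abc123'} -> 'abc123'."""
    type_tag, raw = next(iter(value.items()))
    if type_tag in ("S", "BOOL"):
        return raw
    if type_tag == "N":
        return _to_number(raw)
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {key: _convert_value(item) for key, item in raw.items()}  # recursive
    if type_tag == "L":
        return [_convert_value(item) for item in raw]  # recursive
    if type_tag == "SS":
        return list(raw)
    if type_tag == "NS":
        return [_to_number(item) for item in raw]
    return raw  # Binary types (B, BS) are passed through unchanged.


def dynamodb_to_python(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Convert a full stream image (attribute name -> typed value) to a plain dict."""
    return {name: _convert_value(value) for name, value in image.items()}


converted = dynamodb_to_python({
    "fingerprint": {"S": "abc123"},
    "risk_score": {"N": "75"},
    "is_bot": {"BOOL": True},
    "context": {"M": {"country": {"S": "BR"}, "lat": {"N": "-23.55"}}},
    "signals": {"L": [{"S": "headless"}, {"N": "2"}]},
})
```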