DynamoDB Streams — Fase 4
Processor Lambda
763 LOC — triggered by DynamoDB Streams on every new event. Handles billing, GA4, webhooks, and advanced fraud detection.
Processing Pipeline
DynamoDB Stream (INSERT event)
│
├── 1. dynamodb_to_python() → Convert DDB format to dict
│
├── 2. increment_events_usage() → Billing counter (Supabase RPC)
│ ├── Primary: RPC atomic increment
│ └── Fallback: GET + PATCH (optimistic)
│
├── 3. get_org_config() → Fetch org integrations
│ └── GA4 credentials + webhook URL/secret
│
├── 4. send_to_ga4() → GA4 Measurement Protocol
│ └── client_id: ga4_client_id || fingerprint
│
├── 5. send_to_webhook() → Custom webhook (HMAC-SHA256)
│ └── X-Impressio-Signature header
│
├── 6. detect_impossible_travel()
│ ├── Query previous event (DynamoDB)
│ ├── Haversine distance calculation
│ └── Speed > 900 km/h → alert
│
├── 7. detect_account_sharing()
│ ├── Query GSI_USER (last 24h events)
│ ├── Concurrent sessions (<15 min, diff FP)
│ ├── Geo diversity (3+ countries)
│ └── Device diversity (4+ fingerprints)
│
└── 8. send_push_alert() → Dashboard notification
└── Triggered if risk ≥ 60 OR impossible travel
OR account sharing score ≥ 606 Processing Stages
Billing Counter
Atomic increment via Supabase RPC. Fallback: GET current + PATCH. 2s timeout.
POST /rpc/increment_events_this_month
{ "org_uuid": "org_abc..." }
// 200/204 = successGA4 Measurement Protocol
Server-side event tracking. Uses ga4_client_id from _ga cookie for session stitching.
POST /mp/collect?measurement_id=G-XXX
{
"client_id": "ga4_client_id || fp",
"events": [{
"name": "fingerprint_event",
"params": { fingerprint, site, risk_score }
}]
}Custom Webhooks
HMAC-SHA256 signed payloads. 10s timeout. Supports 200/201/202/204.
POST {webhook_url}
Headers:
X-Impressio-Signature: hmac_sha256(body, secret)
X-Impressio-Signature-Version: v1
User-Agent: Impressio-Webhook/1.0Impossible Travel
Haversine formula between consecutive events. Alert if speed > 900 km/h.
haversine_km(lat1, lon1, lat2, lon2) # R = 6371 km (Earth radius) # Skip if distance < 50 km speed = distance_km / delta_hours # > 900 km/h → risk 50-90 # Risk scales: 50 + (speed-900)/200 * 10
Account Sharing
Detects shared accounts via concurrent sessions, geo/device diversity in 24h window.
Scoring: +40 concurrent_session (diff FP, <15 min) +20/country geo_diversity (3+ countries) +10/fp device_diversity (4+ fingerprints) Cap: 100
Push Notifications
Dashboard alert triggered on high-risk events.
Trigger conditions:
risk_score >= 60
OR impossible_travel detected
OR account_sharing.score >= 60
→ POST {PUSH_API_URL}Environment Variables
| Variable | Purpose | Required |
|---|---|---|
SUPABASE_URL | Supabase project URL | ✅ |
SUPABASE_SERVICE_KEY | Service role key (server-side only) | ✅ |
DYNAMODB_TABLE | DynamoDB table name (default: fingerprint-events) | ❌ |
PUSH_API_URL | Dashboard push notification endpoint | ❌ |
PUSH_API_KEY | Shared secret for push auth | ❌ |
DynamoDB Format Converter
DynamoDB Streams delivers items in their native wire format. The dynamodb_to_python() converter handles recursive type mapping:
# DynamoDB Stream format → Python dict
{'fingerprint': {'S': 'abc123'}} → {'fingerprint': 'abc123'}
{'risk_score': {'N': '75'}} → {'risk_score': 75}
{'is_bot': {'BOOL': True}} → {'is_bot': True}
{'context': {'M': {...}}} → {'context': {...}} # recursive
{'signals': {'L': [...]}} → {'signals': [...]} # recursive