REAL-TIME LOGGING POC
Dual Database: Durable Objects + Analytics Engine
LRU CACHE + EFFECT 2025.01.15
DUAL DATABASE POC
Problem: Analytics Engine writes take roughly 30-60s to become queryable, but the UI needs instant feedback and reliable history.
Solution: A dual database approach. A Durable Object provides immediate storage (no schema) and Analytics Engine holds long-term queryable data. A client-side LRU cache keeps memory use bounded.
User Action → LRU Cache (instant)
    ↓
Durable Object (immediate, no schema)
    ↓
WebSocket (broadcast)
    ↓
Analytics Engine (long-term, queryable)
HOW IT WORKS
- Page Load: Fetch events from server (seamless DO + AE stitching)
- User Action: Write to LRU cache immediately (optimistic update)
- Server: Store in Durable Object (immediate) + Analytics Engine (buffered)
- Broadcast: Stream live to connected clients via WebSocket
- Purge: Effect periodically cleans LRU cache, data flows to AE
- Seamless: Client gets unified view of recent (DO) + historical (AE) data
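The stitching step in the list above can be sketched as a pure function. This is a minimal sketch, not the worker's actual code: the `StoredEvent` shape and the `stitchEvents` name are assumptions made for illustration, and only `id` and `timestamp` are needed for the merge.

```typescript
// Hypothetical minimal shape: only id and timestamp matter for stitching.
interface StoredEvent {
  id: string;
  timestamp: number; // epoch milliseconds
}

// Merge recent (Durable Object) and historical (Analytics Engine) events.
// On an id collision the Durable Object copy is kept.
function stitchEvents<T extends StoredEvent>(
  recent: T[],
  historical: T[],
  limit: number
): T[] {
  const seen = new Set<string>(recent.map((e) => e.id));
  const olderOnly = historical.filter((e) => !seen.has(e.id));
  return [...recent, ...olderOnly]
    .sort((a, b) => b.timestamp - a.timestamp) // newest first
    .slice(0, limit);
}
```

De-duplicating by id only works if both stores return the same id for an event, so the id has to be written to Analytics Engine alongside the payload.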
BENEFITS
- No schema complexity: Durable Object stores raw JSON, no migrations
- Self-cleaning: LRU cache prevents memory explosion
- Dual persistence: Immediate (DO) + long-term (AE) storage
- Seamless stitching: Client gets unified view of recent + historical data
- Effect-driven purging: Automated cleanup with proper error handling
- Real-time broadcasting: Live updates to all connected clients
INFRASTRUCTURE
alchemy.run.ts
// alchemy.run.ts - Dual Database Setup
import alchemy from "alchemy";
import { Worker, DurableObjectNamespace, AnalyticsEngineDataset } from "alchemy/cloudflare";

const app = await alchemy("real-time-logging");

// Analytics Engine for long-term event storage
const analytics = await AnalyticsEngineDataset("events", { title: "Event Analytics" });

// Durable Object for simple JSON logging (no schema)
const loggerNamespace = DurableObjectNamespace("logger", { className: "Logger" });

// Durable Object for WebSocket coordination
const roomNamespace = DurableObjectNamespace("room", { className: "Room" });

// Main worker with dual database bindings.
// Exported so src/worker.ts can import its type (typeof worker.Env).
export const worker = await Worker("api", {
  entrypoint: "./src/worker.ts",
  bindings: {
    ANALYTICS: analytics,    // Long-term storage
    LOGGER: loggerNamespace, // Fast, immediate storage
    ROOM: roomNamespace      // Real-time broadcasting
  }
});

console.log({ url: worker.url });
await app.finalize();
WORKER WITH RECENT EVENTS ENDPOINT
Enhanced Worker
// src/worker.ts - Hono-based Dual Database Worker
import { Hono } from 'hono';
import type { worker } from "../alchemy.run";

// Durable Object classes must be exported from the Worker entrypoint.
// The Logger class is not shown in this document and needs the same treatment.
export { Room } from './room';

const app = new Hono<{ Bindings: typeof worker.Env }>();

// POST /api/events - Create new event
app.post('/api/events', async (c) => {
  const { type, orgId, payload } = await c.req.json();
  if (!type || !orgId) {
    return c.json({ error: 'type and orgId required' }, 400);
  }

  const timestamp = Date.now();
  const eventId = `${timestamp}-${Math.random().toString(36).slice(2, 11)}`;
  const event = { id: eventId, type, orgId, payload, timestamp };

  // 1) Store in Durable Object (immediate, no schema)
  const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
  const loggerStub = c.env.LOGGER.get(doId);
  await loggerStub.fetch('https://do/log', {
    method: 'POST',
    body: JSON.stringify(event)
  });

  // 2) Write to Analytics Engine (buffered, for long-term storage).
  // writeDataPoint returns immediately, so it is not awaited.
  // Analytics Engine accepts a single index per data point, so the event type
  // goes into a blob. The event id is stored too, so that the GET endpoint
  // can de-duplicate against Durable Object events.
  c.env.ANALYTICS.writeDataPoint({
    indexes: [String(orgId)],
    blobs: [JSON.stringify(payload ?? {}), String(type), eventId],
    doubles: [timestamp]
  });

  // 3) Broadcast to connected clients
  const roomId = c.env.ROOM.idFromName(`org:${orgId}`);
  const roomStub = c.env.ROOM.get(roomId);
  await roomStub.fetch('https://do/publish', {
    method: 'POST',
    body: JSON.stringify(event)
  });

  return c.json({ id: eventId, timestamp });
});

// GET /api/events - Get events (seamless DO + AE stitching)
app.get('/api/events', async (c) => {
  const orgId = c.req.query('orgId');
  // Fall back to 100 when limit is missing or not a number
  const limit = Number.parseInt(c.req.query('limit') ?? '100', 10) || 100;
  if (!orgId) {
    return c.json({ error: 'orgId required' }, 400);
  }

  // Get recent events from Durable Object (fast, immediate)
  const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
  const loggerStub = c.env.LOGGER.get(doId);
  const doResponse = await loggerStub.fetch(`https://do/events?limit=${limit}`);
  const doEvents = await doResponse.json();

  // Get older events from Analytics Engine (slower, but comprehensive)
  const aeResponse = await loggerStub.fetch(
    `https://do/events/analytics?limit=${limit}&orgId=${encodeURIComponent(orgId)}`
  );
  const aeEvents = await aeResponse.json();

  // Stitch together: DO events (recent) + AE events (older, not in DO)
  const doEventIds = new Set(doEvents.map((e: any) => e.id));
  const uniqueAEEvents = aeEvents.filter((e: any) => !doEventIds.has(e.id));
  const allEvents = [...doEvents, ...uniqueAEEvents]
    .sort((a: any, b: any) => b.timestamp - a.timestamp)
    .slice(0, limit);

  return c.json({ events: allEvents });
});

// WebSocket connection
app.get('/ws/:orgId', async (c) => {
  if (c.req.header('Upgrade') !== 'websocket') {
    return c.text('Expected WebSocket upgrade', 426);
  }
  const orgId = c.req.param('orgId');
  const id = c.env.ROOM.idFromName(`org:${orgId}`);
  const stub = c.env.ROOM.get(id);
  // Forward the original request so the Upgrade header reaches the Durable Object
  return stub.fetch(new Request('https://do/connect', c.req.raw));
});

export default app;
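Since Analytics Engine accepts only one index per data point, it can help to keep the event-to-data-point mapping in one place. The following is a minimal sketch under stated assumptions: `LogEvent`, `DataPoint` and `toDataPoint` are hypothetical names, and the blob order (payload, type, id) is one possible layout rather than a required one.

```typescript
// Hypothetical event shape matching the fields the worker sends around.
interface LogEvent {
  id: string;
  type: string;
  orgId: string;
  payload: unknown;
  timestamp: number; // epoch milliseconds
}

// Structural subset of the object passed to writeDataPoint.
interface DataPoint {
  indexes: string[];
  blobs: string[];
  doubles: number[];
}

// Assumed layout: index1 = orgId, blob1 = payload JSON,
// blob2 = event type, blob3 = event id, double1 = timestamp.
function toDataPoint(event: LogEvent): DataPoint {
  return {
    indexes: [event.orgId], // exactly one index per data point
    blobs: [JSON.stringify(event.payload ?? {}), event.type, event.id],
    doubles: [event.timestamp],
  };
}
```

Whatever layout is chosen, the SQL queries against the dataset have to use the matching `blobN` and `doubleN` columns.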
DURABLE OBJECT ROOM
WebSocket Coordination
// src/room.ts - Hono-based WebSocket Broadcasting
import { Hono } from 'hono';
import { DurableObject } from "cloudflare:workers";

export class Room extends DurableObject {
  // In-memory set of accepted server-side sockets
  private sockets: Set<WebSocket> = new Set();

  async fetch(req: Request) {
    const app = new Hono();

    // WebSocket connection endpoint
    app.get('/connect', (c) => {
      if (c.req.header('Upgrade') !== 'websocket') {
        return c.text('Expected WebSocket upgrade', 426);
      }
      const pair = new WebSocketPair();
      const server = pair[1];
      server.accept();
      server.addEventListener('close', () => this.sockets.delete(server));
      server.addEventListener('error', () => this.sockets.delete(server));
      this.sockets.add(server);
      // Hand the client end of the pair back to the caller
      return new Response(null, { status: 101, webSocket: pair[0] });
    });

    // Publish event to all connected clients
    app.post('/publish', async (c) => {
      const event = await c.req.json();
      const msg = JSON.stringify(event);
      // Broadcast to all connected clients
      for (const ws of this.sockets) {
        try {
          ws.send(msg);
        } catch {
          // Remove dead connections
          this.sockets.delete(ws);
        }
      }
      return c.text('ok');
    });

    // Get connection stats
    app.get('/stats', (c) => {
      return c.json({ connectedClients: this.sockets.size });
    });

    return app.fetch(req);
  }
}
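The broadcast loop can be exercised without a Workers runtime by depending only on a `send` method. This is a small sketch, assuming a hypothetical `Sendable` interface and `broadcast` helper that are not part of the Room class itself.

```typescript
// Anything with a send method, e.g. a WebSocket or a test double.
interface Sendable {
  send(message: string): void;
}

// Send a message to every socket, dropping the ones whose send throws.
// Returns the number of sockets the message was delivered to.
function broadcast<S extends Sendable>(sockets: Set<S>, message: string): number {
  let delivered = 0;
  for (const socket of sockets) {
    try {
      socket.send(message);
      delivered++;
    } catch {
      // Deleting from a Set while iterating it is safe in JavaScript.
      sockets.delete(socket);
    }
  }
  return delivered;
}
```

Returning the delivery count makes it easy to log or expose how many clients actually received an event.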
TYPESCRIPT LRU CACHE WITH EFFECT
TypeScript LRU Cache
// lru-cache.ts - TypeScript LRU Cache with Effect
import { Effect } from "effect";

export interface Event {
  id: string;
  type: string;
  orgId: string;
  payload: any;
  timestamp: number;
  source?: 'local' | 'confirmed' | 'failed' | 'realtime' | 'server';
}

export class LRUCache {
  // Map preserves insertion order, so the first key is the least recently used
  private cache = new Map<string, Event>();
  private maxSize: number;

  constructor(maxSize = 1000) {
    this.maxSize = maxSize;
  }

  get(key: string): Event | null {
    const value = this.cache.get(key);
    if (value === undefined) return null;
    // Move to end (most recently used)
    this.cache.delete(key);
    this.cache.set(key, value);
    return value;
  }

  set(key: string, value: Event): void {
    if (this.cache.has(key)) {
      this.cache.delete(key);
    } else if (this.cache.size >= this.maxSize) {
      // Remove least recently used (first item)
      const firstKey = this.cache.keys().next().value;
      if (firstKey !== undefined) {
        this.cache.delete(firstKey);
      }
    }
    this.cache.set(key, value);
  }

  has(key: string): boolean {
    return this.cache.has(key);
  }

  delete(key: string): boolean {
    return this.cache.delete(key);
  }

  clear(): void {
    this.cache.clear();
  }

  size(): number {
    return this.cache.size;
  }

  // Get all values as array
  values(): Event[] {
    return Array.from(this.cache.values());
  }

  // Purge entries whose event timestamp is older than the cutoff.
  // Reads do not refresh the timestamp, so this is write-based only.
  purge(olderThan: number): number {
    const toDelete: string[] = [];
    for (const [key, value] of this.cache) {
      if (value.timestamp < olderThan) {
        toDelete.push(key);
      }
    }
    toDelete.forEach(key => this.cache.delete(key));
    return toDelete.length;
  }

  // Effect-based purging with error handling.
  // Effect.try turns a thrown exception into a typed failure; with
  // Effect.sync a throw would become a defect that mapError never sees.
  purgeWithEffect(olderThan: number): Effect.Effect<number, Error> {
    return Effect.try({
      try: () => this.purge(olderThan),
      catch: (error) => new Error(`Purge failed: ${error}`)
    });
  }

  // Get cache statistics
  getStats(): { size: number; maxSize: number; utilization: number } {
    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      utilization: (this.cache.size / this.maxSize) * 100
    };
  }
}
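The age-based purge can also be written as a standalone helper with an injectable clock, which makes it easy to call from a timer or from an Effect schedule. This is a dependency-free sketch: `purgeOlderThan` and the `Timestamped` interface are hypothetical names, and it takes a maximum age rather than the absolute cutoff used by `purge` above.

```typescript
interface Timestamped {
  timestamp: number; // epoch milliseconds
}

// Remove every entry older than maxAgeMs relative to `now`.
// Returns the number of entries removed.
function purgeOlderThan<V extends Timestamped>(
  cache: Map<string, V>,
  maxAgeMs: number,
  now: number = Date.now()
): number {
  const cutoff = now - maxAgeMs;
  let removed = 0;
  for (const [key, value] of cache) {
    if (value.timestamp < cutoff) {
      cache.delete(key); // safe while iterating a Map
      removed++;
    }
  }
  return removed;
}

// Possible usage: purge entries older than five minutes, once a minute.
// setInterval(() => purgeOlderThan(someCache, 5 * 60_000), 60_000);
```

Passing `now` explicitly keeps the helper deterministic in tests, while callers in production can rely on the `Date.now()` default.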
CLIENT: SVELTE 5 + TYPESCRIPT
EventLogger Class
// client-svelte.ts - Svelte 5 with LRU Cache + Fetch API
import { LRUCache, type Event } from './lru-cache';

export class EventLogger {
  private cache: LRUCache;
  private ws: WebSocket | null = null;
  private orgId: string;
  private lastSync = 0;

  constructor(orgId: string) {
    this.orgId = orgId;
    this.cache = new LRUCache(1000); // Max 1000 events
    this.loadInitialData();
    this.connectWebSocket();
  }

  // Load initial data from server
  private async loadInitialData(): Promise<void> {
    try {
      const resp = await fetch(`/api/events?orgId=${encodeURIComponent(this.orgId)}`);
      if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
      const { events } = await resp.json();
      // Populate cache with server events
      events.forEach((event: Event) => {
        this.cache.set(event.id, { ...event, source: 'server' });
      });
      this.lastSync = Date.now();
    } catch (err) {
      console.error('Failed to load initial data:', err);
    }
  }

  // Connect WebSocket for real-time events
  private connectWebSocket(): void {
    const wsUrl = `${location.origin.replace(/^http/, 'ws')}/ws/${encodeURIComponent(this.orgId)}`;
    this.ws = new WebSocket(wsUrl);
    this.ws.onmessage = (e) => {
      const event: Event = JSON.parse(e.data);
      event.source = 'realtime';
      // Add to cache
      this.cache.set(event.id, event);
    };
    this.ws.onclose = () => {
      // Retry after a fixed 1 second delay
      setTimeout(() => this.connectWebSocket(), 1000);
    };
  }

  // Log a new event
  async logEvent(type: string, payload: any): Promise<void> {
    // Temporary client-side id, replaced by the server id on confirmation
    const eventId = `${Date.now()}-${Math.random()}`;
    const event: Event = {
      id: eventId,
      type,
      orgId: this.orgId,
      payload,
      timestamp: Date.now(),
      source: 'local'
    };

    // Optimistic update - add to cache immediately
    this.cache.set(eventId, event);

    // Send to server
    try {
      const resp = await fetch('/api/events', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ type, orgId: this.orgId, payload })
      });
      // A 4xx/5xx response does not reject fetch, so check it explicitly
      if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
      const { id, timestamp } = await resp.json();

      // Update cache with server response
      const cachedEvent = this.cache.get(eventId);
      if (cachedEvent) {
        this.cache.delete(eventId); // Remove old key
        cachedEvent.id = id;
        cachedEvent.timestamp = timestamp;
        cachedEvent.source = 'confirmed';
        this.cache.set(id, cachedEvent);
      }
    } catch (err) {
      // Mark as failed
      const cachedEvent = this.cache.get(eventId);
      if (cachedEvent) {
        cachedEvent.source = 'failed';
      }
    }
  }

  // Get events for dashboard
  getEvents(filter: { type?: string; since?: number } = {}): Event[] {
    return this.cache.values().filter(event => {
      if (filter.type && event.type !== filter.type) return false;
      if (filter.since && event.timestamp < filter.since) return false;
      return true;
    });
  }

  // Get cache stats
  getStats() {
    return { ...this.cache.getStats(), lastSync: this.lastSync };
  }

  // Get all events sorted by timestamp (newest first)
  getAllEvents(): Event[] {
    return this.cache.values()
      .sort((a, b) => b.timestamp - a.timestamp);
  }
}
SVELTE 5 COMPONENT
EventLogger Component
<!-- EventLogger.svelte - Svelte 5 Component -->
<script lang="ts">
  import { EventLogger } from './client-svelte';
  import type { Event } from './lru-cache';

  interface Props {
    orgId: string;
  }

  let { orgId }: Props = $props();
  let logger: EventLogger;
  let events: Event[] = $state([]);
  let connected = $state(false);
  let cacheStats = $state({ size: 0, maxSize: 1000, utilization: 0 });

  // Initialize logger
  $effect(() => {
    logger = new EventLogger(orgId);

    // Update events when cache changes
    const updateEvents = () => {
      events = logger.getAllEvents();
      cacheStats = logger.getStats();
    };

    // Poll for updates (in real app, use reactive patterns)
    const interval = setInterval(updateEvents, 1000);
    return () => clearInterval(interval);
  });

  // WebSocket connection status
  $effect(() => {
    if (logger) {
      // Monitor WebSocket connection
      const checkConnection = () => {
        connected = logger['ws']?.readyState === WebSocket.OPEN;
      };
      const interval = setInterval(checkConnection, 1000);
      return () => clearInterval(interval);
    }
  });

  async function logEvent(type: string, payload: any) {
    await logger.logEvent(type, payload);
  }

  function getEventsByType(type: string) {
    return logger.getEvents({ type });
  }
</script>

<div class="event-logger">
  <div class="status-bar">
    <span class="status {connected ? 'connected' : 'disconnected'}">
      {connected ? '🟢 Live' : '🔴 Offline'}
    </span>
    <span class="cache-info">
      Cache: {cacheStats.size}/{cacheStats.maxSize} ({cacheStats.utilization.toFixed(1)}%)
    </span>
  </div>

  <div class="controls">
    <button onclick={() => logEvent('test.click', { time: Date.now() })}>
      Test Event
    </button>
    <button onclick={() => logEvent('user.action', { action: 'button_click' })}>
      User Action
    </button>
  </div>

  <div class="events">
    {#each events.slice(0, 50) as event (event.id)}
      <div class="event event-{event.source}">
        <span class="type">{event.type}</span>
        <span class="time">{new Date(event.timestamp).toLocaleTimeString()}</span>
        <span class="source">{event.source}</span>
        <div class="payload">{JSON.stringify(event.payload, null, 2)}</div>
      </div>
    {/each}
  </div>
</div>
QUERYING HISTORICAL DATA
SQL Queries
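-- Analytics Engine exposes index1, blob1..blob20, double1..double20,
-- timestamp and _sample_interval columns rather than arrays.
-- Column layout assumed here: index1 = orgId, blob1 = payload JSON,
-- blob2 = event type, double1 = event time in epoch milliseconds.

-- Recent events for organization
SELECT
  timestamp,
  index1 AS org_id,
  blob2 AS event_type,
  blob1 AS payload,
  double1 AS event_time_ms
FROM events
WHERE index1 = 'acme'
  AND timestamp >= NOW() - INTERVAL '1' DAY
ORDER BY timestamp DESC
LIMIT 1000;

-- Event type distribution (SUM(_sample_interval) accounts for sampling)
SELECT
  blob2 AS event_type,
  SUM(_sample_interval) AS frequency
FROM events
WHERE index1 = 'acme'
  AND timestamp >= NOW() - INTERVAL '7' DAY
GROUP BY event_type
ORDER BY frequency DESC;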
cURL Example
# Query via Cloudflare API (the request body is the raw SQL statement, not JSON)
curl -X POST \
  -H "Authorization: Bearer $CF_API_TOKEN" \
  "https://api.cloudflare.com/client/v4/accounts/$CF_ACCOUNT_ID/analytics_engine/sql" \
  --data "SELECT SUM(_sample_interval) AS events FROM events WHERE index1 = 'acme' AND timestamp >= NOW() - INTERVAL '1' DAY"
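The same request can be assembled in TypeScript. This is a minimal sketch: `buildSqlRequest` and `SqlRequest` are hypothetical names, the account id and token are placeholders supplied by the caller, and the function only builds the request so that the caller decides when to send it.

```typescript
interface SqlRequest {
  url: string;
  init: {
    method: "POST";
    headers: Record<string, string>;
    body: string; // raw SQL text
  };
}

// Build the URL and fetch options for the Analytics Engine SQL API.
function buildSqlRequest(accountId: string, apiToken: string, sql: string): SqlRequest {
  return {
    url:
      "https://api.cloudflare.com/client/v4/accounts/" +
      encodeURIComponent(accountId) +
      "/analytics_engine/sql",
    init: {
      method: "POST",
      headers: { Authorization: `Bearer ${apiToken}` },
      body: sql,
    },
  };
}

// Possible usage (performs a network call, so it is left commented out):
// const { url, init } = buildSqlRequest(accountId, token, "SHOW TABLES");
// const response = await fetch(url, init);
```

Keeping request construction separate from sending makes the query code testable without credentials.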
EVENT FLOW STATES
- Local: Just written to the in-memory LRU cache, pending server confirmation
- Confirmed: Server acknowledged, will appear in Analytics Engine soon
- Failed: Request failed or was rejected by the server, needs retry or user attention
- Realtime: Received via WebSocket broadcast, typically from another client
- Server: Fetched from GET /api/events on page load (stitched DO + Analytics Engine data)
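The states above can be treated as a small state machine. This sketch uses the same `source` values as the `Event` interface; the transition table itself is an assumption made for illustration, in particular the retry path from failed back to local, which the client code in this document does not implement.

```typescript
type Source = "local" | "confirmed" | "failed" | "realtime" | "server";

// Assumed transition table: only locally created events change state.
const allowedTransitions: Record<Source, readonly Source[]> = {
  local: ["confirmed", "failed"],
  failed: ["local"], // hypothetical retry path
  confirmed: [],
  realtime: [],
  server: [],
};

// True when an event may move from one source state to another.
function canTransition(from: Source, to: Source): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}
```

A guard like this can be checked before mutating a cached event, so that a late WebSocket message cannot, for example, downgrade a confirmed event.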
OPERATIONAL CONSIDERATIONS
- LRU limits: Cache automatically evicts old entries, no manual cleanup needed
- Write-based purging: Automatic cleanup when writing new events
- Dual storage: Recent data in DO, historical in AE - seamless stitching
- No schema: Durable Object stores raw JSON, no migrations or schema changes
- WebSocket resilience: Auto-reconnects, broadcasts to all connected clients
- Error handling: Effect provides proper error handling for purge operations
- Performance: LRU cache prevents memory explosion, DO provides fast access
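For WebSocket resilience, the client in this document retries after a fixed 1 second. A common variation is exponential backoff with a cap, sketched below. `reconnectDelay` and its default values are assumptions, not part of the original client, and jitter is omitted to keep the function deterministic.

```typescript
// Delay before reconnect attempt number `attempt` (0-based):
// baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
function reconnectDelay(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, exponent));
}

// Possible usage inside an onclose handler, with an attempt counter
// that is reset to 0 once the socket opens successfully:
// setTimeout(() => connect(), reconnectDelay(attempts++));
```

Capping the delay keeps recovery reasonably fast after long outages, while the doubling avoids hammering the server when many clients reconnect at once.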
USE CASES
- Product analytics dashboards with instant feedback
- Live chat with message history
- Collaborative editing with activity streams
- E-commerce tracking with immediate confirmation
- Gaming leaderboards with real-time updates
- DevOps monitoring with live + historical views