REAL-TIME LOGGING POC

Dual Database: Durable Objects + Analytics Engine

LRU CACHE + EFFECT 2025.01.15

DUAL DATABASE POC

Problem: Analytics Engine has a 30-60s write delay; the UI needs instant feedback plus a reliable long-term history.

Solution: Dual database approach - a Durable Object for immediate storage (no schema), Analytics Engine for long-term queryable data, and a client-side LRU cache to keep memory bounded.

User Action → LRU Cache (instant)
              ↓
           Durable Object (immediate, no schema)
              ↓
           WebSocket (broadcast)
              ↓
        Analytics Engine (long-term, queryable)

HOW IT WORKS

  1. Page Load: Fetch events from server (seamless DO + AE stitching)
  2. User Action: Write to LRU cache immediately (optimistic update)
  3. Server: Store in Durable Object (immediate) + Analytics Engine (buffered)
  4. Broadcast: Stream live to connected clients via WebSocket
  5. Purge: An Effect-driven loop periodically purges stale LRU entries; history stays queryable in AE (see the purge-loop sketch after the LRU cache code)
  6. Seamless: Client gets unified view of recent (DO) + historical (AE) data

BENEFITS

  • No schema complexity: Durable Object stores raw JSON, no migrations
  • Self-cleaning: LRU eviction keeps client memory bounded
  • Dual persistence: Immediate (DO) + long-term (AE) storage
  • Seamless stitching: Client gets unified view of recent + historical data
  • Effect-driven purging: Automated cleanup with proper error handling
  • Real-time broadcasting: Live updates to all connected clients

INFRASTRUCTURE

alchemy.run.ts

// alchemy.run.ts - Dual Database Setup
import alchemy from "alchemy";
import { Worker, DurableObjectNamespace, AnalyticsEngineDataset } from "alchemy/cloudflare";

const app = await alchemy("real-time-logging");

// Analytics Engine for long-term event storage
const analytics = await AnalyticsEngineDataset("events", {
  title: "Event Analytics"
});

// Durable Object for simple JSON logging (no schema)
const loggerNamespace = DurableObjectNamespace("logger", {
  className: "Logger"
});

// Durable Object for WebSocket coordination
const roomNamespace = DurableObjectNamespace("room", {
  className: "Room"
});

// Main worker with dual database bindings
const worker = await Worker("api", {
  entrypoint: "./src/worker.ts",
  bindings: {
    ANALYTICS: analytics,      // Long-term storage
    LOGGER: loggerNamespace,   // Fast, immediate storage
    ROOM: roomNamespace        // Real-time broadcasting
  }
});

console.log({ url: worker.url });
await app.finalize();
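
Running this file with a TypeScript runner (e.g. bun ./alchemy.run.ts) provisions the dataset, both namespaces, and the worker, then prints the deployed URL.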

WORKER WITH RECENT EVENTS ENDPOINT

Enhanced Worker

// src/worker.ts - Hono-based Dual Database Worker
import { Hono } from 'hono';
import type { worker } from "../alchemy.run";

const app = new Hono<{ Bindings: typeof worker.Env }>();

// POST /api/events - Create new event
app.post('/api/events', async (c) => {
  const { type, orgId, payload } = await c.req.json();
  
  if (!type || !orgId) {
    return c.json({ error: 'type and orgId required' }, 400);
  }

  const timestamp = Date.now();
  const eventId = `${timestamp}-${Math.random().toString(36).slice(2, 11)}`;

  // 1) Store in Durable Object (immediate, no schema)
  const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
  const loggerStub = c.env.LOGGER.get(doId);
  await loggerStub.fetch('https://do/log', {
    method: 'POST',
    body: JSON.stringify({ 
      id: eventId, 
      type, 
      orgId, 
      payload, 
      timestamp 
    })
  });

  // 2) Write to Analytics Engine (buffered, for long-term storage)
  // AE allows a single index per data point, so orgId is the index and
  // the event type travels as the first blob
  await c.env.ANALYTICS.writeDataPoint({
    indexes: [String(orgId)],
    blobs: [String(type), JSON.stringify(payload ?? {})],
    doubles: [timestamp]
  });

  // 3) Broadcast to connected clients
  const roomId = c.env.ROOM.idFromName(`org:${orgId}`);
  const roomStub = c.env.ROOM.get(roomId);
  await roomStub.fetch('https://do/publish', {
    method: 'POST',
    body: JSON.stringify({ 
      id: eventId,
      type, 
      orgId, 
      payload, 
      timestamp 
    })
  });

  return c.json({ id: eventId, timestamp });
});

// GET /api/events - Get events (seamless DO + AE stitching)
app.get('/api/events', async (c) => {
  const orgId = c.req.query('orgId');
  const limit = parseInt(c.req.query('limit') || '100');

  if (!orgId) {
    return c.json({ error: 'orgId required' }, 400);
  }

  // Get recent events from the Durable Object (fast, immediate)
  const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
  const loggerStub = c.env.LOGGER.get(doId);
  const doResponse = await loggerStub.fetch(`https://do/events?limit=${limit}`);
  const doEvents = (await doResponse.json()) as any[];

  // Get older events via the Logger DO, which proxies the Analytics Engine
  // SQL API (slower, but comprehensive)
  const aeResponse = await loggerStub.fetch(`https://do/events/analytics?limit=${limit}&orgId=${orgId}`);
  const aeEvents = (await aeResponse.json()) as any[];

  // Stitch together: DO events (recent) + AE events (older, not in DO)
  const doEventIds = new Set(doEvents.map((e: any) => e.id));
  const uniqueAEEvents = aeEvents.filter((e: any) => !doEventIds.has(e.id));
  
  const allEvents = [...doEvents, ...uniqueAEEvents]
    .sort((a: any, b: any) => b.timestamp - a.timestamp)
    .slice(0, limit);

  return c.json({ events: allEvents });
});

// WebSocket connection
app.get('/ws/:orgId', async (c) => {
  const orgId = c.req.param('orgId');
  const id = c.env.ROOM.idFromName(`org:${orgId}`);
  const stub = c.env.ROOM.get(id);
  // Forward the raw request so the Upgrade: websocket header reaches the DO
  return stub.fetch(new Request('https://do/connect', c.req.raw));
});

export default app;
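
DURABLE OBJECT LOGGER

Immediate JSON Storage

The worker above calls a Logger Durable Object (/log, /events, /events/analytics) that is not listed elsewhere in this section. The sketch below fills that gap under stated assumptions: events are keyed by timestamp so storage.list({ reverse: true }) returns newest first, and /events/analytics is stubbed where a real implementation would proxy the Analytics Engine SQL API over HTTPS (AE bindings are write-only, so reads need an account ID and API token).

// src/logger.ts - Logger Durable Object (a sketch; storage layout is an assumption)
import { Hono } from 'hono';
import { DurableObject } from "cloudflare:workers";

interface LoggedEvent {
  id: string;
  type: string;
  orgId: string;
  payload: unknown;
  timestamp: number;
}

export class Logger extends DurableObject {
  async fetch(req: Request) {
    const app = new Hono();

    // POST /log - append one raw JSON event (no schema, no migrations)
    app.post('/log', async (c) => {
      const event = await c.req.json<LoggedEvent>();
      // Millisecond timestamps are fixed-width for centuries, so these
      // string keys sort chronologically
      await this.ctx.storage.put(`evt:${event.timestamp}:${event.id}`, event);
      return c.text('ok');
    });

    // GET /events?limit=N - newest events straight from DO storage
    app.get('/events', async (c) => {
      const limit = parseInt(c.req.query('limit') || '100');
      const entries = await this.ctx.storage.list<LoggedEvent>({
        prefix: 'evt:',
        reverse: true,
        limit
      });
      return c.json([...entries.values()]);
    });

    // GET /events/analytics - stubbed here; a real implementation would
    // query the AE SQL API and map rows back to LoggedEvent objects
    app.get('/events/analytics', async (c) => {
      return c.json([]);
    });

    return app.fetch(req);
  }
}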

DURABLE OBJECT ROOM

WebSocket Coordination

// src/room.ts - Hono-based WebSocket Broadcasting
import { Hono } from 'hono';
import { DurableObject } from "cloudflare:workers";

export class Room extends DurableObject {
  private sockets: Set<WebSocket> = new Set();

  async fetch(req: Request) {
    const app = new Hono();

    // WebSocket connection endpoint
    app.get('/connect', (c) => {
      // The runtime rejects WebSocket responses to non-upgrade requests,
      // so fail fast when the Upgrade header is missing
      if (c.req.header('Upgrade') !== 'websocket') {
        return c.text('Expected WebSocket upgrade', 426);
      }

      const [client, server] = Object.values(new WebSocketPair());
      server.accept();

      server.addEventListener('close', () =>
        this.sockets.delete(server));
      server.addEventListener('error', () =>
        this.sockets.delete(server));

      this.sockets.add(server);
      return new Response(null, {
        status: 101,
        webSocket: client
      });
    });

    // Publish event to all connected clients
    app.post('/publish', async (c) => {
      const event = await c.req.json();
      const msg = JSON.stringify(event);

      // Broadcast to all connected clients
      for (const ws of this.sockets) {
        try { 
          ws.send(msg); 
        } catch (e) {
          // Remove dead connections
          this.sockets.delete(ws);
        }
      }

      return c.text('ok');
    });

    // Get connection stats
    app.get('/stats', (c) => {
      return c.json({
        connectedClients: this.sockets.size
      });
    });

    return app.fetch(req);
  }
}
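
Note: the socket set lives in the Durable Object's memory, so connections drop if the object is evicted or redeployed; clients recover via the auto-reconnect in the client code below. For production, Cloudflare's WebSocket Hibernation API (this.ctx.acceptWebSocket) is the usual upgrade path, since hibernated sockets survive eviction.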

TYPESCRIPT LRU CACHE WITH EFFECT

TypeScript LRU Cache

// lru-cache.ts - TypeScript LRU Cache with Effect
import { Effect } from "effect";

export interface Event {
  id: string;
  type: string;
  orgId: string;
  payload: any;
  timestamp: number;
  source?: 'local' | 'confirmed' | 'failed' | 'realtime' | 'server';
}

export class LRUCache {
  private cache = new Map<string, Event>();
  private maxSize: number;

  constructor(maxSize = 1000) {
    this.maxSize = maxSize;
  }

  get(key: string): Event | null {
    if (this.cache.has(key)) {
      // Move to end (most recently used)
      const value = this.cache.get(key)!;
      this.cache.delete(key);
      this.cache.set(key, value);
      return value;
    }
    return null;
  }

  set(key: string, value: Event): void {
    if (this.cache.has(key)) {
      this.cache.delete(key);
    } else if (this.cache.size >= this.maxSize) {
      // Remove least recently used (first item)
      const firstKey = this.cache.keys().next().value;
      if (firstKey) {
        this.cache.delete(firstKey);
      }
    }
    this.cache.set(key, value);
  }

  has(key: string): boolean {
    return this.cache.has(key);
  }

  delete(key: string): boolean {
    return this.cache.delete(key);
  }

  clear(): void {
    this.cache.clear();
  }

  size(): number {
    return this.cache.size;
  }

  // Get all values as array
  values(): Event[] {
    return Array.from(this.cache.values());
  }

  // Purge old entries based on timestamp (write-based only)
  purge(olderThan: number): number {
    const toDelete: string[] = [];
    for (const [key, value] of this.cache) {
      if (value.timestamp < olderThan) {
        toDelete.push(key);
      }
    }
    toDelete.forEach(key => this.cache.delete(key));
    return toDelete.length;
  }

  // Effect-based purging with error handling
  // Effect.try captures a thrown exception as a typed error
  // (Effect.sync would treat it as an untyped defect)
  purgeWithEffect(olderThan: number): Effect.Effect<number, Error> {
    return Effect.try({
      try: () => this.purge(olderThan),
      catch: (error) => new Error(`Purge failed: ${error}`)
    });
  }

  // Get cache statistics
  getStats(): { size: number; maxSize: number; utilization: number } {
    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      utilization: (this.cache.size / this.maxSize) * 100
    };
  }
}
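
The purge loop referenced in HOW IT WORKS is not shown above; here is a minimal sketch, assuming a 5-minute retention window and a 30-second cadence (both illustrative):

// purge-loop.ts - periodic Effect-driven purge (sketch)
import { Effect, Schedule } from "effect";
import { LRUCache } from "./lru-cache";

const cache = new LRUCache(1000);

// Effect.suspend re-evaluates Date.now() on every run, not once at setup
const purgeOld = Effect.suspend(() =>
  cache.purgeWithEffect(Date.now() - 5 * 60 * 1000)
);

const purgeLoop = purgeOld.pipe(
  Effect.tap((n) => Effect.log(`purged ${n} events`)),
  Effect.catchAll(() => Effect.void), // keep the loop alive on failure
  Effect.repeat(Schedule.spaced("30 seconds"))
);

// Fire-and-forget; in real code tie the fiber to app shutdown
Effect.runFork(purgeLoop);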

CLIENT: SVELTE 5 + TYPESCRIPT

EventLogger Class

// client-svelte.ts - Svelte 5 with LRU Cache + Fetch API
import { LRUCache, type Event } from './lru-cache';

export class EventLogger {
  private cache: LRUCache;
  private ws: WebSocket | null = null;
  private orgId: string;
  private lastSync = 0;

  constructor(orgId: string) {
    this.orgId = orgId;
    this.cache = new LRUCache(1000); // Max 1000 events
    this.loadInitialData();
    this.connectWebSocket();
  }

  // Load initial data from server
  private async loadInitialData(): Promise<void> {
    try {
      const resp = await fetch(`/api/events?orgId=${this.orgId}`);
      const { events } = await resp.json();

      // Populate cache with server events
      events.forEach((event: Event) => {
        this.cache.set(event.id, { ...event, source: 'server' });
      });

      this.lastSync = Date.now();
    } catch (err) {
      console.error('Failed to load initial data:', err);
    }
  }

  // Connect WebSocket for real-time events
  private connectWebSocket(): void {
    const wsUrl = `${location.origin.replace(/^http/, 'ws')}/ws/${this.orgId}`;
    this.ws = new WebSocket(wsUrl);

    this.ws.onmessage = (e) => {
      const event: Event = JSON.parse(e.data);
      event.source = 'realtime';

      // Add to cache
      this.cache.set(event.id, event);
    };

    this.ws.onclose = () => {
      setTimeout(() => this.connectWebSocket(), 1000);
    };
  }

  // Log a new event
  async logEvent(type: string, payload: any): Promise<void> {
    const eventId = `${Date.now()}-${Math.random()}`;
    const event: Event = {
      id: eventId,
      type,
      orgId: this.orgId,
      payload,
      timestamp: Date.now(),
      source: 'local'
    };

    // Optimistic update - add to cache immediately
    this.cache.set(eventId, event);

    // Send to server
    try {
      const resp = await fetch('/api/events', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ type, orgId: this.orgId, payload })
      });

      const { id, timestamp } = await resp.json();

      // Update cache with server response
      const cachedEvent = this.cache.get(eventId);
      if (cachedEvent) {
        cachedEvent.id = id;
        cachedEvent.timestamp = timestamp;
        cachedEvent.source = 'confirmed';
        this.cache.set(id, cachedEvent);
        this.cache.delete(eventId); // Remove old key
      }
    } catch (err) {
      // Mark as failed
      const cachedEvent = this.cache.get(eventId);
      if (cachedEvent) {
        cachedEvent.source = 'failed';
      }
    }
  }

  // Get events for dashboard
  getEvents(filter: { type?: string; since?: number } = {}): Event[] {
    return this.cache.values().filter(event => {
      if (filter.type && event.type !== filter.type) return false;
      if (filter.since && event.timestamp < filter.since) return false;
      return true;
    });
  }

  // Get cache stats
  getStats() {
    return {
      ...this.cache.getStats(),
      lastSync: this.lastSync
    };
  }

  // Get all events sorted by timestamp
  getAllEvents(): Event[] {
    return this.cache.values()
      .sort((a, b) => b.timestamp - a.timestamp);
  }
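
  // Expose connection state without reaching into the private socket
  get isConnected(): boolean {
    return this.ws?.readyState === WebSocket.OPEN;
  }

  // Close the socket without triggering the auto-reconnect (used on teardown)
  destroy(): void {
    if (this.ws) {
      this.ws.onclose = null;
      this.ws.close();
      this.ws = null;
    }
  }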
}
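
Usage sketch (the element id and event names are illustrative, not from the POC):

// usage.ts - wire the logger into a page
import { EventLogger } from './client-svelte';

const logger = new EventLogger('acme');

document.querySelector('#save')?.addEventListener('click', () => {
  // Lands in the LRU cache instantly, then flows to DO, WebSocket, and AE
  logger.logEvent('user.save', { docId: 'demo' });
});

console.log(logger.getStats()); // { size, maxSize, utilization, lastSync }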

SVELTE 5 COMPONENT

EventLogger Component

<!-- EventLogger.svelte - Svelte 5 Component -->
<script lang="ts">
  import { EventLogger } from './client-svelte';
  import type { Event } from './lru-cache';

  interface Props {
    orgId: string;
  }

  let { orgId }: Props = $props();
  
  let logger: EventLogger;
  let events: Event[] = $state([]);
  let connected = $state(false);
  let cacheStats = $state({ size: 0, maxSize: 1000, utilization: 0 });

  // Initialize logger
  $effect(() => {
    logger = new EventLogger(orgId);

    // Update events when cache changes
    const updateEvents = () => {
      events = logger.getAllEvents();
      cacheStats = logger.getStats();
    };

    // Poll for updates (in a real app, use reactive patterns)
    const interval = setInterval(updateEvents, 1000);

    return () => {
      clearInterval(interval);
      logger.destroy(); // close the socket so a stale logger stops reconnecting
    };
  });

  // WebSocket connection status
  $effect(() => {
    if (logger) {
      // Monitor WebSocket connection via the public getter
      const checkConnection = () => {
        connected = logger.isConnected;
      };

      const interval = setInterval(checkConnection, 1000);
      return () => clearInterval(interval);
    }
  });

  async function logEvent(type: string, payload: any) {
    await logger.logEvent(type, payload);
  }

  function getEventsByType(type: string) {
    return logger.getEvents({ type });
  }
</script>

<div class="event-logger">
  <div class="status-bar">
    <span class="status {connected ? 'connected' : 'disconnected'}">
      {connected ? '🟢 Live' : '🔴 Offline'}
    </span>
    <span class="cache-info">
      Cache: {cacheStats.size}/{cacheStats.maxSize} ({cacheStats.utilization.toFixed(1)}%)
    </span>
  </div>

  <div class="controls">
    <button onclick={() => logEvent('test.click', { time: Date.now() })}>
      Test Event
    </button>
    <button onclick={() => logEvent('user.action', { action: 'button_click' })}>
      User Action
    </button>
  </div>

  <div class="events">
    {#each events.slice(0, 50) as event (event.id)}
      <div class="event event-{event.source}">
        <span class="type">{event.type}</span>
        <span class="time">{new Date(event.timestamp).toLocaleTimeString()}</span>
        <span class="source">{event.source}</span>
        <div class="payload">{JSON.stringify(event.payload, null, 2)}</div>
      </div>
    {/each}
  </div>
</div>

QUERYING HISTORICAL DATA

SQL Queries

-- Recent events for an organization (last 24 hours)
-- The SQL API exposes columns as index1, blob1..blob20, double1..double20,
-- plus an automatic timestamp column
SELECT
  timestamp,
  index1 AS org_id,
  blob1  AS event_type,
  blob2  AS payload,
  double1 AS client_ts
FROM events
WHERE index1 = 'acme'
  AND timestamp >= NOW() - INTERVAL '1' DAY
ORDER BY timestamp DESC
LIMIT 1000;

-- Event type distribution (last 7 days)
-- SUM(_sample_interval) counts events correctly under adaptive sampling
SELECT
  blob1 AS event_type,
  SUM(_sample_interval) AS frequency
FROM events
WHERE index1 = 'acme'
  AND timestamp >= NOW() - INTERVAL '7' DAY
GROUP BY blob1
ORDER BY frequency DESC;

cURL Example

# Query via the Analytics Engine SQL API (the SQL is the raw request body)
curl -X POST \
  -H "Authorization: Bearer $CF_API_TOKEN" \
  "https://api.cloudflare.com/client/v4/accounts/$CF_ACCOUNT_ID/analytics_engine/sql" \
  -d "SELECT SUM(_sample_interval) AS events FROM events WHERE index1 = 'acme' AND timestamp >= NOW() - INTERVAL '1' DAY"

EVENT FLOW STATES

  • Local: Just written to the local LRU cache, pending server confirmation
  • Confirmed: Server acknowledged, will appear in Analytics Engine soon
  • Failed: Server rejected, needs retry or user attention
  • Realtime: Received via WebSocket from another client
  • Server: Historical data fetched from Analytics Engine

OPERATIONAL CONSIDERATIONS

  • LRU limits: Cache automatically evicts old entries, no manual cleanup needed
  • Write-based purging: Automatic cleanup when writing new events
  • Dual storage: Recent data in DO, historical in AE - seamless stitching
  • No schema: Durable Object stores raw JSON, no migrations or schema changes
  • WebSocket resilience: Auto-reconnects, broadcasts to all connected clients
  • Error handling: Effect provides proper error handling for purge operations
  • Performance: LRU cache bounds client memory; the DO provides fast access

USE CASES

  • Product analytics dashboards with instant feedback
  • Live chat with message history
  • Collaborative editing with activity streams
  • E-commerce tracking with immediate confirmation
  • Gaming leaderboards with real-time updates
  • DevOps monitoring with live + historical views