REAL-TIME LOGGING POC
Dual Database: Durable Objects + Analytics Engine
LRU CACHE + EFFECT 2025.09.15
DUAL DATABASE POC
Problem: Analytics Engine has 30-60s write delay. Need instant feedback + reliable history.
Solution: Dual database approach - Durable Object for immediate storage (no schema), Analytics Engine for long-term queryable data. LRU cache prevents explosion.
User Action → LRU Cache (instant)
↓
Durable Object (immediate, no schema)
↓
WebSocket (broadcast)
↓
Analytics Engine (long-term, queryable)HOW IT WORKS
- Page Load: Fetch events from server (seamless DO + AE stitching)
- User Action: Write to LRU cache immediately (optimistic update)
- Server: Store in Durable Object (immediate) + Analytics Engine (buffered)
- Broadcast: Stream live to connected clients via WebSocket
- Purge: Effect periodically cleans LRU cache, data flows to AE
- Seamless: Client gets unified view of recent (DO) + historical (AE) data
BENEFITS
- No schema complexity: Durable Object stores raw JSON, no migrations
- Self-cleaning: LRU cache prevents memory explosion
- Dual persistence: Immediate (DO) + long-term (AE) storage
- Seamless stitching: Client gets unified view of recent + historical data
- Effect-driven purging: Automated cleanup with proper error handling
- Real-time broadcasting: Live updates to all connected clients
INFRASTRUCTURE
alchemy.run.ts
// alchemy.run.ts - Dual Database Setup
import alchemy from "alchemy";
import { Worker, DurableObjectNamespace, AnalyticsEngineDataset } from "alchemy/cloudflare";
const app = await alchemy("real-time-logging");
// Analytics Engine for long-term event storage
const analytics = await AnalyticsEngineDataset("events", {
title: "Event Analytics"
});
// Durable Object for simple JSON logging (no schema)
const loggerNamespace = DurableObjectNamespace("logger", {
className: "Logger"
});
// Durable Object for WebSocket coordination
const roomNamespace = DurableObjectNamespace("room", {
className: "Room"
});
// Main worker with dual database bindings
const worker = await Worker("api", {
entrypoint: "./src/worker.ts",
bindings: {
ANALYTICS: analytics, // Long-term storage
LOGGER: loggerNamespace, // Fast, immediate storage
ROOM: roomNamespace // Real-time broadcasting
}
});
console.log({ url: worker.url });
await app.finalize();WORKER WITH RECENT EVENTS ENDPOINT
Enhanced Worker
// src/worker.ts - Hono-based Dual Database Worker
import { Hono } from 'hono';
import type { worker } from "../alchemy.run";
const app = new Hono<{ Bindings: typeof worker.Env }>();
// POST /api/events - Create new event
app.post('/api/events', async (c) => {
const { type, orgId, payload } = await c.req.json();
if (!type || !orgId) {
return c.json({ error: 'type and orgId required' }, 400);
}
const timestamp = Date.now();
const eventId = `${timestamp}-${Math.random().toString(36).substr(2, 9)}`;
// 1) Store in Durable Object (immediate, no schema)
const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
const loggerStub = c.env.LOGGER.get(doId);
await loggerStub.fetch('https://do/log', {
method: 'POST',
body: JSON.stringify({
id: eventId,
type,
orgId,
payload,
timestamp
})
});
// 2) Write to Analytics Engine (buffered, for long-term storage)
await c.env.ANALYTICS.writeDataPoint({
indexes: [String(orgId), String(type)],
blobs: [JSON.stringify(payload ?? {})],
doubles: [timestamp]
});
// 3) Broadcast to connected clients
const roomId = c.env.ROOM.idFromName(`org:${orgId}`);
const roomStub = c.env.ROOM.get(roomId);
await roomStub.fetch('https://do/publish', {
method: 'POST',
body: JSON.stringify({
id: eventId,
type,
orgId,
payload,
timestamp
})
});
return c.json({ id: eventId, timestamp });
});
// GET /api/events - Get events (seamless DO + AE stitching)
app.get('/api/events', async (c) => {
const orgId = c.req.query('orgId');
const limit = parseInt(c.req.query('limit') || '100');
if (!orgId) {
return c.json({ error: 'orgId required' }, 400);
}
// Get recent events from Durable Object (fast, immediate)
const doId = c.env.LOGGER.idFromName(`org:${orgId}`);
const loggerStub = c.env.LOGGER.get(doId);
const doResponse = await loggerStub.fetch(`https://do/events?limit=${limit}`);
const doEvents = await doResponse.json();
// Get older events from Analytics Engine (slower, but comprehensive)
const aeResponse = await loggerStub.fetch(`https://do/events/analytics?limit=${limit}&orgId=${orgId}`);
const aeEvents = await aeResponse.json();
// Stitch together: DO events (recent) + AE events (older, not in DO)
const doEventIds = new Set(doEvents.map((e: any) => e.id));
const uniqueAEEvents = aeEvents.filter((e: any) => !doEventIds.has(e.id));
const allEvents = [...doEvents, ...uniqueAEEvents]
.sort((a: any, b: any) => b.timestamp - a.timestamp)
.slice(0, limit);
return c.json({ events: allEvents });
});
// WebSocket connection
app.get('/ws/:orgId', async (c) => {
const orgId = c.req.param('orgId');
const id = c.env.ROOM.idFromName(`org:${orgId}`);
const stub = c.env.ROOM.get(id);
return await stub.fetch('https://do/connect');
});
export default app;DURABLE OBJECT ROOM
WebSocket Coordination
// src/room.ts - Hono-based WebSocket Broadcasting
import { Hono } from 'hono';
import { DurableObject } from "cloudflare:workers";
export class Room extends DurableObject {
private sockets: Set<WebSocket> = new Set();
async fetch(req: Request) {
const app = new Hono();
// WebSocket connection endpoint
app.get('/connect', (c) => {
const pair = new WebSocketPair();
const server = pair[1];
server.accept();
server.addEventListener('close', () =>
this.sockets.delete(server));
server.addEventListener('error', () =>
this.sockets.delete(server));
this.sockets.add(server);
return new Response(null, {
status: 101,
webSocket: pair[0]
});
});
// Publish event to all connected clients
app.post('/publish', async (c) => {
const event = await c.req.json();
const msg = JSON.stringify(event);
// Broadcast to all connected clients
for (const ws of this.sockets) {
try {
ws.send(msg);
} catch (e) {
// Remove dead connections
this.sockets.delete(ws);
}
}
return c.text('ok');
});
// Get connection stats
app.get('/stats', (c) => {
return c.json({
connectedClients: this.sockets.size
});
});
return app.fetch(req);
}
}TYPESCRIPT LRU CACHE WITH EFFECT
TypeScript LRU Cache
// lru-cache.ts - TypeScript LRU Cache with Effect
import { Effect, pipe } from "effect";
export interface Event {
id: string;
type: string;
orgId: string;
payload: any;
timestamp: number;
source?: 'local' | 'confirmed' | 'failed' | 'realtime' | 'server';
}
export class LRUCache {
private cache = new Map<string, Event>();
private maxSize: number;
constructor(maxSize = 1000) {
this.maxSize = maxSize;
}
get(key: string): Event | null {
if (this.cache.has(key)) {
// Move to end (most recently used)
const value = this.cache.get(key)!;
this.cache.delete(key);
this.cache.set(key, value);
return value;
}
return null;
}
set(key: string, value: Event): void {
if (this.cache.has(key)) {
this.cache.delete(key);
} else if (this.cache.size >= this.maxSize) {
// Remove least recently used (first item)
const firstKey = this.cache.keys().next().value;
if (firstKey) {
this.cache.delete(firstKey);
}
}
this.cache.set(key, value);
}
has(key: string): boolean {
return this.cache.has(key);
}
delete(key: string): boolean {
return this.cache.delete(key);
}
clear(): void {
this.cache.clear();
}
size(): number {
return this.cache.size;
}
// Get all values as array
values(): Event[] {
return Array.from(this.cache.values());
}
// Purge old entries based on timestamp (write-based only)
purge(olderThan: number): number {
const toDelete: string[] = [];
for (const [key, value] of this.cache) {
if (value.timestamp < olderThan) {
toDelete.push(key);
}
}
toDelete.forEach(key => this.cache.delete(key));
return toDelete.length;
}
// Effect-based purging with error handling
purgeWithEffect(olderThan: number): Effect.Effect<number, Error> {
return pipe(
Effect.sync(() => this.purge(olderThan)),
Effect.mapError(error => new Error(`Purge failed: ${error}`))
);
}
// Get cache statistics
getStats(): { size: number; maxSize: number; utilization: number } {
return {
size: this.cache.size,
maxSize: this.maxSize,
utilization: (this.cache.size / this.maxSize) * 100
};
}
}CLIENT: SVELTE 5 + TYPESCRIPT
EventLogger Class
// client-svelte.ts - Svelte 5 with LRU Cache + Fetch API
import { LRUCache, type Event } from './lru-cache';
export class EventLogger {
private cache: LRUCache;
private ws: WebSocket | null = null;
private orgId: string;
private lastSync = 0;
constructor(orgId: string) {
this.orgId = orgId;
this.cache = new LRUCache(1000); // Max 1000 events
this.loadInitialData();
this.connectWebSocket();
}
// Load initial data from server
private async loadInitialData(): Promise<void> {
try {
const resp = await fetch(`/api/events?orgId=${this.orgId}`);
const { events } = await resp.json();
// Populate cache with server events
events.forEach((event: Event) => {
this.cache.set(event.id, { ...event, source: 'server' });
});
this.lastSync = Date.now();
} catch (err) {
console.error('Failed to load initial data:', err);
}
}
// Connect WebSocket for real-time events
private connectWebSocket(): void {
const wsUrl = `${location.origin.replace(/^http/, 'ws')}/ws/${this.orgId}`;
this.ws = new WebSocket(wsUrl);
this.ws.onmessage = (e) => {
const event: Event = JSON.parse(e.data);
event.source = 'realtime';
// Add to cache
this.cache.set(event.id, event);
};
this.ws.onclose = () => {
setTimeout(() => this.connectWebSocket(), 1000);
};
}
// Log a new event
async logEvent(type: string, payload: any): Promise<void> {
const eventId = `${Date.now()}-${Math.random()}`;
const event: Event = {
id: eventId,
type,
orgId: this.orgId,
payload,
timestamp: Date.now(),
source: 'local'
};
// Optimistic update - add to cache immediately
this.cache.set(eventId, event);
// Send to server
try {
const resp = await fetch('/api/events', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ type, orgId: this.orgId, payload })
});
const { id, timestamp } = await resp.json();
// Update cache with server response
const cachedEvent = this.cache.get(eventId);
if (cachedEvent) {
cachedEvent.id = id;
cachedEvent.timestamp = timestamp;
cachedEvent.source = 'confirmed';
this.cache.set(id, cachedEvent);
this.cache.delete(eventId); // Remove old key
}
} catch (err) {
// Mark as failed
const cachedEvent = this.cache.get(eventId);
if (cachedEvent) {
cachedEvent.source = 'failed';
}
}
}
// Get events for dashboard
getEvents(filter: { type?: string; since?: number } = {}): Event[] {
return this.cache.values().filter(event => {
if (filter.type && event.type !== filter.type) return false;
if (filter.since && event.timestamp < filter.since) return false;
return true;
});
}
// Get cache stats
getStats() {
return {
...this.cache.getStats(),
lastSync: this.lastSync
};
}
// Get all events sorted by timestamp
getAllEvents(): Event[] {
return this.cache.values()
.sort((a, b) => b.timestamp - a.timestamp);
}
}SVELTE 5 COMPONENT
EventLogger Component
<!-- EventLogger.svelte - Svelte 5 Component -->
<script lang="ts">
import { EventLogger } from './client-svelte';
import type { Event } from './lru-cache';
interface Props {
orgId: string;
}
let { orgId }: Props = $props();
let logger: EventLogger;
let events: Event[] = $state([]);
let connected = $state(false);
let cacheStats = $state({ size: 0, maxSize: 1000, utilization: 0 });
// Initialize logger
$effect(() => {
logger = new EventLogger(orgId);
// Update events when cache changes
const updateEvents = () => {
events = logger.getAllEvents();
cacheStats = logger.getStats();
};
// Poll for updates (in real app, use reactive patterns)
const interval = setInterval(updateEvents, 1000);
return () => clearInterval(interval);
});
// WebSocket connection status
$effect(() => {
if (logger) {
// Monitor WebSocket connection
const checkConnection = () => {
connected = logger['ws']?.readyState === WebSocket.OPEN;
};
const interval = setInterval(checkConnection, 1000);
return () => clearInterval(interval);
}
});
async function logEvent(type: string, payload: any) {
await logger.logEvent(type, payload);
}
function getEventsByType(type: string) {
return logger.getEvents({ type });
}
</script>
<div class="event-logger">
<div class="status-bar">
<span class="status {connected ? 'connected' : 'disconnected'}">
{connected ? '🟢 Live' : '🔴 Offline'}
</span>
<span class="cache-info">
Cache: {cacheStats.size}/{cacheStats.maxSize} ({cacheStats.utilization.toFixed(1)}%)
</span>
</div>
<div class="controls">
<button onclick={() => logEvent('test.click', { time: Date.now() })}>
Test Event
</button>
<button onclick={() => logEvent('user.action', { action: 'button_click' })}>
User Action
</button>
</div>
<div class="events">
{#each events.slice(0, 50) as event (event.id)}
<div class="event event-{event.source}">
<span class="type">{event.type}</span>
<span class="time">{new Date(event.timestamp).toLocaleTimeString()}</span>
<span class="source">{event.source}</span>
<div class="payload">{JSON.stringify(event.payload, null, 2)}</div>
</div>
{/each}
</div>
</div>
QUERYING HISTORICAL DATA
SQL Queries
-- Recent events for organization
SELECT
to_timestamp(doubles[1]/1000) AS timestamp,
indexes[1] AS org_id,
indexes[2] AS event_type,
blobs[1] AS payload
FROM events
WHERE indexes[1] = 'acme'
AND timestamp >= now() - interval '24 hours'
ORDER BY timestamp DESC
LIMIT 1000;
-- Event type distribution
SELECT
indexes[2] AS event_type,
count(*) AS frequency
FROM events
WHERE indexes[1] = 'acme'
AND timestamp >= now() - interval '7 days'
GROUP BY indexes[2]
ORDER BY frequency DESC;cURL Example
# Query via Cloudflare API
curl -X POST \
-H "Authorization: Bearer $CF_API_TOKEN" \
-H "Content-Type: application/json" \
"https://api.cloudflare.com/client/v4/accounts/$CF_ACCOUNT_ID/analytics_engine/sql" \
-d '{
"sql": "SELECT count(*) FROM events WHERE indexes[1]='acme' AND now()-to_timestamp(doubles[1]/1000) < interval '1 day';"
}'EVENT FLOW STATES
- Local: Just written to localStorage, pending server confirmation
- Confirmed: Server acknowledged, will appear in Analytics Engine soon
- Failed: Server rejected, needs retry or user attention
- Realtime: Received via WebSocket from another client
- Server: Historical data fetched from Analytics Engine
OPERATIONAL CONSIDERATIONS
- LRU limits: Cache automatically evicts old entries, no manual cleanup needed
- Write-based purging: Automatic cleanup when writing new events
- Dual storage: Recent data in DO, historical in AE - seamless stitching
- No schema: Durable Object stores raw JSON, no migrations or schema changes
- WebSocket resilience: Auto-reconnects, broadcasts to all connected clients
- Error handling: Effect provides proper error handling for purge operations
- Performance: LRU cache prevents memory explosion, DO provides fast access
USE CASES
- Product analytics dashboards with instant feedback
- Live chat with message history
- Collaborative editing with activity streams
- E-commerce tracking with immediate confirmation
- Gaming leaderboards with real-time updates
- DevOps monitoring with live + historical views