Reliable task scheduling for Cloudflare Durable Objects, implementing the "reliable runNow" pattern with Effect-TS for resilient long-running tasks.
Effect<T, E, never> instead of Promise<T>
// Before (v0.1.0)
await scheduler.runNow(taskId, 'my-task', params);
// After (v0.2.0)
await Effect.runPromise(scheduler.runNow(taskId, 'my-task', params));
// Or in Effect context:
const program = Effect.gen(function* () {
yield* scheduler.runNow(taskId, 'my-task', params);
});
await Effect.runPromise(program);Task Handler Signature Change:
// Before
type TaskHandler = (scheduler: ReliableScheduler, taskId: string, params: unknown) => Effect.Effect<void>;
// After
type TaskHandler = (taskId: string, params: unknown) => Effect.Effect<void, unknown, typeof SchedulerService>;Cloudflare Durable Objects can evict your code after ~144 seconds of inactivity. For long-running operations (like AI agent loops), a single eviction mid-task breaks your workflow. ironalarm solves this with a lightweight, checkpointed implementation powered by Effect-TS that persists task state and uses a 30-second safety alarm netβif evicted, the task automatically retries and resumes from checkpoints.
- Effect-TS powered: Public APIs return
Effect<T, E, never>, handlers useEffect<T, E, SchedulerService>for DI - Dependency injection:
SchedulerServiceContext.Tag enables testable services - Reliable execution:
runNow()starts immediately + 30s safety alarm for eviction recovery - Future scheduling:
schedule()for delayed/recurring tasks - Priority queues: High/medium/low priority for execution order when multiple tasks are due
- Checkpoints: User-managed progress tracking for resumable work
- Named handlers: Register task handlers by name (no function serialization)
- Fully serializable: Tasks are just
{ taskName, params, progress, priority }
bun install ironalarm
# or
bun add ironalarmimport { ReliableScheduler, SchedulerService } from 'ironalarm';
import { Effect } from 'effect';
export class MyDO {
private scheduler: ReliableScheduler;
constructor(state: DurableObjectState, env: any) {
this.scheduler = new ReliableScheduler(state.storage);
// TaskHandler signature: (taskId, params) => Effect.Effect<void, never, SchedulerService>
this.scheduler.register('my-task', (taskId, params) => {
return Effect.gen(function* () {
const svc = yield* SchedulerService;
const started = yield* svc.getCheckpoint(taskId, 'started');
if (!started) {
yield* Effect.promise(() => doWork(params));
yield* svc.checkpoint(taskId, 'started', true);
}
yield* Effect.promise(() => expensiveOperation());
yield* svc.completeTask(taskId);
});
});
}
async alarm() {
await Effect.runPromise(this.scheduler.alarm());
}
async startTask(params: any) {
const taskId = crypto.randomUUID();
await Effect.runPromise(this.scheduler.runNow(taskId, 'my-task', params));
}
}For larger workflows, use Effect.gen to compose operations:
export class MyDO {
private scheduler: ReliableScheduler;
constructor(state: DurableObjectState, env: any) {
this.scheduler = new ReliableScheduler(state.storage);
this.scheduler.register('complex-task', (taskId, params) => {
return Effect.gen(function* () {
const svc = yield* SchedulerService;
// Check existing state
const started = yield* svc.getCheckpoint(taskId, 'started');
if (!started) {
yield* svc.checkpoint(taskId, 'started', true);
yield* Effect.log('Task initialized');
}
// Do work
yield* Effect.promise(() => processItem(params));
// Update progress
yield* svc.checkpoint(taskId, 'progress', { items: 1 });
// Complete
yield* svc.completeTask(taskId);
yield* Effect.log('Task completed');
});
});
}
async handleRequest(taskId: string, params: any) {
// Provide scheduler context to Effect
await Effect.runPromise(
this.scheduler.runNow(taskId, 'complex-task', params)
);
}
}For tasks that run forever (like game loops, background processors), use maxRetries: Infinity:
// Register an infinite loop handler that reschedules itself
this.scheduler.register('mining-loop', (taskId, params) => {
return Effect.gen(function* () {
const svc = yield* SchedulerService;
// Check if cancelled/paused
const task = yield* svc.getTask(taskId);
if (!task || task.status === 'paused' || task.status === 'failed') return false;
// Do work
yield* Effect.promise(() => mineResources(params));
// Reschedule for next cycle (critical for loops!)
const nextTime = Date.now() + 5000;
yield* svc.schedule(nextTime, taskId, 'mining-loop', params);
return true; // Indicates we should continue
});
});
// Start with infinite retries so it survives DO restarts
await Effect.runPromise(
this.scheduler.runNow(taskId, 'mining-loop', params, { maxRetries: Infinity })
);Critical: Hibernation Recovery
Durable Objects hibernate after ~30 seconds of inactivity. When a DO hibernates for hours/days:
- Tasks with
scheduledAttimes in past become "stuck" - The alarm processes them, but they may not reschedule correctly
- You MUST check and recover stuck tasks on every DO wake-up
Required Pattern: Add recovery checks in your fetch() and alarm() methods:
export class MyDO extends DurableObject {
private scheduler: ReliableScheduler;
constructor(ctx: any, env: any) {
super(ctx, env);
this.scheduler = new ReliableScheduler(this.ctx.storage);
// ... register handlers ...
// Resume tasks after DO restart
this.resumeRunningTasks();
}
private async resumeRunningTasks() {
const LOOP_TASKS = ['mining-loop', 'game-state'];
const tasks = await Effect.runPromise(this.scheduler.getTasks());
const now = Date.now();
for (const task of tasks) {
if (LOOP_TASKS.includes(task.taskName) &&
(task.status === 'running' || task.status === 'failed' || task.status === 'completed')) {
// Recover failed/completed tasks
if (task.status === 'failed' || task.status === 'completed') {
await Effect.runPromise(this.scheduler.checkpoint(task.taskId, '_recovered', true));
}
// If scheduled time is way in past (or never scheduled), reschedule immediately
const scheduledAt = task.scheduledAt || 0;
if (scheduledAt === 0 || (scheduledAt > 0 && now > scheduledAt + 5000)) {
const params = task.params;
await Effect.runPromise(this.scheduler.schedule(now + 100, task.taskId, task.taskName, params));
continue;
}
// Otherwise resume normally
const handler = this.scheduler.getHandler(task.taskName);
if (handler) {
await Effect.runPromise(handler(task.taskId, task.params));
}
}
}
}
async fetch(request: Request): Promise<Response> {
// CRITICAL: Check for stuck tasks on every wake-up
await this.recoverStuckTasks();
return this.app.fetch(request);
}
async alarm() {
await Effect.runPromise(this.scheduler.alarm());
// CRITICAL: Check again after alarm processing
await this.recoverStuckTasks();
}
private async recoverStuckTasks() {
const LOOP_TASKS = ['mining-loop', 'game-state'];
const tasks = await Effect.runPromise(this.scheduler.getTasks());
const now = Date.now();
for (const task of tasks) {
if (!LOOP_TASKS.includes(task.taskName)) continue;
// Recover failed/completed
if (task.status === 'failed' || task.status === 'completed') {
await Effect.runPromise(this.scheduler.checkpoint(task.taskId, '_recovered', true));
}
// Reschedule if stuck (scheduled >5 seconds ago OR never scheduled)
const scheduledAt = task.scheduledAt || 0;
if ((task.status === 'running' || task.status === 'pending') &&
(scheduledAt === 0 || (scheduledAt > 0 && now > scheduledAt + 5000))) {
const params = task.params;
await Effect.runPromise(this.scheduler.schedule(now + 100, task.taskId, task.taskName, params));
}
}
}
}Why This Matters: Without recovery checks, tasks scheduled for "5 seconds from now" will be stuck if the DO hibernates for hours. The alarm processes overdue tasks, but they may not reschedule correctly. Recovery checks ensure they resume properly.
Recovery Threshold: The recoverStuckTasks() method uses a 5-second threshold (not 60 seconds) to catch stuck tasks quickly. It also handles:
- Tasks with
scheduledAt === 0(never scheduled) - Tasks in "pending" status that should be running
- Tasks that are overdue by more than 5 seconds
Troubleshooting Stuck Tasks: If tasks stop counting after hibernation:
- Check logs for
[recoverStuckTasks]messages - these show what's being recovered - Check logs for
[fetch]and[alarm]messages - these show task states on wake-up - Verify
recoverStuckTasks()is being called in bothfetch()andalarm()methods - Ensure tasks are rescheduling themselves correctly in their handlers
CPU Limit Management: Durable Objects get their CPU time limit "topped up" on each request (including fetch() and alarm() calls). However, if many tasks recover simultaneously, they can exhaust CPU before completing. The scheduler includes:
- Concurrency limits: Processes tasks in batches (default: 10 concurrent) to prevent CPU exhaustion
- Task prioritization: Recovery tasks (stuck >5s) process before normally scheduled tasks
- Execution time monitoring: Logs warnings if alarm processing takes >5s, errors if >10s
- Per-task timing: Logs warnings for individual tasks taking >1s
To adjust concurrency for high-throughput scenarios:
const scheduler = new ReliableScheduler(storage, { maxConcurrentTasks: 20 });new ReliableScheduler(storage: DurableObjectStorage, options?)storage- Durable Object storage instanceoptions.maxConcurrentTasks- Maximum number of tasks to process concurrently (default: 10)
type TaskHandler = (taskId: string, params: unknown) => Effect.Effect<void, never, SchedulerService>;
scheduler.register(taskName: string, handler: TaskHandler): voidHandlers receive SchedulerService from Effect context:
scheduler.register('my-task', (taskId, params) => {
return Effect.gen(function* () {
const svc = yield* SchedulerService;
yield* svc.checkpoint(taskId, 'progress', { step: 1 });
// ... do work ...
yield* svc.completeTask(taskId);
});
});All methods return Effect<T, E, SchedulerService>. Wrap with Effect.runPromise() for async use:
// Start task immediately (returns Effect<void, HandlerMissing, SchedulerService>)
scheduler.runNow(taskId, taskName, params?, options?)
.pipe(Effect.runPromise)
// Schedule for future time (returns Effect<void, HandlerMissing, SchedulerService>)
scheduler.schedule(at: Date | number, taskId, taskName, params?, options?)
.pipe(Effect.runPromise)Options:
options.maxRetriesβ Override retry limit (default: 3, useInfinityfor loop tasks)options.priorityβ Task priority: 0=high, 1=medium, 2=low (default: 1)
// Save progress (returns Effect<void, never, SchedulerService>)
scheduler.checkpoint(taskId, key, value)
.pipe(Effect.runPromise)
// Retrieve progress (returns Effect<unknown, never, SchedulerService>)
scheduler.getCheckpoint(taskId, key)
.pipe(Effect.runPromise)
// Batch save (returns Effect<void, never, SchedulerService>)
scheduler.checkpointMultiple(taskId, updates: Record<string, unknown>)
.pipe(Effect.runPromise)// Mark complete (returns Effect<void, never, SchedulerService>)
scheduler.completeTask(taskId)
.pipe(Effect.runPromise)
// Get single task (returns Effect<Task | undefined, never, SchedulerService>)
scheduler.getTask(taskId)
.pipe(Effect.runPromise)
// List tasks (returns Effect<Task[], never, SchedulerService>)
scheduler.getTasks(status?: TaskStatus)
.pipe(Effect.runPromise)
// Cancel task (returns Effect<boolean, never, SchedulerService>)
scheduler.cancelTask(taskId)
.pipe(Effect.runPromise)
// Pause task (returns Effect<boolean, never, SchedulerService>)
scheduler.pauseTask(taskId)
.pipe(Effect.runPromise)
// Resume task (returns Effect<boolean, never, SchedulerService>)
scheduler.resumeTask(taskId)
.pipe(Effect.runPromise)
// Delete completed (returns Effect<number, never, SchedulerService>)
scheduler.clearCompleted()
.pipe(Effect.runPromise)
// Delete all (returns Effect<number, never, SchedulerService>)
scheduler.clearAll()
.pipe(Effect.runPromise)// Process due tasks (call from DO alarm)
scheduler.alarm()
.pipe(Effect.runPromise)
// Recover stuck tasks after hibernation
scheduler.recoverStuckTasks(taskNames?: string[])
.pipe(Effect.runPromise)
// Get registered handler
scheduler.getHandler(taskName): TaskHandler | undefinedAll errors are tagged and raised in the error channel:
class HandlerMissing extends Data.TaggedError("HandlerMissing")<{
taskName: string
}> {}
class TaskNotFound extends Data.TaggedError("TaskNotFound")<{
taskId: string
}> {}
class TaskConflict extends Data.TaggedError("TaskConflict")<{
taskId: string
currentStatus: string
operation: string
}> {}Example error handling:
const result = await Effect.runPromise(
Effect.either(scheduler.runNow(taskId, 'my-task', params))
);
if (result._tag === 'Left') {
const error = result.left;
if (error._tag === 'HandlerMissing') {
console.error('No handler for task:', error.taskName);
}
}When multiple tasks are due at the same time, priority determines execution order:
// High priority - executes first (0 = highest)
await Effect.runPromise(
scheduler.runNow('urgent-task', 'process', data, { priority: 0 })
);
// Medium priority - default behavior
await Effect.runPromise(
scheduler.runNow('normal-task', 'process', data)
);
// Low priority - executes last
await Effect.runPromise(
scheduler.runNow('background-task', 'cleanup', {}, { priority: 2 })
);
// Also works with schedule()
await Effect.runPromise(
scheduler.schedule(Date.now() + 5000, 'task-id', 'handler', params, { priority: 0 })
);Behavior:
- Tasks due at same time β higher priority (lower number) runs first
- Different
scheduledAtβ earlier time runs first (priority is secondary sort) - Default priority = 1 (medium)
- Backward compatible (existing tasks default to priority 1)
- Effect-TS powered: Composable, testable APIs with dependency injection
- Eviction safety: 30s safety alarm retries if evicted
- Checkpoints: Skip already-done work on resume
- Named handlers: No function serialization required
- Single queue: One alarm drives all tasks, sorted by time then priority
- Retry limits: Tasks automatically fail after 3 retries (configurable via
maxRetries) - Pause/resume: Tasks can be paused and resumed without losing state
- Structured errors: All failures in explicit error channel (no swallowed exceptions)
For high-throughput applications, distribute tasks across multiple DO instances using consistent hashing:
// Sharding by user ID - ensures user tasks stay in same DO
function getShardId(userId: string, totalShards: number): string {
const hash = crypto.createHash('md5').update(userId).digest('hex');
const shardNum = parseInt(hash.slice(0, 8), 16) % totalShards;
return `scheduler-shard-${shardNum}`;
}
// In your Worker
export default {
async fetch(request: Request, env: Env) {
const url = new URL(request.url);
const userId = url.searchParams.get('userId');
const shardId = getShardId(userId, 10); // 10 shards
const id = env.TASK_SCHEDULER_DO.idFromName(shardId);
const stub = env.TASK_SCHEDULER_DO.get(id);
return stub.fetch(request);
}
};Scaling Guidelines:
- 1-10 DOs: Low traffic applications
- 10-100 DOs: Medium traffic (thousands of concurrent users)
- 100+ DOs: High traffic (millions of users)
- Monitor CPU usage per DO - shard when approaching 10s alarm processing time
Critical: All public API calls must be wrapped with Effect.runPromise():
// β
CORRECT: Wrap all scheduler calls
await Effect.runPromise(scheduler.runNow(taskId, 'my-task', params));
await Effect.runPromise(scheduler.schedule(time, taskId, 'my-task', params));
await Effect.runPromise(scheduler.getTasks());
// β WRONG: Direct calls return Effect<T>, not Promise<T>
const result = scheduler.runNow(taskId, 'my-task', params); // Effect<T>
await result; // TypeError: Effect is not a PromiseWhy this matters:
- ironalarm APIs return
Effect<T, E, R>for composability - Cloudflare Workers expect
Promise<T>from fetch handlers - Effect.runPromise() executes the Effect and returns a Promise
// β WRONG: Old v0.1.0 signature
this.scheduler.register('my-task', (scheduler, taskId, params) => {
return Effect.gen(function* () {
// scheduler is passed as first param
yield* scheduler.checkpoint(taskId, 'key', 'value');
});
});
// β
CORRECT: New v0.2.0 signature
this.scheduler.register('my-task', (taskId, params) => {
return Effect.gen(function* () {
const svc = yield* SchedulerService; // Get from context
yield* svc.checkpoint(taskId, 'key', 'value');
});
});Migration: Remove scheduler parameter, use yield* SchedulerService instead.
// β WRONG: DO methods must return Promise<T>
async startTask(params: any) {
return this.scheduler.runNow(taskId, 'my-task', params); // Returns Effect<T>
}
// β
CORRECT: Wrap with Effect.runPromise
async startTask(params: any) {
return Effect.runPromise(this.scheduler.runNow(taskId, 'my-task', params));
}// β WRONG: Default 3 retries causes loops to fail after eviction
await Effect.runPromise(
scheduler.runNow(taskId, 'mining-loop', params)
);
// β
CORRECT: Explicit infinite retries for loops
await Effect.runPromise(
scheduler.runNow(taskId, 'mining-loop', params, { maxRetries: Infinity })
);// β WRONG: Tasks get stuck after DO hibernation
async fetch(request: Request): Promise<Response> {
return this.app.fetch(request); // No recovery check
}
// β
CORRECT: Always check for stuck tasks
async fetch(request: Request): Promise<Response> {
await this.recoverStuckTasks(); // Critical!
return this.app.fetch(request);
}// β WRONG: Allows path traversal attacks
await Effect.runPromise(
scheduler.runNow("../etc/passwd", "handler", params)
);
// β
CORRECT: Validation happens automatically
// TaskLimitExceeded thrown for invalid identifiers// β WRONG: Unbounded task creation
const scheduler = new ReliableScheduler(storage); // Default 10000 limit
// β
CORRECT: Configure limits for your use case
const scheduler = new ReliableScheduler(storage, {
maxTotalTasks: 50000, // Adjust based on storage capacity
maxConcurrentTasks: 50 // Adjust based on CPU limits
});// β WRONG: Unhandled Effect errors crash the DO
await Effect.runPromise(scheduler.runNow(taskId, 'missing-handler', params));
// β
CORRECT: Handle errors explicitly
const result = await Effect.runPromise(
Effect.either(scheduler.runNow(taskId, 'missing-handler', params))
);
if (result._tag === 'Left') {
// Handle HandlerMissing, ValidationError, TaskLimitExceeded, etc.
}Key Metrics to Monitor:
- DO CPU time per alarm cycle (>10s = too many tasks)
- Task queue length (spikes indicate backlogs)
- Recovery frequency (high = hibernation issues)
- Error rates by type (ValidationError = bad inputs)
Debug Commands:
# Check task states
curl "https://your-worker.dev/tasks"
# Force alarm processing
curl "https://your-worker.dev/_alarm" -X POST
# Check DO logs for recovery messages
# Look for: [recoverStuckTasks], [fetch] DO woke up, [alarm] RecoveredMIT