From e60f2c444eef3bf43b1be151a82fade73b7fb817 Mon Sep 17 00:00:00 2001 From: Dan Carter Date: Thu, 16 Apr 2026 18:06:52 -0400 Subject: [PATCH] refactor: cherry-pick improvements from PR #3 - add parseStoredJson helper for descriptive JSON parse errors - extract DO cache private helpers (readCacheValue, deleteExpiredCacheValue, upsertCacheValue) - update README to document queue, list, and force-release features - add .pi/ to .gitignore --- .gitignore | 1 + README.md | 11 +++-- src/adapter.ts | 32 ++++++++++----- src/durable-object.ts | 95 ++++++++++++++++++++----------------------- 4 files changed, 74 insertions(+), 65 deletions(-) diff --git a/.gitignore b/.gitignore index 4ab5689..3e08c57 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ coverage/ *.tsbuildinfo .DS_Store .wrangler/ +.pi/ example/package-lock.json diff --git a/README.md b/README.md index 8591ff3..7b16d06 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![npm downloads](https://img.shields.io/npm/dm/chat-state-cloudflare-do)](https://www.npmjs.com/package/chat-state-cloudflare-do) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) -Cloudflare Durable Objects state adapter for [Chat SDK](https://chat-sdk.dev/docs). Uses a SQLite-backed [Durable Object](https://developers.cloudflare.com/durable-objects/) for persistent subscriptions, distributed locking, and caching — with zero external dependencies beyond the Workers runtime. +Cloudflare Durable Objects state adapter for [Chat SDK](https://chat-sdk.dev/docs). Uses a SQLite-backed [Durable Object](https://developers.cloudflare.com/durable-objects/) for persistent subscriptions, distributed locking, queueing, list-backed message history, and caching — with zero external dependencies beyond the Workers runtime. ## Installation @@ -97,7 +97,7 @@ const state = createCloudflareState({ }); ``` -Locks and subscriptions are per-thread, so sharding by any prefix of the thread ID is safe. Cache operations (`get`/`set`/`delete`) always route to the default shard since their keys are not thread-scoped. +Locks, force-release, and queue operations are per-thread, so sharding by any prefix of the thread ID is safe. Cache and list operations (`get`/`set`/`delete`, `appendToList`/`getList`) always route to the default shard since their keys are not thread-scoped. | Strategy | `shardKey` | DOs created | |----------|-----------|-------------| @@ -107,11 +107,13 @@ Locks and subscriptions are per-thread, so sharding by any prefix of the thread ## Architecture -The adapter uses a single Durable Object class (`ChatStateDO`) with three SQLite tables: +The adapter uses a single Durable Object class (`ChatStateDO`) with five SQLite tables: - **`subscriptions`** — thread IDs the bot is subscribed to - **`locks`** — distributed locks with token-based ownership and TTL - **`cache`** — key-value pairs with optional TTL +- **`queue`** — thread-scoped FIFO queue entries with TTL for concurrency strategies +- **`lists`** — ordered list entries for persistent message history All operations are single-threaded within a DO instance, providing distributed locking via DO atomicity rather than Lua scripts. Expired entries are cleaned up automatically via the [Alarms API](https://developers.cloudflare.com/durable-objects/api/alarms/). @@ -121,6 +123,9 @@ Each method call creates a fresh DO stub. Stubs are cheap (just a JS object) and - Persistent subscriptions across deployments - Distributed locking via single-threaded DO atomicity +- Lock force-release for Chat SDK lock conflict handling +- Queue/debounce concurrency primitives +- List-backed persistent message history - Key-value caching with TTL - Automatic TTL cleanup via Alarms - Optional sharding for high-traffic bots diff --git a/src/adapter.ts b/src/adapter.ts index 839c2e6..8841e99 100644 --- a/src/adapter.ts +++ b/src/adapter.ts @@ -2,6 +2,17 @@ import type { Lock, QueueEntry, StateAdapter } from "chat"; import type { ChatStateDO } from "./durable-object"; import type { CloudflareStateOptions } from "./types"; +function parseStoredJson(raw: string, label: string): T { + try { + return JSON.parse(raw) as T; + } catch (error) { + throw new Error( + `CloudflareDOStateAdapter: expected JSON-encoded ${label}`, + { cause: error }, + ); + } +} + /** * Chat SDK state adapter backed by Cloudflare Durable Objects. * @@ -120,8 +131,12 @@ export class CloudflareDOStateAdapter implements StateAdapter { async dequeue(threadId: string): Promise { const raw = await this.stub(threadId).dequeue(threadId); - if (raw === null) return null; - return JSON.parse(raw) as QueueEntry; + return raw === null + ? null + : parseStoredJson( + raw, + `queue entry for thread "${threadId}"`, + ); } async queueDepth(threadId: string): Promise { @@ -147,7 +162,9 @@ export class CloudflareDOStateAdapter implements StateAdapter { async getList(key: string): Promise { const raw = await this.stub().listGet(key); - return raw.map((v: string) => JSON.parse(v) as T); + return raw.map((v: string) => + parseStoredJson(v, `list entry for key "${key}"`), + ); } // -- Cache --------------------------------------------------------------- @@ -165,14 +182,7 @@ export class CloudflareDOStateAdapter implements StateAdapter { if (raw === null) { return null; } - try { - return JSON.parse(raw) as T; - } catch { - // Defensive: set() always JSON.stringify's, so parse should never - // fail through the public API. This handles values written directly - // to the DO's cache table outside of the adapter. - return raw as unknown as T; - } + return parseStoredJson(raw, `cache value for key "${key}"`); } async set(key: string, value: T, ttlMs?: number): Promise { diff --git a/src/durable-object.ts b/src/durable-object.ts index 07ecd5d..bab7a51 100644 --- a/src/durable-object.ts +++ b/src/durable-object.ts @@ -356,69 +356,33 @@ export class ChatStateDO extends DurableObject { // -- Cache --------------------------------------------------------------- cacheGet(key: string): string | null { - const now = Date.now(); - const rows = this.sql - .exec<{ value: string }>( - "SELECT value FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)", - key, - now - ) - .toArray(); - return rows.length > 0 ? rows[0].value : null; + return this.readCacheValue(key, Date.now()); } cacheSet(key: string, value: string, ttlMs?: number): void { - // ttlMs of 0, null, or undefined means "no expiry" — matches Redis adapter - // behavior where falsy ttlMs persists the entry forever. const expiresAt = ttlMs ? Date.now() + ttlMs : null; - this.sql.exec( - "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)", - key, - value, - expiresAt - ); - // Only schedule alarm when we actually added an expiring entry — - // avoids a wasted nextExpiry() SQL scan on permanent cache writes. + this.upsertCacheValue(key, value, expiresAt); + if (expiresAt != null) { this.scheduleCleanupIfNeeded(); } } - /** - * Set the key only if it does not exist (or is expired). Returns true if - * the value was set, false if the key already existed and is not expired. - */ cacheSetIfNotExists(key: string, value: string, ttlMs?: number): boolean { const now = Date.now(); const result = this.ctx.storage.transactionSync(() => { - const existing = this.sql - .exec( - "SELECT 1 FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)", - key, - now - ) - .toArray(); - if (existing.length > 0) { + this.deleteExpiredCacheValue(key, now); + + const existing = this.readCacheValue(key, now); + if (existing !== null) { return { inserted: false, expiresAt: null as number | null }; } - // Remove any expired row that's still physically present — - // without this, the INSERT below would hit a PRIMARY KEY violation. - this.sql.exec( - "DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at <= ?", - key, - now - ); - const expiresAt = ttlMs ? Date.now() + ttlMs : null; - this.sql.exec( - "INSERT INTO cache (key, value, expires_at) VALUES (?, ?, ?)", - key, - value, - expiresAt - ); + + const expiresAt = ttlMs ? now + ttlMs : null; + this.upsertCacheValue(key, value, expiresAt); return { inserted: true, expiresAt }; }); - // Schedule alarm outside the transaction — same pattern as acquireLock(). if (result.inserted && result.expiresAt != null) { this.scheduleCleanupIfNeeded(); } @@ -463,11 +427,40 @@ export class ChatStateDO extends DurableObject { } } - /** - * Find the earliest future expiration timestamp across all tables. - * Filters out already-expired rows to avoid scheduling unnecessary - * immediate alarms. - */ + // -- Cache helpers (private) ----------------------------------------------- + + private readCacheValue(key: string, now: number): string | null { + const rows = this.sql + .exec<{ value: string }>( + "SELECT value FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)", + key, + now, + ) + .toArray(); + return rows.length > 0 ? rows[0].value : null; + } + + private deleteExpiredCacheValue(key: string, now: number): void { + this.sql.exec( + "DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at <= ?", + key, + now, + ); + } + + private upsertCacheValue( + key: string, + value: string, + expiresAt: number | null, + ): void { + this.sql.exec( + "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)", + key, + value, + expiresAt, + ); + } + private nextExpiry(): number | null { const now = Date.now(); const rows = this.sql