Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ coverage/
*.tsbuildinfo
.DS_Store
.wrangler/
.pi/
example/package-lock.json
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![npm downloads](https://img.shields.io/npm/dm/chat-state-cloudflare-do)](https://www.npmjs.com/package/chat-state-cloudflare-do)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)

Cloudflare Durable Objects state adapter for [Chat SDK](https://chat-sdk.dev/docs). Uses a SQLite-backed [Durable Object](https://developers.cloudflare.com/durable-objects/) for persistent subscriptions, distributed locking, and caching — with zero external dependencies beyond the Workers runtime.
Cloudflare Durable Objects state adapter for [Chat SDK](https://chat-sdk.dev/docs). Uses a SQLite-backed [Durable Object](https://developers.cloudflare.com/durable-objects/) for persistent subscriptions, distributed locking, queueing, list-backed message history, and caching — with zero external dependencies beyond the Workers runtime.

## Installation

Expand Down Expand Up @@ -97,7 +97,7 @@ const state = createCloudflareState({
});
```

Locks and subscriptions are per-thread, so sharding by any prefix of the thread ID is safe. Cache operations (`get`/`set`/`delete`) always route to the default shard since their keys are not thread-scoped.
Locks, force-release, and queue operations are per-thread, so sharding by any prefix of the thread ID is safe. Cache and list operations (`get`/`set`/`delete`, `appendToList`/`getList`) always route to the default shard since their keys are not thread-scoped.

| Strategy | `shardKey` | DOs created |
|----------|-----------|-------------|
Expand All @@ -107,11 +107,13 @@ Locks and subscriptions are per-thread, so sharding by any prefix of the thread

## Architecture

The adapter uses a single Durable Object class (`ChatStateDO`) with three SQLite tables:
The adapter uses a single Durable Object class (`ChatStateDO`) with five SQLite tables:

- **`subscriptions`** — thread IDs the bot is subscribed to
- **`locks`** — distributed locks with token-based ownership and TTL
- **`cache`** — key-value pairs with optional TTL
- **`queue`** — thread-scoped FIFO queue entries with TTL for concurrency strategies
- **`lists`** — ordered list entries for persistent message history

All operations are single-threaded within a DO instance, providing distributed locking via DO atomicity rather than Lua scripts. Expired entries are cleaned up automatically via the [Alarms API](https://developers.cloudflare.com/durable-objects/api/alarms/).

Expand All @@ -121,6 +123,9 @@ Each method call creates a fresh DO stub. Stubs are cheap (just a JS object) and

- Persistent subscriptions across deployments
- Distributed locking via single-threaded DO atomicity
- Lock force-release for Chat SDK lock conflict handling
- Queue/debounce concurrency primitives
- List-backed persistent message history
- Key-value caching with TTL
- Automatic TTL cleanup via Alarms
- Optional sharding for high-traffic bots
Expand Down
32 changes: 21 additions & 11 deletions src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ import type { Lock, QueueEntry, StateAdapter } from "chat";
import type { ChatStateDO } from "./durable-object";
import type { CloudflareStateOptions } from "./types";

function parseStoredJson<T>(raw: string, label: string): T {
try {
return JSON.parse(raw) as T;
} catch (error) {
throw new Error(
`CloudflareDOStateAdapter: expected JSON-encoded ${label}`,
{ cause: error },
);
}
}

/**
* Chat SDK state adapter backed by Cloudflare Durable Objects.
*
Expand Down Expand Up @@ -120,8 +131,12 @@ export class CloudflareDOStateAdapter implements StateAdapter {

async dequeue(threadId: string): Promise<QueueEntry | null> {
const raw = await this.stub(threadId).dequeue(threadId);
if (raw === null) return null;
return JSON.parse(raw) as QueueEntry;
return raw === null
? null
: parseStoredJson<QueueEntry>(
raw,
`queue entry for thread "${threadId}"`,
);
}

async queueDepth(threadId: string): Promise<number> {
Expand All @@ -147,7 +162,9 @@ export class CloudflareDOStateAdapter implements StateAdapter {

async getList<T = unknown>(key: string): Promise<T[]> {
const raw = await this.stub().listGet(key);
return raw.map((v: string) => JSON.parse(v) as T);
return raw.map((v: string) =>
parseStoredJson<T>(v, `list entry for key "${key}"`),
);
}

// -- Cache ---------------------------------------------------------------
Expand All @@ -165,14 +182,7 @@ export class CloudflareDOStateAdapter implements StateAdapter {
if (raw === null) {
return null;
}
try {
return JSON.parse(raw) as T;
} catch {
// Defensive: set() always JSON.stringify's, so parse should never
// fail through the public API. This handles values written directly
// to the DO's cache table outside of the adapter.
return raw as unknown as T;
}
return parseStoredJson<T>(raw, `cache value for key "${key}"`);
}

async set<T = unknown>(key: string, value: T, ttlMs?: number): Promise<void> {
Expand Down
95 changes: 44 additions & 51 deletions src/durable-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,69 +356,33 @@ export class ChatStateDO<TEnv = unknown> extends DurableObject<TEnv> {
// -- Cache ---------------------------------------------------------------

cacheGet(key: string): string | null {
const now = Date.now();
const rows = this.sql
.exec<{ value: string }>(
"SELECT value FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)",
key,
now
)
.toArray();
return rows.length > 0 ? rows[0].value : null;
return this.readCacheValue(key, Date.now());
}

cacheSet(key: string, value: string, ttlMs?: number): void {
// ttlMs of 0, null, or undefined means "no expiry" — matches Redis adapter
// behavior where falsy ttlMs persists the entry forever.
const expiresAt = ttlMs ? Date.now() + ttlMs : null;
this.sql.exec(
"INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
key,
value,
expiresAt
);
// Only schedule alarm when we actually added an expiring entry —
// avoids a wasted nextExpiry() SQL scan on permanent cache writes.
this.upsertCacheValue(key, value, expiresAt);

if (expiresAt != null) {
this.scheduleCleanupIfNeeded();
}
}

/**
* Set the key only if it does not exist (or is expired). Returns true if
* the value was set, false if the key already existed and is not expired.
*/
cacheSetIfNotExists(key: string, value: string, ttlMs?: number): boolean {
const now = Date.now();
const result = this.ctx.storage.transactionSync(() => {
const existing = this.sql
.exec(
"SELECT 1 FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)",
key,
now
)
.toArray();
if (existing.length > 0) {
this.deleteExpiredCacheValue(key, now);

const existing = this.readCacheValue(key, now);
if (existing !== null) {
return { inserted: false, expiresAt: null as number | null };
}
// Remove any expired row that's still physically present —
// without this, the INSERT below would hit a PRIMARY KEY violation.
this.sql.exec(
"DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at <= ?",
key,
now
);
const expiresAt = ttlMs ? Date.now() + ttlMs : null;
this.sql.exec(
"INSERT INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
key,
value,
expiresAt
);

const expiresAt = ttlMs ? now + ttlMs : null;
this.upsertCacheValue(key, value, expiresAt);
return { inserted: true, expiresAt };
});

// Schedule alarm outside the transaction — same pattern as acquireLock().
if (result.inserted && result.expiresAt != null) {
this.scheduleCleanupIfNeeded();
}
Expand Down Expand Up @@ -463,11 +427,40 @@ export class ChatStateDO<TEnv = unknown> extends DurableObject<TEnv> {
}
}

/**
* Find the earliest future expiration timestamp across all tables.
* Filters out already-expired rows to avoid scheduling unnecessary
* immediate alarms.
*/
// -- Cache helpers (private) -----------------------------------------------

private readCacheValue(key: string, now: number): string | null {
const rows = this.sql
.exec<{ value: string }>(
"SELECT value FROM cache WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)",
key,
now,
)
.toArray();
return rows.length > 0 ? rows[0].value : null;
}

private deleteExpiredCacheValue(key: string, now: number): void {
this.sql.exec(
"DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at <= ?",
key,
now,
);
}

private upsertCacheValue(
key: string,
value: string,
expiresAt: number | null,
): void {
this.sql.exec(
"INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
key,
value,
expiresAt,
);
}

private nextExpiry(): number | null {
const now = Date.now();
const rows = this.sql
Expand Down
Loading