Runtime-neutral TypeScript workflow definitions with adapters for Bun and Cloudflare Workflows.
The SDK gives you one typed workflow contract:
- Define workflows once with
defineWorkflow(). - Dispatch standard envelopes with
createWorkflowClient(). - Run the same workflow definitions on Bun, a custom Cloudflare Worker endpoint, or Cloudflare's public Workflows REST API.
- Persist idempotency, cron claims, workflow state, step results, retries, and dead letters through runtime adapters.
Install the package with your preferred JavaScript package manager:
"@abshahin/workflows-sdk": "^0.1.0"The package exports TypeScript source files directly. Use it from runtimes and bundlers that can load TypeScript subpath exports, or compile it as part of your application build.
| Import path | Purpose |
|---|---|
@abshahin/workflows-sdk |
Core workflow definition, registry, client, envelope, scheduler, and error types |
@abshahin/workflows-sdk/http |
SignedHttpAdapter for a custom worker endpoint with /dispatch and /status/:id |
@abshahin/workflows-sdk/cloudflare |
Cloudflare Worker dispatch handler, Workflow entrypoint helper, and REST API adapter |
@abshahin/workflows-sdk/bun |
Bun runtime plus SQLite and Redis adapters |
@abshahin/workflows-sdk/scheduler |
Cron helpers and cron definition types |
@abshahin/workflows-sdk/testing |
In-memory adapter for tests |
Define workflow logic with a name, optional schema, optional cron definitions, optional retry/timeout defaults, and a run() function.
import {
createWorkflowClient,
defineWorkflow,
defineWorkflowRegistry,
} from "@abshahin/workflows-sdk";
import { SignedHttpAdapter } from "@abshahin/workflows-sdk/http";
const sendEmail = defineWorkflow("email/send", {
cron: [
{
name: "daily-digest",
schedule: "0 9 * * *",
payload: { kind: "daily-digest" },
},
],
retry: {
maxAttempts: 3,
initialIntervalMs: 1_000,
multiplier: 2,
maxIntervalMs: 30_000,
},
async run(ctx, payload) {
await ctx.step("send", async () => {
console.log("send email", payload);
});
},
});
export const registry = defineWorkflowRegistry([sendEmail]);
const client = createWorkflowClient({
adapter: new SignedHttpAdapter({
baseUrl: "https://workflows.example.com",
authToken: process.env.WORKFLOWS_AUTH_TOKEN!,
}),
});
await client.dispatch("email/send", { tenantId: "tenant_123" });client.dispatch(name, payload, options) accepts:
| Option | Meaning |
|---|---|
id |
Explicit workflow instance ID |
idempotencyKey |
Deduplication key used by adapters that support idempotency |
delayMs |
Relative delay before the workflow should run |
scheduledAt |
Absolute ISO string or Date for delayed execution |
traceId |
Trace/correlation ID |
metadata |
Extra envelope metadata |
Delayed envelopes are stored as scheduled by Bun adapters. Cloudflare runner helpers sleep inside the Workflow before running user code.
Every workflow receives a ctx object:
| Method/property | Purpose |
|---|---|
ctx.step(name, fn, options?) |
Runs a durable/idempotent step when the adapter/runtime supports step storage |
ctx.sleep(name, durationOrDate) |
Sleeps by duration string, milliseconds, or until a Date |
ctx.dispatch(name, payload, options?) |
Dispatches another workflow through the configured client |
ctx.event |
Original workflow envelope |
ctx.traceId |
Trace ID from the envelope |
ctx.idempotencyKey |
Idempotency key from the envelope |
ctx.logger |
Runtime logger |
Step results are cached by step name and workflow instance ID. Reusing the same step name for different side effects inside one workflow instance will reuse the first stored result.
Use the Bun runtime when you want to execute workflows in a Bun process.
import {
BunSqliteWorkflowAdapter,
createBunWorkflowRuntime,
} from "@abshahin/workflows-sdk/bun";
import { registry } from "./workflows";
const runtime = createBunWorkflowRuntime({
registry,
adapter: new BunSqliteWorkflowAdapter({ path: "workflows.sqlite" }),
concurrency: 4,
scheduler: {
mode: "external",
},
});
await runtime.client.dispatch("email/send", { tenantId: "tenant_123" });
await runtime.tick();
await runtime.processReady();BunSqliteWorkflowAdapter is the recommended single-server adapter. It stores:
- workflow instances
- scheduled and queued state
- idempotency keys
- cron run claims
- step results, including
undefinedresults - dead letters
import { BunSqliteWorkflowAdapter } from "@abshahin/workflows-sdk/bun";
const adapter = new BunSqliteWorkflowAdapter({
path: "workflows.sqlite",
namespace: "production",
});BunRedisWorkflowAdapter is intended for multi-instance Bun deployments. It uses Redis sorted sets, leases, idempotency keys, and step-result hashes.
import { BunRedisWorkflowAdapter } from "@abshahin/workflows-sdk/bun";
const adapter = new BunRedisWorkflowAdapter({
url: Bun.env.REDIS_URL,
namespace: "workflows",
leaseTtlMs: 30_000,
});You can also pass an existing Redis-like client:
const adapter = new BunRedisWorkflowAdapter({
client: myRedisClient,
});The adapter uses Bun's RedisClient when available. Raw Redis commands are sent through redis.send(command, stringArgs).
| Mode | Use case | Behavior |
|---|---|---|
external |
Kubernetes CronJob, systemd timer, queue worker, tests | You call runtime.tick() and/or runtime.processReady() yourself |
in-process |
Long-running Bun process | Registers Bun.cron(schedule, handler) and processes due work in the same process |
os |
Single-server production cron | Registers Bun.cron(path, schedule, title) and expects the target module to export scheduled() |
redis |
Multi-instance Bun deployment | Uses Bun cron as a wake-up mechanism and Redis for claims/leases |
For OS-level Bun cron, register cron jobs from the long-running app:
const runtime = createBunWorkflowRuntime({
registry,
adapter,
scheduler: {
mode: "os",
scriptPath: import.meta.path,
titlePrefix: "my-app-workflows",
},
});
runtime.start();The target module must export Bun's scheduled handler:
import {
BunSqliteWorkflowAdapter,
createBunWorkflowScheduledHandler,
} from "@abshahin/workflows-sdk/bun";
import { registry } from "./workflows";
export default createBunWorkflowScheduledHandler({
registry,
adapter: new BunSqliteWorkflowAdapter({ path: "workflows.sqlite" }),
scheduler: {
mode: "os",
},
});Bun cron uses standard 5-field cron expressions. Bun parses and runs in-process cron schedules in UTC. OS-level Bun cron follows the host timezone because it delegates to the platform scheduler. The SDK accounts for that in scheduled() by evaluating OS cron ticks in the local timezone unless a cron definition sets timezone.
Cron runs use deterministic keys:
${workflowName}:${cronName}:${scheduledAt.toISOString()}SQLite or Redis stores the run key before dispatch, so duplicate wake-ups do not create duplicate workflow instances.
missedRunPolicy defaults to skip. Use catch-up mode when you explicitly want multiple missed runs:
const workflow = defineWorkflow("billing/hourly", {
cron: [
{
name: "hourly",
schedule: "0 * * * *",
missedRunPolicy: { mode: "catch-up-all", maxRuns: 3 },
},
],
run: async () => {},
});The Bun runtime retries failed workflow runs when the workflow has a retry policy and the adapter implements requeue().
- Step-level retry happens inside
ctx.step(). - Workflow-level retry requeues the same instance with attempt metadata.
- If retries are exhausted, the instance is marked
deadand recorded as a dead letter when the adapter supports it. recoverStalled()returns stuckrunninginstances toqueuedorscheduled.
There are two Cloudflare integration paths.
Use SignedHttpAdapter from any producer service that dispatches to your own Worker endpoint:
import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { SignedHttpAdapter } from "@abshahin/workflows-sdk/http";
const client = createWorkflowClient({
adapter: new SignedHttpAdapter({
baseUrl: "https://workflows.worker.example.com",
authToken: Bun.env.WORKFLOWS_AUTH_TOKEN!,
}),
});
await client.dispatch("email/send", { tenantId: "tenant_123" });
await client.getInstance("wf_123", { name: "email/send" });The custom Worker endpoint must expose:
| Endpoint | Purpose |
|---|---|
POST /dispatch |
Accepts { events: WorkflowEventEnvelope[] } and creates Workflow instances |
GET /status/:id?name=<eventName> |
Returns the instance status |
GET /health |
Optional health check |
You can build that Worker with createCloudflareDispatchHandler():
import { createCloudflareDispatchHandler } from "@abshahin/workflows-sdk/cloudflare";
import { registry } from "./workflows";
export default createCloudflareDispatchHandler({
registry,
auth: {
bearerToken: (env: { AUTH_TOKEN: string }) => env.AUTH_TOKEN,
},
resolveWorkflow(eventName, env: { EMAIL_WORKFLOW: Workflow }) {
if (eventName.startsWith("email/")) return env.EMAIL_WORKFLOW;
return null;
},
});The handler calls the Cloudflare binding with Workflow.create({ id, params }). Scheduled envelopes are passed as params and delayed by the Workflow entrypoint helper.
Use createCloudflareWorkflowEntrypoint() when you want Cloudflare Workflows to execute SDK workflow definitions directly.
import { WorkflowEntrypoint } from "cloudflare:workers";
import { createCloudflareWorkflowEntrypoint } from "@abshahin/workflows-sdk/cloudflare";
import { registry } from "./workflows";
interface Env {
AUTH_TOKEN: string;
}
const EmailWorkflowBase = createCloudflareWorkflowEntrypoint(
WorkflowEntrypoint<Env>,
{ registry },
);
export class EmailWorkflow extends EmailWorkflowBase {}The helper maps:
ctx.step()tostep.do()ctx.sleep()tostep.sleep()orstep.sleepUntil()- workflow retry/timeout options to Cloudflare step config
- future
scheduledAtenvelopes to a durable Cloudflare sleep before user code runs
Use CloudflareRestWorkflowAdapter when you want to dispatch directly to Cloudflare's public Workflows REST API instead of your own Worker endpoint.
import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { CloudflareRestWorkflowAdapter } from "@abshahin/workflows-sdk/cloudflare";
const client = createWorkflowClient({
adapter: new CloudflareRestWorkflowAdapter({
accountId: Bun.env.CLOUDFLARE_ACCOUNT_ID!,
apiToken: Bun.env.CLOUDFLARE_API_TOKEN!,
workflowName(eventName) {
if (eventName.startsWith("email/")) return "email-workflow";
throw new Error(`No Cloudflare Workflow for ${eventName}`);
},
}),
});
await client.dispatch("email/send", { tenantId: "tenant_123" });
await client.getInstance("wf_123", { name: "email/send" });The adapter posts:
{
"instance_id": "wf_123",
"params": {
"id": "wf_123",
"name": "email/send",
"payload": {}
}
}to:
/accounts/{account_id}/workflows/{workflow_name}/instancesStatus lookup requires either a prior dispatch through the same adapter instance or getInstance(id, { name }), because Cloudflare status endpoints are scoped to a workflow name.
Use InMemoryWorkflowAdapter for unit tests:
import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { InMemoryWorkflowAdapter } from "@abshahin/workflows-sdk/testing";
const adapter = new InMemoryWorkflowAdapter();
const client = createWorkflowClient({ adapter });
await client.dispatch("email/send", { tenantId: "tenant_123" });For runtime-level tests, use BunSqliteWorkflowAdapter({ path: ":memory:" }).
The root export includes:
WorkflowErrorWorkflowSendErrorWorkflowValidationErrorWorkflowRetryExhaustedErrorWorkflowNotFoundErrorWorkflowAlreadyClaimedError
SignedHttpAdapter marks most non-429 4xx responses as non-retryable by setting error.nonRetryable = true.
- Use Cloudflare Workflows for Worker deployments that need managed durable execution.
- Use SQLite for one Bun process/server.
- Use Redis for multiple Bun workers or multiple scheduler instances.
- Keep workflow instance IDs under Cloudflare's current instance ID limit when using the REST adapter.
- Keep workflow names under Cloudflare's current workflow name limit when using the REST adapter.
- Use unique, stable step names. Step results are keyed by instance ID and step name.
- Do not rely on Bun's fallback cron parser for production semantics. Production Bun scheduling should use Bun's native cron support.
- In-process Bun cron uses UTC. OS-level Bun cron uses the host timezone.
Bun.cron(path, schedule, title)re-registers the same title in place, so keeptitlePrefix, workflow name, and cron name stable.- This package currently ships TypeScript source via exports; compile before publishing to runtimes that cannot load TypeScript directly.
Useful package-level checks:
bun test packages/workflows-sdk/src
bunx tsc -p packages/workflows-sdk/tsconfig.json --noEmit