aashahin/workflows-sdk

@abshahin/workflows-sdk

Runtime-neutral TypeScript workflow definitions with adapters for Bun and Cloudflare Workflows.

The SDK gives you one typed workflow contract:

  • Define workflows once with defineWorkflow().
  • Dispatch standard envelopes with createWorkflowClient().
  • Run the same workflow definitions on Bun, a custom Cloudflare Worker endpoint, or Cloudflare's public Workflows REST API.
  • Persist idempotency, cron claims, workflow state, step results, retries, and dead letters through runtime adapters.

Installation

Add the package to the dependencies in your package.json, then install with your preferred JavaScript package manager:

"@abshahin/workflows-sdk": "^0.1.0"

The package exports TypeScript source files directly. Use it from runtimes and bundlers that can load TypeScript subpath exports, or compile it as part of your application build.

Exports

| Import path | Purpose |
| --- | --- |
| @abshahin/workflows-sdk | Core workflow definition, registry, client, envelope, scheduler, and error types |
| @abshahin/workflows-sdk/http | SignedHttpAdapter for a custom worker endpoint with /dispatch and /status/:id |
| @abshahin/workflows-sdk/cloudflare | Cloudflare Worker dispatch handler, Workflow entrypoint helper, and REST API adapter |
| @abshahin/workflows-sdk/bun | Bun runtime plus SQLite and Redis adapters |
| @abshahin/workflows-sdk/scheduler | Cron helpers and cron definition types |
| @abshahin/workflows-sdk/testing | In-memory adapter for tests |

Core API

Define workflow logic with a name, optional schema, optional cron definitions, optional retry/timeout defaults, and a run() function.

import {
  createWorkflowClient,
  defineWorkflow,
  defineWorkflowRegistry,
} from "@abshahin/workflows-sdk";
import { SignedHttpAdapter } from "@abshahin/workflows-sdk/http";

const sendEmail = defineWorkflow("email/send", {
  cron: [
    {
      name: "daily-digest",
      schedule: "0 9 * * *",
      payload: { kind: "daily-digest" },
    },
  ],
  retry: {
    maxAttempts: 3,
    initialIntervalMs: 1_000,
    multiplier: 2,
    maxIntervalMs: 30_000,
  },
  async run(ctx, payload) {
    await ctx.step("send", async () => {
      console.log("send email", payload);
    });
  },
});

export const registry = defineWorkflowRegistry([sendEmail]);

const client = createWorkflowClient({
  adapter: new SignedHttpAdapter({
    baseUrl: "https://workflows.example.com",
    authToken: process.env.WORKFLOWS_AUTH_TOKEN!,
  }),
});

await client.dispatch("email/send", { tenantId: "tenant_123" });

Dispatch Options

client.dispatch(name, payload, options) accepts:

| Option | Meaning |
| --- | --- |
| id | Explicit workflow instance ID |
| idempotencyKey | Deduplication key used by adapters that support idempotency |
| delayMs | Relative delay before the workflow should run |
| scheduledAt | Absolute ISO string or Date for delayed execution |
| traceId | Trace/correlation ID |
| metadata | Extra envelope metadata |

The Bun adapters store delayed envelopes in the scheduled state. The Cloudflare runner helpers instead sleep inside the Workflow before running user code.
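Both delay options describe the moment a workflow should run. The following is a minimal sketch of how a relative delayMs or an absolute scheduledAt could be normalized to one timestamp. The names resolveRunAt and DelayOptions are hypothetical and not SDK exports, and the precedence when both options are set (scheduledAt wins here) is an assumption rather than documented SDK behavior.

```typescript
// Hypothetical helper, not part of the SDK: turns the two delay options
// into one absolute run time.
interface DelayOptions {
  delayMs?: number;
  scheduledAt?: string | Date;
}

function resolveRunAt(options: DelayOptions, now: Date): Date {
  const scheduledAt = options.scheduledAt;
  if (scheduledAt !== undefined) {
    // Assumption: an absolute time takes precedence over a relative delay.
    const at =
      typeof scheduledAt === "string"
        ? new Date(scheduledAt)
        : new Date(scheduledAt.getTime());
    if (isNaN(at.getTime())) {
      throw new Error("scheduledAt is not a valid date");
    }
    return at;
  }
  if (options.delayMs !== undefined) {
    return new Date(now.getTime() + options.delayMs);
  }
  // No delay requested: the workflow is due immediately.
  return now;
}

const referenceNow = new Date("2025-01-01T00:00:00.000Z");
const relativeRunAt = resolveRunAt({ delayMs: 60_000 }, referenceNow);
const absoluteRunAt = resolveRunAt(
  { scheduledAt: "2025-01-02T09:00:00.000Z" },
  referenceNow,
);
```

Passing the current time in as an argument keeps the helper deterministic, which makes delayed dispatch easier to unit test.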

Run Context

Every workflow receives a ctx object:

| Method/property | Purpose |
| --- | --- |
| ctx.step(name, fn, options?) | Runs a durable/idempotent step when the adapter/runtime supports step storage |
| ctx.sleep(name, durationOrDate) | Sleeps by duration string, milliseconds, or until a Date |
| ctx.dispatch(name, payload, options?) | Dispatches another workflow through the configured client |
| ctx.event | Original workflow envelope |
| ctx.traceId | Trace ID from the envelope |
| ctx.idempotencyKey | Idempotency key from the envelope |
| ctx.logger | Runtime logger |

Step results are cached by step name and workflow instance ID. Reusing the same step name for different side effects inside one workflow instance will reuse the first stored result.
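To illustrate why step names must be unique within an instance, here is a minimal in-memory sketch of caching keyed by instance ID and step name. StepCache is a hypothetical stand-in and not the SDK's storage code; real adapters persist results, and the key format used here is an assumption.

```typescript
// Hypothetical in-memory stand-in for adapter step storage.
class StepCache {
  // Values are wrapped so that an undefined result still counts as cached.
  private results = new Map<string, { value: unknown }>();

  async step<T>(
    instanceId: string,
    stepName: string,
    fn: () => Promise<T>,
  ): Promise<T> {
    const cacheKey = `${instanceId}:${stepName}`;
    const cached = this.results.get(cacheKey);
    if (cached) return cached.value as T; // fn is skipped entirely
    const value = await fn();
    this.results.set(cacheKey, { value });
    return value;
  }
}

async function stepCacheDemo(): Promise<string[]> {
  const cache = new StepCache();
  const first = await cache.step("wf_123", "send", async () => "welcome email");
  // Same instance and step name: the callback never runs and the first
  // stored result comes back.
  const second = await cache.step("wf_123", "send", async () => "receipt email");
  // A distinct step name runs its own callback.
  const third = await cache.step("wf_123", "send-receipt", async () => "receipt email");
  return [first, second, third];
}
```

In this sketch the second call silently returns "welcome email", which is the failure mode to avoid by giving each side effect its own stable step name.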

Bun Runtime

Use the Bun runtime when you want to execute workflows in a Bun process.

import {
  BunSqliteWorkflowAdapter,
  createBunWorkflowRuntime,
} from "@abshahin/workflows-sdk/bun";
import { registry } from "./workflows";

const runtime = createBunWorkflowRuntime({
  registry,
  adapter: new BunSqliteWorkflowAdapter({ path: "workflows.sqlite" }),
  concurrency: 4,
  scheduler: {
    mode: "external",
  },
});

await runtime.client.dispatch("email/send", { tenantId: "tenant_123" });
await runtime.tick();
await runtime.processReady();

SQLite Adapter

BunSqliteWorkflowAdapter is the recommended single-server adapter. It stores:

  • workflow instances
  • scheduled and queued state
  • idempotency keys
  • cron run claims
  • step results, including undefined results
  • dead letters

import { BunSqliteWorkflowAdapter } from "@abshahin/workflows-sdk/bun";

const adapter = new BunSqliteWorkflowAdapter({
  path: "workflows.sqlite",
  namespace: "production",
});

Redis Adapter

BunRedisWorkflowAdapter is intended for multi-instance Bun deployments. It uses Redis sorted sets, leases, idempotency keys, and step-result hashes.

import { BunRedisWorkflowAdapter } from "@abshahin/workflows-sdk/bun";

const adapter = new BunRedisWorkflowAdapter({
  url: Bun.env.REDIS_URL,
  namespace: "workflows",
  leaseTtlMs: 30_000,
});

You can also pass an existing Redis-like client:

const adapter = new BunRedisWorkflowAdapter({
  client: myRedisClient,
});

The adapter uses Bun's RedisClient when available. Raw Redis commands are sent through redis.send(command, stringArgs).
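The role of leaseTtlMs can be pictured with a small in-memory lease table. This is only a sketch of the general lease technique: LeaseTable is a hypothetical name, the SDK keeps this state in Redis, and the exact Redis commands and expiry rules it uses are not shown in this README.

```typescript
// Hypothetical in-memory lease table illustrating TTL-based claims.
class LeaseTable {
  private leases = new Map<string, { owner: string; expiresAt: number }>();
  private leaseTtlMs: number;

  constructor(leaseTtlMs: number) {
    this.leaseTtlMs = leaseTtlMs;
  }

  // Returns true when `owner` holds the lease for `instanceId` after the call.
  claim(instanceId: string, owner: string, nowMs: number): boolean {
    const current = this.leases.get(instanceId);
    if (current && current.expiresAt > nowMs && current.owner !== owner) {
      return false; // another worker still holds a live lease
    }
    // Free, expired, or already ours: take (or renew) the lease.
    this.leases.set(instanceId, { owner, expiresAt: nowMs + this.leaseTtlMs });
    return true;
  }
}

const leaseTable = new LeaseTable(30_000);
const workerAClaims = leaseTable.claim("wf_123", "worker-a", 0);
const workerBTooEarly = leaseTable.claim("wf_123", "worker-b", 10_000);
const workerBAfterExpiry = leaseTable.claim("wf_123", "worker-b", 30_000);
```

Here worker-b is rejected while worker-a's lease is live and succeeds once the 30-second TTL has elapsed, which is how a crashed worker's instance can be picked up by another process.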

Scheduler Modes

| Mode | Use case | Behavior |
| --- | --- | --- |
| external | Kubernetes CronJob, systemd timer, queue worker, tests | You call runtime.tick() and/or runtime.processReady() yourself |
| in-process | Long-running Bun process | Registers Bun.cron(schedule, handler) and processes due work in the same process |
| os | Single-server production cron | Registers Bun.cron(path, schedule, title) and expects the target module to export scheduled() |
| redis | Multi-instance Bun deployment | Uses Bun cron as a wake-up mechanism and Redis for claims/leases |

For OS-level Bun cron, register cron jobs from the long-running app:

const runtime = createBunWorkflowRuntime({
  registry,
  adapter,
  scheduler: {
    mode: "os",
    scriptPath: import.meta.path,
    titlePrefix: "my-app-workflows",
  },
});

runtime.start();

The target module must export Bun's scheduled handler:

import {
  BunSqliteWorkflowAdapter,
  createBunWorkflowScheduledHandler,
} from "@abshahin/workflows-sdk/bun";
import { registry } from "./workflows";

export default createBunWorkflowScheduledHandler({
  registry,
  adapter: new BunSqliteWorkflowAdapter({ path: "workflows.sqlite" }),
  scheduler: {
    mode: "os",
  },
});

Bun cron uses standard 5-field cron expressions. Bun parses and runs in-process cron schedules in UTC. OS-level Bun cron follows the host timezone because it delegates to the platform scheduler. The SDK accounts for that in scheduled() by evaluating OS cron ticks in the local timezone unless a cron definition sets timezone.

Cron Idempotency

Cron runs use deterministic keys:

${workflowName}:${cronName}:${scheduledAt.toISOString()}

SQLite or Redis stores the run key before dispatch, so duplicate wake-ups do not create duplicate workflow instances.
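A minimal sketch of that claim-before-dispatch idea, using the key format above. The in-memory Set and the function names cronRunKey and claimCronRun are hypothetical; in the SDK the claim lives in SQLite or Redis.

```typescript
// Builds the deterministic run key in the format shown above.
function cronRunKey(
  workflowName: string,
  cronName: string,
  scheduledAt: Date,
): string {
  return `${workflowName}:${cronName}:${scheduledAt.toISOString()}`;
}

// Hypothetical in-memory claim store standing in for SQLite or Redis.
const claimedRuns = new Set<string>();

// Returns true only for the first wake-up that claims a given run.
function claimCronRun(runKey: string): boolean {
  if (claimedRuns.has(runKey)) return false;
  claimedRuns.add(runKey);
  return true;
}

const digestTick = new Date("2025-01-01T09:00:00.000Z");
const digestRunKey = cronRunKey("email/send", "daily-digest", digestTick);
const firstWakeUp = claimCronRun(digestRunKey); // claims the run, dispatch proceeds
const duplicateWakeUp = claimCronRun(digestRunKey); // already claimed, dispatch is skipped
```

Because the key is derived only from the workflow name, cron name, and scheduled time, two schedulers waking up for the same tick compute the same key and only one claim succeeds.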

missedRunPolicy defaults to skip. Use a catch-up mode only when you explicitly want multiple missed runs to be dispatched:

const workflow = defineWorkflow("billing/hourly", {
  cron: [
    {
      name: "hourly",
      schedule: "0 * * * *",
      missedRunPolicy: { mode: "catch-up-all", maxRuns: 3 },
    },
  ],
  run: async () => {},
});

Bun Retries and Recovery

The Bun runtime retries failed workflow runs when the workflow has a retry policy and the adapter implements requeue().

  • Step-level retry happens inside ctx.step().
  • Workflow-level retry requeues the same instance with attempt metadata.
  • If retries are exhausted, the instance is marked dead and recorded as a dead letter when the adapter supports it.
  • recoverStalled() returns stuck running instances to queued or scheduled.
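The retry options from the Core API example suggest exponential backoff with a cap. The sketch below shows one common way to compute such delays; it assumes the delay before each retry is initialIntervalMs multiplied by multiplier once per previous attempt and capped at maxIntervalMs, which this README does not spell out. retryDelayMs is a hypothetical helper, not an SDK export.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  initialIntervalMs: number;
  multiplier: number;
  maxIntervalMs: number;
}

// Delay before the retry that follows failed attempt number `attempt` (1-based).
// Returns null when no attempts remain, i.e. the instance would be marked dead.
function retryDelayMs(policy: RetryPolicy, attempt: number): number | null {
  if (attempt >= policy.maxAttempts) return null;
  const uncapped =
    policy.initialIntervalMs * Math.pow(policy.multiplier, attempt - 1);
  return Math.min(uncapped, policy.maxIntervalMs);
}

const emailRetryPolicy: RetryPolicy = {
  maxAttempts: 3,
  initialIntervalMs: 1_000,
  multiplier: 2,
  maxIntervalMs: 30_000,
};

// Delays after attempts 1, 2, and 3 under the assumptions above.
const emailRetryDelays = [1, 2, 3].map((attempt) =>
  retryDelayMs(emailRetryPolicy, attempt),
);
```

Under these assumptions the policy waits 1,000 ms after the first failure, 2,000 ms after the second, and gives up after the third.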

Cloudflare Workflows

There are two Cloudflare integration paths.

Custom Worker Endpoint

Use SignedHttpAdapter from any producer service that dispatches to your own Worker endpoint:

import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { SignedHttpAdapter } from "@abshahin/workflows-sdk/http";

const client = createWorkflowClient({
  adapter: new SignedHttpAdapter({
    baseUrl: "https://workflows.worker.example.com",
    authToken: Bun.env.WORKFLOWS_AUTH_TOKEN!,
  }),
});

await client.dispatch("email/send", { tenantId: "tenant_123" });
await client.getInstance("wf_123", { name: "email/send" });

The custom Worker endpoint must expose:

| Endpoint | Purpose |
| --- | --- |
| POST /dispatch | Accepts { events: WorkflowEventEnvelope[] } and creates Workflow instances |
| GET /status/:id?name=<eventName> | Returns the instance status |
| GET /health | Optional health check |
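For readers implementing or debugging this contract by hand, the sketch below builds the shape of a POST /dispatch request as a plain object. It is not SignedHttpAdapter's actual code: the envelope is limited to the id, name, and payload fields that appear elsewhere in this README, and sending the token as a bearer Authorization header is an assumption based on the bearerToken option of the dispatch handler. buildDispatchRequest and the two interfaces are hypothetical names.

```typescript
// Only the envelope fields this README mentions; the real type may have more.
interface EnvelopeSketch {
  id: string;
  name: string;
  payload: unknown;
}

interface HttpRequestSketch {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

function buildDispatchRequest(
  baseUrl: string,
  authToken: string,
  events: EnvelopeSketch[],
): HttpRequestSketch {
  return {
    // Strip trailing slashes so the path is not doubled up.
    url: `${baseUrl.replace(/\/+$/, "")}/dispatch`,
    method: "POST",
    headers: {
      "content-type": "application/json",
      // Assumption: the token travels as a bearer token.
      authorization: `Bearer ${authToken}`,
    },
    body: JSON.stringify({ events }),
  };
}

const dispatchRequest = buildDispatchRequest(
  "https://workflows.example.com/",
  "example-token",
  [{ id: "wf_123", name: "email/send", payload: { tenantId: "tenant_123" } }],
);
```

The resulting object can be passed to any HTTP client; keeping request construction separate from sending makes the wire format easy to assert on in tests.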

You can build that Worker with createCloudflareDispatchHandler():

import { createCloudflareDispatchHandler } from "@abshahin/workflows-sdk/cloudflare";
import { registry } from "./workflows";

export default createCloudflareDispatchHandler({
  registry,
  auth: {
    bearerToken: (env: { AUTH_TOKEN: string }) => env.AUTH_TOKEN,
  },
  resolveWorkflow(eventName, env: { EMAIL_WORKFLOW: Workflow }) {
    if (eventName.startsWith("email/")) return env.EMAIL_WORKFLOW;
    return null;
  },
});

The handler calls the Cloudflare binding with Workflow.create({ id, params }). Scheduled envelopes are passed as params and delayed by the Workflow entrypoint helper.

Workflow Entrypoint Helper

Use createCloudflareWorkflowEntrypoint() when you want Cloudflare Workflows to execute SDK workflow definitions directly.

import { WorkflowEntrypoint } from "cloudflare:workers";
import { createCloudflareWorkflowEntrypoint } from "@abshahin/workflows-sdk/cloudflare";
import { registry } from "./workflows";

interface Env {
  AUTH_TOKEN: string;
}

const EmailWorkflowBase = createCloudflareWorkflowEntrypoint(
  WorkflowEntrypoint<Env>,
  { registry },
);

export class EmailWorkflow extends EmailWorkflowBase {}

The helper maps:

  • ctx.step() to step.do()
  • ctx.sleep() to step.sleep() or step.sleepUntil()
  • workflow retry/timeout options to Cloudflare step config
  • future scheduledAt envelopes to a durable Cloudflare sleep before user code runs
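The sleep mapping in the list above can be sketched as a small dispatcher that picks step.sleep() for durations and step.sleepUntil() for dates. SleepStep is a minimal stand-in interface written for this example rather than Cloudflare's full WorkflowStep type, and sleepVia is a hypothetical name, not the helper's internal function.

```typescript
// Minimal stand-in for the two Cloudflare step methods named above.
interface SleepStep {
  sleep(stepName: string, duration: string | number): Promise<void>;
  sleepUntil(stepName: string, timestamp: Date | number): Promise<void>;
}

// Routes a Date to sleepUntil() and a duration string or milliseconds to sleep().
async function sleepVia(
  step: SleepStep,
  stepName: string,
  durationOrDate: string | number | Date,
): Promise<void> {
  if (durationOrDate instanceof Date) {
    await step.sleepUntil(stepName, durationOrDate);
  } else {
    await step.sleep(stepName, durationOrDate);
  }
}

// Recording fake so the routing can be observed without a Workflow runtime.
const sleepCalls: string[] = [];
const recordingStep: SleepStep = {
  async sleep(stepName, duration) {
    sleepCalls.push(`sleep:${stepName}:${duration}`);
  },
  async sleepUntil(stepName, timestamp) {
    const ms = typeof timestamp === "number" ? timestamp : timestamp.getTime();
    sleepCalls.push(`sleepUntil:${stepName}:${ms}`);
  },
};

async function sleepDemo(): Promise<string[]> {
  await sleepVia(recordingStep, "cool-down", "1 minute");
  await sleepVia(recordingStep, "wait-for-launch", new Date(0));
  return sleepCalls;
}
```

The same routing would apply to a future scheduledAt envelope: the entrypoint can sleep until that date before invoking run().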

Direct Cloudflare REST API

Use CloudflareRestWorkflowAdapter when you want to dispatch directly to Cloudflare's public Workflows REST API instead of your own Worker endpoint.

import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { CloudflareRestWorkflowAdapter } from "@abshahin/workflows-sdk/cloudflare";

const client = createWorkflowClient({
  adapter: new CloudflareRestWorkflowAdapter({
    accountId: Bun.env.CLOUDFLARE_ACCOUNT_ID!,
    apiToken: Bun.env.CLOUDFLARE_API_TOKEN!,
    workflowName(eventName) {
      if (eventName.startsWith("email/")) return "email-workflow";
      throw new Error(`No Cloudflare Workflow for ${eventName}`);
    },
  }),
});

await client.dispatch("email/send", { tenantId: "tenant_123" });
await client.getInstance("wf_123", { name: "email/send" });

The adapter posts:

{
  "instance_id": "wf_123",
  "params": {
    "id": "wf_123",
    "name": "email/send",
    "payload": {}
  }
}

to:

/accounts/{account_id}/workflows/{workflow_name}/instances

Status lookup requires either a prior dispatch through the same adapter instance or getInstance(id, { name }), because Cloudflare status endpoints are scoped to a workflow name.
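The name-scoping constraint can be sketched as follows. This is a simplified illustration, not the adapter's source: InstanceNameIndex and the helper functions are hypothetical, the API base URL is Cloudflare's standard v4 base, and the status path is assumed to be the instances path followed by the instance ID.

```typescript
const CLOUDFLARE_API_BASE = "https://api.cloudflare.com/client/v4";

// Same event-to-workflow mapping as the workflowName option above.
function workflowNameFor(eventName: string): string {
  if (eventName.startsWith("email/")) return "email-workflow";
  throw new Error(`No Cloudflare Workflow for ${eventName}`);
}

function instancesUrl(accountId: string, workflowName: string): string {
  return (
    `${CLOUDFLARE_API_BASE}/accounts/${encodeURIComponent(accountId)}` +
    `/workflows/${encodeURIComponent(workflowName)}/instances`
  );
}

// Remembers which workflow each dispatched instance belongs to, so a later
// status lookup can be scoped without the caller repeating the name.
class InstanceNameIndex {
  private names = new Map<string, string>();

  remember(instanceId: string, eventName: string): void {
    this.names.set(instanceId, workflowNameFor(eventName));
  }

  resolve(instanceId: string, eventName?: string): string {
    if (eventName !== undefined) return workflowNameFor(eventName);
    const remembered = this.names.get(instanceId);
    if (remembered === undefined) {
      throw new Error(`Unknown workflow for instance ${instanceId}; pass { name }`);
    }
    return remembered;
  }
}

const nameIndex = new InstanceNameIndex();
nameIndex.remember("wf_123", "email/send");
// Assumed status path: the instances URL plus the instance ID.
const statusUrl =
  `${instancesUrl("acct_1", nameIndex.resolve("wf_123"))}/wf_123`;
```

An index like this lives in memory, which is why a fresh adapter instance (for example after a restart) needs the explicit name.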

Testing

Use InMemoryWorkflowAdapter for unit tests:

import { createWorkflowClient } from "@abshahin/workflows-sdk";
import { InMemoryWorkflowAdapter } from "@abshahin/workflows-sdk/testing";

const adapter = new InMemoryWorkflowAdapter();
const client = createWorkflowClient({ adapter });

await client.dispatch("email/send", { tenantId: "tenant_123" });

For runtime-level tests, use BunSqliteWorkflowAdapter({ path: ":memory:" }).

Error Classes

The root export includes:

  • WorkflowError
  • WorkflowSendError
  • WorkflowValidationError
  • WorkflowRetryExhaustedError
  • WorkflowNotFoundError
  • WorkflowAlreadyClaimedError

SignedHttpAdapter marks most non-429 4xx responses as non-retryable by setting error.nonRetryable = true.
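A minimal sketch of that classification, useful when writing a custom adapter with similar behavior. It treats every non-429 4xx status as non-retryable, which is a simplification of the "most" in the sentence above; toSendError and SendErrorSketch are hypothetical names and not the SDK's WorkflowSendError.

```typescript
// Hypothetical error shape carrying the same nonRetryable flag.
interface SendErrorSketch extends Error {
  status: number;
  nonRetryable: boolean;
}

function toSendError(status: number): SendErrorSketch {
  // Client errors will not succeed on retry, except 429 (rate limited),
  // which is expected to clear after a delay.
  const nonRetryable = status >= 400 && status < 500 && status !== 429;
  return Object.assign(new Error(`Dispatch failed with HTTP ${status}`), {
    status,
    nonRetryable,
  });
}

const badRequestError = toSendError(400);
const rateLimitedError = toSendError(429);
const serverError = toSendError(503);
```

A caller can then check error.nonRetryable before scheduling another attempt instead of re-deriving the rule from the status code.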

Production Notes

  • Use Cloudflare Workflows for Worker deployments that need managed durable execution.
  • Use SQLite for one Bun process/server.
  • Use Redis for multiple Bun workers or multiple scheduler instances.
  • Keep workflow instance IDs under Cloudflare's current instance ID limit when using the REST adapter.
  • Keep workflow names under Cloudflare's current workflow name limit when using the REST adapter.
  • Use unique, stable step names. Step results are keyed by instance ID and step name.
  • Do not rely on Bun's fallback cron parser for production semantics. Production Bun scheduling should use Bun's native cron support.
  • In-process Bun cron uses UTC. OS-level Bun cron uses the host timezone.
  • Bun.cron(path, schedule, title) re-registers the same title in place, so keep titlePrefix, workflow name, and cron name stable.
  • This package currently ships TypeScript source via exports; compile it before deploying to runtimes that cannot load TypeScript directly.

Verification

Useful package-level checks:

bun test packages/workflows-sdk/src
bunx tsc -p packages/workflows-sdk/tsconfig.json --noEmit

About

TypeScript SDK for dispatching typed background workflow events to a Cloudflare Worker runtime.
