diff --git a/package.json b/package.json index c5a759c..ce66ad9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@clawnify/clawflow", - "version": "1.0.0", + "version": "1.1.0", "description": "The n8n for agents. A declarative, AI-native workflow format that agents can read, write, and run.", "type": "module", "main": "./dist/index.js", diff --git a/src/core/custom-steps.ts b/src/core/custom-steps.ts new file mode 100644 index 0000000..a6a07f1 --- /dev/null +++ b/src/core/custom-steps.ts @@ -0,0 +1,176 @@ +// ---- Custom Step Registry ------------------------------------------------------ +// Public extension API: hosts register additional `do:` step types at startup. +// Built-in step types (ai, code, http, …) are dispatched first; custom types +// participate in the same do: resolution and the same validation/error shapes. + +import type { FlowState } from "./types.js"; + +/** Names that cannot be overridden by custom steps. Mirrors NODE_KEYS in types.ts. */ +const BUILT_IN_STEP_NAMES = new Set([ + "ai", + "agent", + "branch", + "condition", + "loop", + "parallel", + "http", + "memory", + "wait", + "sleep", + "code", + "exec", +]); + +/** Validator return shape — matches validate.ts ValidationError so errors are indistinguishable from native. */ +export interface CustomStepValidationFailure { + field?: string; + message: string; +} + +export type CustomStepValidatorResult = + | { ok: true } + | { ok: false; errors: CustomStepValidationFailure[] }; + +export type CustomStepValidator = (input: unknown) => CustomStepValidatorResult; + +/** Runtime context handed to a custom step's run() function. */ +export interface CustomStepContext { + /** Frozen deep copy of the flow's current state. Reads only. */ + readonly state: Readonly; + /** Resolved env (merged from flow.env + process.env). Empty object if none. */ + readonly env: Readonly>; + /** Logger that prefixes the node name. */ + readonly logger: { + debug: (msg: string, ...rest: unknown[]) => void; + info: (msg: string, ...rest: unknown[]) => void; + warn: (msg: string, ...rest: unknown[]) => void; + error: (msg: string, ...rest: unknown[]) => void; + }; + /** Aborts when the flow is cancelled. Custom steps should pass this to fetch()/etc. */ + readonly abortSignal: AbortSignal; + /** The node's `name` field — useful for log prefixing or error messages. */ + readonly nodeName: string; + /** + * Resolve a `{{ template }}` string against the live flow state. + * Useful when a custom step accepts non-string input shapes that contain templates. + */ + resolveTemplate: (template: string) => string; +} + +/** + * Definition passed to `registerStepType`. + * + * The runner pre-resolves `{{ template }}` strings in top-level string fields + * and (one level deep) in object fields named `body` or `headers`, mirroring + * how `do: http` handles them. Other fields are passed through unchanged. + * If you need finer control, declare the field as raw and call + * `ctx.resolveTemplate()` yourself. + */ +export interface CustomStepDefinition, TOutput = unknown> { + /** Step name as it will appear in flow JSON (`do: ""`). */ + name: string; + /** + * Top-level node fields (excluding base keys like `name`, `do`, `output`, + * `retry`, `timeout`) that this step accepts. Used by the validator to + * reject typos. If empty, the step takes no input fields. + */ + allowedKeys: readonly string[]; + /** + * Optional field-level validation. Called with the post-template-resolution + * input object. Errors are surfaced through `validateFlow` with the same + * `{ node, field, message }` shape as native validation failures. + */ + validate?: CustomStepValidator; + /** + * Step body. Whatever this returns lands in `state[node.output]`, identical + * to built-ins. Throw to mark the node failed; the error message is traced. + */ + run: (input: TInput, ctx: CustomStepContext) => Promise | TOutput; +} + +/** + * Registry of custom step definitions. A FlowRunner can use a private registry + * (passed via `cfg.customSteps`) for test isolation, or share the module-level + * default via `registerStepType()`. + */ +export class StepRegistry { + private readonly steps = new Map(); + + register(def: CustomStepDefinition): void { + if (!def || typeof def !== "object") { + throw new Error("registerStepType: definition must be an object"); + } + if (typeof def.name !== "string" || !def.name) { + throw new Error("registerStepType: definition.name is required"); + } + if (typeof def.run !== "function") { + throw new Error(`registerStepType("${def.name}"): definition.run must be a function`); + } + if (!Array.isArray(def.allowedKeys)) { + throw new Error(`registerStepType("${def.name}"): definition.allowedKeys must be an array`); + } + if (BUILT_IN_STEP_NAMES.has(def.name)) { + throw new Error( + `registerStepType("${def.name}"): name collides with built-in step type. Pick a different name.`, + ); + } + if (this.steps.has(def.name)) { + throw new Error( + `registerStepType("${def.name}"): already registered. Re-registration is not supported.`, + ); + } + this.steps.set(def.name, def); + } + + get(name: string): CustomStepDefinition | undefined { + return this.steps.get(name); + } + + has(name: string): boolean { + return this.steps.has(name); + } + + /** All registered step names — for diagnostics/error messages. */ + names(): string[] { + return [...this.steps.keys()]; + } + + /** Test/embedder helper: clear all registered steps. Not part of the public stability contract. */ + clear(): void { + this.steps.clear(); + } +} + +/** Module-level default registry shared by host applications. */ +export const defaultRegistry = new StepRegistry(); + +/** + * Register a custom step type on the default registry. Call once at startup. + * + * @example + * import { registerStepType } from "@clawnify/clawflow"; + * + * registerStepType({ + * name: "clawnify_app", + * allowedKeys: ["app_id", "method", "path", "body"], + * validate: (input) => { + * const i = input as Record; + * if (typeof i.app_id !== "string") { + * return { ok: false, errors: [{ field: "app_id", message: "app_id must be a string" }] }; + * } + * return { ok: true }; + * }, + * async run(input, ctx) { + * const res = await fetch(buildUrl(input), { signal: ctx.abortSignal }); + * return { status: res.status, ok: res.ok, body: await res.json() }; + * }, + * }); + */ +export function registerStepType(def: CustomStepDefinition): void { + defaultRegistry.register(def); +} + +/** Internal — exposed for the validator and runner. */ +export function isBuiltInStepName(name: string): boolean { + return BUILT_IN_STEP_NAMES.has(name); +} diff --git a/src/core/runner.ts b/src/core/runner.ts index 97a1cb0..563ec93 100644 --- a/src/core/runner.ts +++ b/src/core/runner.ts @@ -30,6 +30,12 @@ import { } from "./types.js"; import { StateStore } from "./store.js"; import { validateFlow } from "./validate.js"; +import { + defaultRegistry, + type CustomStepContext, + type CustomStepDefinition, + type StepRegistry, +} from "./custom-steps.js"; // ---- Event Bus ------------------------------------------------------------------ // External systems call sendEvent(instanceId, type, payload) to unblock @@ -85,10 +91,18 @@ function applyFilter(val: unknown, filter: string): unknown { export class FlowRunner { private cfg: PluginConfig; private store: StateStore; + private registry: StepRegistry; + /** + * Per-flow AbortControllers. Lazily created in execCustomStep on first + * use so the same flow run sees the same signal across steps. When a + * flow-cancel API is wired in, it should call abort() and delete here. + */ + private abortControllers = new Map(); constructor(cfg: PluginConfig) { this.cfg = cfg; this.store = new StateStore(cfg.stateDir); + this.registry = cfg.customSteps ?? defaultRegistry; } // ---- Start a new run ---------------------------------------------------------- @@ -99,7 +113,7 @@ export class FlowRunner { instanceId?: string, ): Promise { // Static validation before execution - const validation = validateFlow(flow); + const validation = validateFlow(flow, { registry: this.registry }); if (!validation.ok) { const id = instanceId ?? crypto.randomUUID(); const messages = validation.errors.map((e) => @@ -510,8 +524,18 @@ export class FlowRunner { return this.execCode(node as CodeNode, state); case "exec": return this.execExec(node as ExecNode, state); - default: - throw new Error(`Unknown node type: "${(node as FlowNode & { do: string }).do}"`); + default: { + const stepName = (node as FlowNode & { do: string }).do; + const def = this.registry.get(stepName); + if (!def) { + const known = this.registry.names(); + const hint = known.length + ? ` Registered custom steps: ${known.join(", ")}.` + : " No custom steps are registered — is the plugin loaded?"; + throw new Error(`Unknown step type: "${stepName}".${hint}`); + } + return this.execCustomStep(node, def, state, instanceId); + } } } @@ -1172,6 +1196,54 @@ export class FlowRunner { } } + // ---- do: -------------------------------------------------------- + + private async execCustomStep( + node: FlowNode, + def: CustomStepDefinition, + state: FlowState, + instanceId: string, + ): Promise<{ output: unknown }> { + // Build the input view: every declared field, with templates pre-resolved + // (mirrors do: http for url/body/headers). Non-string scalars pass through. + const input: Record = {}; + const raw = node as unknown as Record; + for (const field of def.allowedKeys) { + const value = raw[field]; + if (value === undefined) continue; + input[field] = this.resolveBodyObject(value, state); + } + + // Lazily create a per-flow AbortController. Same signal across all custom + // steps in the run, so cancelling the flow aborts every in-flight step. + let controller = this.abortControllers.get(instanceId); + if (!controller) { + controller = new AbortController(); + this.abortControllers.set(instanceId, controller); + } + + const env = (state.env && typeof state.env === "object") + ? (state.env as Record) + : {}; + const logPrefix = `[${node.name}:${def.name}]`; + const ctx: CustomStepContext = { + state: this.deepFreeze(JSON.parse(JSON.stringify(state))), + env, + logger: { + debug: (msg, ...rest) => console.debug(logPrefix, msg, ...rest), + info: (msg, ...rest) => console.info(logPrefix, msg, ...rest), + warn: (msg, ...rest) => console.warn(logPrefix, msg, ...rest), + error: (msg, ...rest) => console.error(logPrefix, msg, ...rest), + }, + abortSignal: controller.signal, + nodeName: node.name, + resolveTemplate: (template: string) => this.resolveTemplate(template, state), + }; + + const output = await def.run(input, ctx); + return { output }; + } + // ---- do: memory --------------------------------------------------------------- private execMemory( diff --git a/src/core/types.ts b/src/core/types.ts index 91c3b32..747693b 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -387,6 +387,12 @@ export interface PluginConfig { defaultAgent?: string; /** Optional HTTP server config — exposes a generic run endpoint per flow. */ serve?: ServeConfig; + /** + * Optional custom step registry. Defaults to the module-level singleton + * populated by `registerStepType()`. Provide a private registry for + * test isolation or to run multiple FlowRunners with different step sets. + */ + customSteps?: import("./custom-steps.js").StepRegistry; } // ---- Model Shorthands ----------------------------------------------------------- diff --git a/src/core/validate.ts b/src/core/validate.ts index 31a34c7..346ebfe 100644 --- a/src/core/validate.ts +++ b/src/core/validate.ts @@ -15,6 +15,7 @@ import { type CodeNode, type ExecNode, } from "./types.js"; +import { defaultRegistry, type StepRegistry } from "./custom-steps.js"; // ---- Flow Validator ------------------------------------------------------------- // Static checks run before execution to catch common authoring mistakes. @@ -30,7 +31,16 @@ export interface ValidationResult { errors: ValidationError[]; } -export function validateFlow(flow: FlowDefinition): ValidationResult { +export interface ValidateFlowOptions { + /** Registry of custom step types. Defaults to the module-level singleton. */ + registry?: StepRegistry; +} + +export function validateFlow( + flow: FlowDefinition, + opts: ValidateFlowOptions = {}, +): ValidationResult { + const registry = opts.registry ?? defaultRegistry; const errors: ValidationError[] = []; if (!flow.flow || typeof flow.flow !== "string") { @@ -62,7 +72,7 @@ export function validateFlow(flow: FlowDefinition): ValidationResult { errors.push(...nameErrors); // Walk all nodes and validate - validateNodes(flow.nodes, new Set(), errors); + validateNodes(flow.nodes, new Set(), errors, registry); return { ok: errors.length === 0, errors }; } @@ -128,6 +138,7 @@ function validateNodes( nodes: FlowNode[], parentAvailable: Set, errors: ValidationError[], + registry: StepRegistry, ): void { // Available keys: inputs and env are always available + anything from parent scope const available = new Set(parentAvailable); @@ -136,16 +147,16 @@ function validateNodes( for (const node of nodes) { // Validate required fields per node type - validateNodeFields(node, errors); + validateNodeFields(node, errors, registry); // Check template references in string fields - checkTemplateRefs(node, available, errors); + checkTemplateRefs(node, available, errors, registry); // Check state references in branch `on` and condition `if` checkStateRefs(node, available, errors); // Recurse into sub-flows with current available set - validateSubFlows(node, available, errors); + validateSubFlows(node, available, errors, registry); // After this node, its output key becomes available if (node.output) { @@ -154,8 +165,21 @@ function validateNodes( } } +// Base keys allowed on any node (mirrors BASE_KEYS in types.ts). +const BASE_NODE_KEYS: ReadonlySet = new Set([ + "name", + "do", + "output", + "retry", + "timeout", +]); + /** Validate required fields per node type */ -function validateNodeFields(node: FlowNode, errors: ValidationError[]): void { +function validateNodeFields( + node: FlowNode, + errors: ValidationError[], + registry: StepRegistry, +): void { const e = (field: string, msg: string) => errors.push({ node: node.name, field, message: msg }); @@ -165,8 +189,11 @@ function validateNodeFields(node: FlowNode, errors: ValidationError[]): void { return; } - // Check for unknown keys - const allowed = NODE_KEYS[nodeType]; + // Built-in: allowed keys come from NODE_KEYS. Custom: from the registry's allowedKeys. + const customStep = registry.get(nodeType); + const allowed = NODE_KEYS[nodeType] ?? (customStep + ? new Set([...BASE_NODE_KEYS, ...customStep.allowedKeys]) + : undefined); if (allowed) { for (const key of Object.keys(node)) { if (!allowed.has(key)) { @@ -265,12 +292,35 @@ function validateNodeFields(node: FlowNode, errors: ValidationError[]): void { if (!n.command) e("command", `exec node "${node.name}" requires "command"`); break; } - default: - errors.push({ - node: node.name, - field: "do", - message: `Unknown node type "${nodeType}"`, - }); + default: { + if (!customStep) { + errors.push({ + node: node.name, + field: "do", + message: `Unknown node type "${nodeType}"`, + }); + break; + } + // Custom-step validator: build the input view (top-level fields excluding base + // keys) and surface errors with the same `{ node, field, message }` shape. + if (customStep.validate) { + const input: Record = {}; + for (const [k, v] of Object.entries(node)) { + if (!BASE_NODE_KEYS.has(k)) input[k] = v; + } + const result = customStep.validate(input); + if (!result.ok) { + for (const failure of result.errors) { + errors.push({ + node: node.name, + field: failure.field, + message: failure.message, + }); + } + } + } + break; + } } } @@ -279,8 +329,9 @@ function checkTemplateRefs( node: FlowNode, available: Set, errors: ValidationError[], + registry: StepRegistry, ): void { - const strings = collectStringFields(node); + const strings = collectStringFields(node, registry); // Simple path + optional filter const templatePattern = /\{\{\s*([\w.]+)\s*(?:\|\s*\w+)?\s*\}\}/g; // Wildcard: {{ path[*].field }} @@ -398,23 +449,24 @@ function validateSubFlows( node: FlowNode, available: Set, errors: ValidationError[], + registry: StepRegistry, ): void { switch (node.do) { case "branch": { const n = node as BranchNode; for (const nodes of Object.values(n.paths)) { - validateNodes(nodes, available, errors); + validateNodes(nodes, available, errors, registry); } if (n.default) { - validateNodes(n.default, available, errors); + validateNodes(n.default, available, errors, registry); } break; } case "condition": { const n = node as ConditionNode; - validateNodes(n.then, available, errors); + validateNodes(n.then, available, errors, registry); if (n.else) { - validateNodes(n.else, available, errors); + validateNodes(n.else, available, errors, registry); } break; } @@ -422,14 +474,14 @@ function validateSubFlows( const n = node as LoopNode; const loopAvailable = new Set(available); loopAvailable.add(n.as); // loop variable is available inside - validateNodes(n.nodes, loopAvailable, errors); + validateNodes(n.nodes, loopAvailable, errors, registry); break; } case "parallel": { const n = node as ParallelNode; // Each parallel branch has the same available set (they run concurrently) for (const child of n.nodes) { - validateNodes([child], available, errors); + validateNodes([child], available, errors, registry); } break; } @@ -437,10 +489,18 @@ function validateSubFlows( } /** Collect all string-valued fields from a node (for template checking) */ -function collectStringFields(node: FlowNode): { field: string; value: string }[] { +function collectStringFields( + node: FlowNode, + registry: StepRegistry, +): { field: string; value: string }[] { const result: { field: string; value: string }[] = []; - // Fields that contain template-interpolated strings - const templateFields = ["prompt", "task", "url", "key", "value", "run", "body", "if", "command", "cwd", "preview"]; + // Fields that contain template-interpolated strings on built-in nodes. + const builtInTemplateFields = ["prompt", "task", "url", "key", "value", "run", "body", "if", "command", "cwd", "preview"]; + // For custom-step nodes, every declared field is treated as potentially template-bearing. + const customStep = typeof node.do === "string" ? registry.get(node.do) : undefined; + const templateFields = customStep + ? Array.from(new Set([...builtInTemplateFields, ...customStep.allowedKeys])) + : builtInTemplateFields; for (const field of templateFields) { const val = (node as unknown as Record)[field]; diff --git a/src/index.ts b/src/index.ts index d8d00c3..e1dc62f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,23 @@ export { FlowRunner, sendEvent } from "./core/runner.js"; export { StateStore } from "./core/store.js"; export { transpileToCloudflare } from "./core/transpile.js"; export { validateFlow } from "./core/validate.js"; -export type { ValidationError, ValidationResult } from "./core/validate.js"; +export type { + ValidationError, + ValidationResult, + ValidateFlowOptions, +} from "./core/validate.js"; +export { + registerStepType, + defaultRegistry, + StepRegistry, +} from "./core/custom-steps.js"; +export type { + CustomStepContext, + CustomStepDefinition, + CustomStepValidator, + CustomStepValidatorResult, + CustomStepValidationFailure, +} from "./core/custom-steps.js"; export type { FlowDefinition, FlowNode, diff --git a/tests/custom-steps.test.ts b/tests/custom-steps.test.ts new file mode 100644 index 0000000..ae2d683 --- /dev/null +++ b/tests/custom-steps.test.ts @@ -0,0 +1,299 @@ +import { describe, it, after } from "node:test"; +import assert from "node:assert/strict"; +import * as fs from "fs"; +import * as path from "path"; +import * as os from "os"; + +import { + FlowRunner, + StepRegistry, + registerStepType, + defaultRegistry, + validateFlow, +} from "../src/index.js"; +import type { FlowDefinition, PluginConfig } from "../src/index.js"; + +const tmpDir = path.join(os.tmpdir(), `ocf-custom-test-${Date.now()}`); +const baseCfg: PluginConfig = { + stateDir: path.join(tmpDir, "state"), + memoryDir: path.join(tmpDir, "memory"), +}; + +function cleanup() { + fs.rmSync(tmpDir, { recursive: true, force: true }); +} + +function freshRunner(extra?: Partial): { runner: FlowRunner; registry: StepRegistry } { + // Use a private registry per test so registrations don't leak across cases. + const registry = new StepRegistry(); + const runner = new FlowRunner({ ...baseCfg, ...extra, customSteps: registry }); + return { runner, registry }; +} + +describe("Custom step types", () => { + after(cleanup); + + it("runs a registered step end-to-end and stores its output", async () => { + const { runner, registry } = freshRunner(); + registry.register({ + name: "echo", + allowedKeys: ["message"], + run: (input) => ({ said: (input as { message: string }).message }), + }); + + const flow: FlowDefinition = { + flow: "test-custom-echo", + nodes: [ + { + name: "say", + do: "echo" as unknown as "code", + // @ts-expect-error custom field not in built-in types + message: "hi", + output: "result", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = await runner.run(flow, {}); + assert.equal(result.ok, true); + assert.deepEqual(result.state.result, { said: "hi" }); + }); + + it("pre-resolves {{ template }} strings in input fields like http does", async () => { + const { runner, registry } = freshRunner(); + let received: unknown; + registry.register({ + name: "capture", + allowedKeys: ["msg"], + run: (input) => { + received = input; + return null; + }, + }); + + const flow: FlowDefinition = { + flow: "test-custom-template", + nodes: [ + { + name: "cap", + do: "capture" as unknown as "code", + // @ts-expect-error + msg: "hello {{ inputs.who }}", + output: "out", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = await runner.run(flow, { who: "world" }); + assert.equal(result.ok, true); + assert.deepEqual(received, { msg: "hello world" }); + }); + + it("provides ctx.state, ctx.env, ctx.nodeName, and ctx.abortSignal", async () => { + const { runner, registry } = freshRunner(); + let captured: { hasState: boolean; env: unknown; nodeName: string; signal: boolean } | undefined; + registry.register({ + name: "introspect", + allowedKeys: [], + run: (_input, ctx) => { + captured = { + hasState: typeof ctx.state === "object" && ctx.state !== null, + env: ctx.env, + nodeName: ctx.nodeName, + signal: ctx.abortSignal instanceof AbortSignal, + }; + return "ok"; + }, + }); + + const flow: FlowDefinition = { + flow: "test-custom-ctx", + env: { GREETING: "hi" }, + nodes: [ + { + name: "look", + do: "introspect" as unknown as "code", + output: "out", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + process.env.GREETING = "hi"; + const result = await runner.run(flow, {}); + delete process.env.GREETING; + assert.equal(result.ok, true, result.error); + assert.ok(captured); + assert.equal(captured!.hasState, true); + assert.equal(captured!.nodeName, "look"); + assert.equal(captured!.signal, true); + assert.deepEqual(captured!.env, { GREETING: "hi" }); + }); + + it("freezes ctx.state so steps can't mutate flow state", async () => { + const { runner, registry } = freshRunner(); + let mutationError: Error | null = null; + registry.register({ + name: "mutator", + allowedKeys: [], + run: (_input, ctx) => { + try { + (ctx.state as Record).foo = "bar"; + } catch (err) { + mutationError = err as Error; + } + return "tried"; + }, + }); + + const flow: FlowDefinition = { + flow: "test-custom-frozen", + nodes: [ + { + name: "mutate", + do: "mutator" as unknown as "code", + output: "out", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = await runner.run(flow, {}); + assert.equal(result.ok, true); + assert.ok(mutationError instanceof Error); + }); + + it("surfaces validator errors with native validation shape", async () => { + const { registry } = freshRunner(); + registry.register({ + name: "strict", + allowedKeys: ["x"], + validate: (input) => { + const i = input as { x?: unknown }; + if (typeof i.x !== "number") { + return { ok: false, errors: [{ field: "x", message: "x must be a number" }] }; + } + return { ok: true }; + }, + run: () => null, + }); + + const flow: FlowDefinition = { + flow: "test-custom-validate", + nodes: [ + { + name: "v", + do: "strict" as unknown as "code", + // @ts-expect-error + x: "not a number", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = validateFlow(flow, { registry }); + assert.equal(result.ok, false); + const xErr = result.errors.find((e) => e.field === "x"); + assert.ok(xErr, "expected error on field x"); + assert.equal(xErr!.node, "v"); + assert.match(xErr!.message, /x must be a number/); + }); + + it("rejects unknown fields based on allowedKeys", async () => { + const { registry } = freshRunner(); + registry.register({ + name: "tight", + allowedKeys: ["foo"], + run: () => null, + }); + + const flow: FlowDefinition = { + flow: "test-custom-unknown-field", + nodes: [ + { + name: "t", + do: "tight" as unknown as "code", + // @ts-expect-error + foo: "ok", + // @ts-expect-error + bar: "nope", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = validateFlow(flow, { registry }); + assert.equal(result.ok, false); + assert.ok(result.errors.some((e) => e.field === "bar")); + }); + + it("throws when a step name collides with a built-in", () => { + const registry = new StepRegistry(); + assert.throws( + () => registry.register({ name: "http", allowedKeys: [], run: () => null }), + /collides with built-in/, + ); + }); + + it("throws on duplicate registration", () => { + const registry = new StepRegistry(); + registry.register({ name: "once", allowedKeys: [], run: () => null }); + assert.throws( + () => registry.register({ name: "once", allowedKeys: [], run: () => null }), + /already registered/, + ); + }); + + it("fails fast with a clear message when the step is not registered", async () => { + const { runner } = freshRunner(); + const flow: FlowDefinition = { + flow: "test-custom-missing", + nodes: [ + { + name: "ghost", + do: "not_registered" as unknown as "code", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + const result = await runner.run(flow, {}); + assert.equal(result.ok, false); + // Static validator catches it first with "Unknown node type" + assert.match(result.error ?? "", /Unknown node type|Unknown step type/); + }); + + it("propagates errors thrown inside run() into the trace", async () => { + const { runner, registry } = freshRunner(); + registry.register({ + name: "boom", + allowedKeys: [], + run: () => { + throw new Error("kaboom"); + }, + }); + + const flow: FlowDefinition = { + flow: "test-custom-throw", + nodes: [ + { + name: "go", + do: "boom" as unknown as "code", + } as unknown as FlowDefinition["nodes"][number], + ], + }; + + const result = await runner.run(flow, {}); + assert.equal(result.ok, false); + assert.match(result.error ?? "", /kaboom/); + const trace = result.trace.find((t) => t.node === "go"); + assert.equal(trace?.status, "error"); + }); + + it("module-level registerStepType writes to the default registry", () => { + const before = defaultRegistry.has("module_level_demo"); + assert.equal(before, false); + registerStepType({ + name: "module_level_demo", + allowedKeys: [], + run: () => "ok", + }); + assert.equal(defaultRegistry.has("module_level_demo"), true); + // Cleanup so this test can be re-run in watch mode + defaultRegistry.clear(); + }); +});