Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@clawnify/clawflow",
"version": "1.0.0",
"version": "1.1.0",
"description": "The n8n for agents. A declarative, AI-native workflow format that agents can read, write, and run.",
"type": "module",
"main": "./dist/index.js",
Expand Down
176 changes: 176 additions & 0 deletions src/core/custom-steps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// ---- Custom Step Registry ------------------------------------------------------
// Public extension API: hosts register additional `do:` step types at startup.
// Built-in step types (ai, code, http, …) are dispatched first; custom types
// participate in the same do: resolution and the same validation/error shapes.

import type { FlowState } from "./types.js";

/** Names that cannot be overridden by custom steps. Mirrors NODE_KEYS in types.ts. */
const BUILT_IN_STEP_NAMES = new Set([
"ai",
"agent",
"branch",
"condition",
"loop",
"parallel",
"http",
"memory",
"wait",
"sleep",
"code",
"exec",
]);

/** Validator return shape — matches validate.ts ValidationError so errors are indistinguishable from native. */
export interface CustomStepValidationFailure {
field?: string;
message: string;
}

export type CustomStepValidatorResult =
| { ok: true }
| { ok: false; errors: CustomStepValidationFailure[] };

export type CustomStepValidator = (input: unknown) => CustomStepValidatorResult;

/** Runtime context handed to a custom step's run() function. */
export interface CustomStepContext {
/** Frozen deep copy of the flow's current state. Reads only. */
readonly state: Readonly<FlowState>;
/** Resolved env (merged from flow.env + process.env). Empty object if none. */
readonly env: Readonly<Record<string, string>>;
/** Logger that prefixes the node name. */
readonly logger: {
debug: (msg: string, ...rest: unknown[]) => void;
info: (msg: string, ...rest: unknown[]) => void;
warn: (msg: string, ...rest: unknown[]) => void;
error: (msg: string, ...rest: unknown[]) => void;
};
/** Aborts when the flow is cancelled. Custom steps should pass this to fetch()/etc. */
readonly abortSignal: AbortSignal;
/** The node's `name` field — useful for log prefixing or error messages. */
readonly nodeName: string;
/**
* Resolve a `{{ template }}` string against the live flow state.
* Useful when a custom step accepts non-string input shapes that contain templates.
*/
resolveTemplate: (template: string) => string;
}

/**
* Definition passed to `registerStepType`.
*
* The runner pre-resolves `{{ template }}` strings in top-level string fields
* and (one level deep) in object fields named `body` or `headers`, mirroring
* how `do: http` handles them. Other fields are passed through unchanged.
* If you need finer control, declare the field as raw and call
* `ctx.resolveTemplate()` yourself.
*/
export interface CustomStepDefinition<TInput = Record<string, unknown>, TOutput = unknown> {
/** Step name as it will appear in flow JSON (`do: "<name>"`). */
name: string;
/**
* Top-level node fields (excluding base keys like `name`, `do`, `output`,
* `retry`, `timeout`) that this step accepts. Used by the validator to
* reject typos. If empty, the step takes no input fields.
*/
allowedKeys: readonly string[];
/**
* Optional field-level validation. Called with the post-template-resolution
* input object. Errors are surfaced through `validateFlow` with the same
* `{ node, field, message }` shape as native validation failures.
*/
validate?: CustomStepValidator;
/**
* Step body. Whatever this returns lands in `state[node.output]`, identical
* to built-ins. Throw to mark the node failed; the error message is traced.
*/
run: (input: TInput, ctx: CustomStepContext) => Promise<TOutput> | TOutput;
}

/**
* Registry of custom step definitions. A FlowRunner can use a private registry
* (passed via `cfg.customSteps`) for test isolation, or share the module-level
* default via `registerStepType()`.
*/
export class StepRegistry {
private readonly steps = new Map<string, CustomStepDefinition>();

register(def: CustomStepDefinition): void {
if (!def || typeof def !== "object") {
throw new Error("registerStepType: definition must be an object");
}
if (typeof def.name !== "string" || !def.name) {
throw new Error("registerStepType: definition.name is required");
}
if (typeof def.run !== "function") {
throw new Error(`registerStepType("${def.name}"): definition.run must be a function`);
}
if (!Array.isArray(def.allowedKeys)) {
throw new Error(`registerStepType("${def.name}"): definition.allowedKeys must be an array`);
}
if (BUILT_IN_STEP_NAMES.has(def.name)) {
throw new Error(
`registerStepType("${def.name}"): name collides with built-in step type. Pick a different name.`,
);
}
if (this.steps.has(def.name)) {
throw new Error(
`registerStepType("${def.name}"): already registered. Re-registration is not supported.`,
);
}
this.steps.set(def.name, def);
}

get(name: string): CustomStepDefinition | undefined {
return this.steps.get(name);
}

has(name: string): boolean {
return this.steps.has(name);
}

/** All registered step names — for diagnostics/error messages. */
names(): string[] {
return [...this.steps.keys()];
}

/** Test/embedder helper: clear all registered steps. Not part of the public stability contract. */
clear(): void {
this.steps.clear();
}
}

/** Module-level default registry shared by host applications. */
export const defaultRegistry = new StepRegistry();

/**
* Register a custom step type on the default registry. Call once at startup.
*
* @example
* import { registerStepType } from "@clawnify/clawflow";
*
* registerStepType({
* name: "clawnify_app",
* allowedKeys: ["app_id", "method", "path", "body"],
* validate: (input) => {
* const i = input as Record<string, unknown>;
* if (typeof i.app_id !== "string") {
* return { ok: false, errors: [{ field: "app_id", message: "app_id must be a string" }] };
* }
* return { ok: true };
* },
* async run(input, ctx) {
* const res = await fetch(buildUrl(input), { signal: ctx.abortSignal });
* return { status: res.status, ok: res.ok, body: await res.json() };
* },
* });
*/
export function registerStepType(def: CustomStepDefinition): void {
defaultRegistry.register(def);
}

/** Internal — exposed for the validator and runner. */
export function isBuiltInStepName(name: string): boolean {
return BUILT_IN_STEP_NAMES.has(name);
}
78 changes: 75 additions & 3 deletions src/core/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
} from "./types.js";
import { StateStore } from "./store.js";
import { validateFlow } from "./validate.js";
import {
defaultRegistry,
type CustomStepContext,
type CustomStepDefinition,
type StepRegistry,
} from "./custom-steps.js";

// ---- Event Bus ------------------------------------------------------------------
// External systems call sendEvent(instanceId, type, payload) to unblock
Expand Down Expand Up @@ -85,10 +91,18 @@ function applyFilter(val: unknown, filter: string): unknown {
export class FlowRunner {
private cfg: PluginConfig;
private store: StateStore;
private registry: StepRegistry;
/**
* Per-flow AbortControllers. Lazily created in execCustomStep on first
* use so the same flow run sees the same signal across steps. When a
* flow-cancel API is wired in, it should call abort() and delete here.
*/
private abortControllers = new Map<string, AbortController>();

constructor(cfg: PluginConfig) {
this.cfg = cfg;
this.store = new StateStore(cfg.stateDir);
this.registry = cfg.customSteps ?? defaultRegistry;
}

// ---- Start a new run ----------------------------------------------------------
Expand All @@ -99,7 +113,7 @@ export class FlowRunner {
instanceId?: string,
): Promise<FlowResult> {
// Static validation before execution
const validation = validateFlow(flow);
const validation = validateFlow(flow, { registry: this.registry });
if (!validation.ok) {
const id = instanceId ?? crypto.randomUUID();
const messages = validation.errors.map((e) =>
Expand Down Expand Up @@ -510,8 +524,18 @@ export class FlowRunner {
return this.execCode(node as CodeNode, state);
case "exec":
return this.execExec(node as ExecNode, state);
default:
throw new Error(`Unknown node type: "${(node as FlowNode & { do: string }).do}"`);
default: {
const stepName = (node as FlowNode & { do: string }).do;
const def = this.registry.get(stepName);
if (!def) {
const known = this.registry.names();
const hint = known.length
? ` Registered custom steps: ${known.join(", ")}.`
: " No custom steps are registered — is the plugin loaded?";
throw new Error(`Unknown step type: "${stepName}".${hint}`);
}
return this.execCustomStep(node, def, state, instanceId);
}
}
}

Expand Down Expand Up @@ -1172,6 +1196,54 @@ export class FlowRunner {
}
}

// ---- do: <custom step> --------------------------------------------------------

private async execCustomStep(
node: FlowNode,
def: CustomStepDefinition,
state: FlowState,
instanceId: string,
): Promise<{ output: unknown }> {
// Build the input view: every declared field, with templates pre-resolved
// (mirrors do: http for url/body/headers). Non-string scalars pass through.
const input: Record<string, unknown> = {};
const raw = node as unknown as Record<string, unknown>;
for (const field of def.allowedKeys) {
const value = raw[field];
if (value === undefined) continue;
input[field] = this.resolveBodyObject(value, state);
}

// Lazily create a per-flow AbortController. Same signal across all custom
// steps in the run, so cancelling the flow aborts every in-flight step.
let controller = this.abortControllers.get(instanceId);
if (!controller) {
controller = new AbortController();
this.abortControllers.set(instanceId, controller);
}

const env = (state.env && typeof state.env === "object")
? (state.env as Record<string, string>)
: {};
const logPrefix = `[${node.name}:${def.name}]`;
const ctx: CustomStepContext = {
state: this.deepFreeze(JSON.parse(JSON.stringify(state))),
env,
logger: {
debug: (msg, ...rest) => console.debug(logPrefix, msg, ...rest),
info: (msg, ...rest) => console.info(logPrefix, msg, ...rest),
warn: (msg, ...rest) => console.warn(logPrefix, msg, ...rest),
error: (msg, ...rest) => console.error(logPrefix, msg, ...rest),
},
abortSignal: controller.signal,
nodeName: node.name,
resolveTemplate: (template: string) => this.resolveTemplate(template, state),
};

const output = await def.run(input, ctx);
return { output };
}

// ---- do: memory ---------------------------------------------------------------

private execMemory(
Expand Down
6 changes: 6 additions & 0 deletions src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ export interface PluginConfig {
defaultAgent?: string;
/** Optional HTTP server config — exposes a generic run endpoint per flow. */
serve?: ServeConfig;
/**
* Optional custom step registry. Defaults to the module-level singleton
* populated by `registerStepType()`. Provide a private registry for
* test isolation or to run multiple FlowRunners with different step sets.
*/
customSteps?: import("./custom-steps.js").StepRegistry;
}

// ---- Model Shorthands -----------------------------------------------------------
Expand Down
Loading
Loading