Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
201c231
feat(workflow-executor): add ReadRecordStepExecutor
Mar 18, 2026
14b4783
refactor(workflow-executor): apply review suggestions on ReadRecordSt…
Mar 18, 2026
ecbd234
docs: document feat/prd-214 as main working branch for workflow-executor
Mar 18, 2026
a6af521
fix(workflow-executor): add displayName to FieldReadError for interfa…
Mar 18, 2026
2b3f2b6
refactor(workflow-executor): unify FieldReadSuccess/FieldReadError in…
Mar 18, 2026
9d912a9
refactor(workflow-executor): extract FieldReadBase to share common fi…
Mar 18, 2026
0dfb857
refactor(workflow-executor): move toRecordIdentifier to private stati…
Mar 18, 2026
f174395
feat(workflow-executor): include step index in record identifier
Mar 18, 2026
9e71c08
refactor(workflow-executor): make stepIndex required on RecordData
Mar 18, 2026
e2c9cf8
refactor(workflow-executor): rename fieldName to fieldNames and force…
Mar 18, 2026
2fade25
refactor(workflow-executor): type ReadRecordStepExecutionData precisely
Mar 18, 2026
2e72741
refactor(workflow-executor): make ConditionStepExecutionData fields r…
Mar 18, 2026
bb42564
fix(workflow-executor): fix lint errors from CI
Mar 18, 2026
d668f3f
refactor(workflow-executor): separate schema types from record data t…
Mar 18, 2026
cd98937
refactor(workflow-executor): rename ActionRef to ActionSchema and rem…
Mar 18, 2026
49b05bf
refactor(workflow-executor): remove dead RunStore methods
Mar 18, 2026
5de8947
refactor(workflow-executor): replace RunStore.getRecords with baseRec…
Mar 18, 2026
c1d2ce0
feat(workflow-executor): include executionResult in buildStepSummary
Mar 18, 2026
0de5c49
refactor(workflow-executor): rename Result to Input in buildStepSummary
Mar 18, 2026
45e8909
refactor(workflow-executor): rename selectedRecord to selectedRecordRef
Mar 18, 2026
3a12b01
chore(workflow-executor): add TODO on isExecutedStepOnExecutor
Mar 18, 2026
2a71cee
feat(workflow-executor): change baseRecord to RecordRef and fetch val…
Mar 18, 2026
33e5552
fix(workflow-executor): update adapter to use renamed types after rebase
Mar 18, 2026
c8a12c2
refactor(workflow-executor): narrow catch block and clean up RecordDa…
Mar 18, 2026
ac1e6c7
refactor(workflow-executor): move step and stepHistory into Execution…
Mar 18, 2026
ba6ac85
refactor(workflow-executor): rename agentRecord and readFieldValues
Mar 18, 2026
23d8413
fix(workflow-executor): guard against undefined executionParams in st…
Mar 18, 2026
c4ab613
chore(workflow-executor): remove obsolete ts-expect-error on ServerUt…
Mar 18, 2026
fcec371
test(workflow-executor): cover executionParams guard and NoRecordsError
Mar 18, 2026
20368db
refactor(workflow-executor): use currentStep StepRecord in PendingSte…
Mar 18, 2026
f9a6fbd
refactor(workflow-executor): remove stepHistory from ExecutionContext…
Mar 19, 2026
5f46199
refactor(workflow-executor): cache collection schemas and remove unus…
Mar 19, 2026
650e70b
refactor(workflow-executor): rename StepHistory to StepOutcome and ex…
Mar 19, 2026
8f638c0
feat(workflow-executor): add optional fieldNames to AgentPort.getRecord
Mar 19, 2026
761f080
refactor(workflow-executor): skip getRecord when no fields resolve
Mar 19, 2026
9e6b56b
feat(workflow-executor): return step error when no fields can be reso…
Mar 19, 2026
6dac969
refactor(workflow-executor): remove resolveFieldNames, inline field n…
Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ yarn workspace @forestadmin/agent test
5. Are edge cases handled?
6. Is the naming clear and consistent?

## Git Workflow

The **main working branch** for workflow-executor development is `feat/prd-214-setup-workflow-executor-package`.
All feature branches for this area should be based on and PRs targeted at this branch (not `main`).

## Linear Tickets

### MCP Setup
Expand Down
9 changes: 5 additions & 4 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──

```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
│ ├── step-history.ts # Step outcome tracking types
│ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator)
│ ├── step-execution-data.ts # Runtime state for in-progress steps
│ ├── record.ts # Record references and data types
│ └── execution.ts # Top-level execution types (context, results)
Expand All @@ -55,15 +55,16 @@ src/
│ └── run-store.ts # Interface for persisting run state (scoped to a run)
├── executors/ # Step executor implementations
│ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers)
│ └── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ └── read-record-step-executor.ts # AI-powered record field reading step
└── index.ts # Barrel exports
```

## Architecture Principles

- **Pull-based** — The executor polls for pending steps via a port interface (`WorkflowPort.getPendingStepExecutions`; polling loop not yet implemented).
- **Atomic** — Each step executes in isolation. A run store (scoped per run) maintains continuity between steps.
- **Privacy** — Zero client data leaves the client's infrastructure. `StepHistory` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only).
- **Privacy** — Zero client data leaves the client's infrastructure. `StepOutcome` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only).
- **Ports (IO injection)** — All external IO goes through injected port interfaces, keeping the core pure and testable.
- **AI integration** — Uses `@langchain/core` (`BaseChatModel`, `DynamicStructuredTool`) for AI-powered steps. `ExecutionContext.model` is a `BaseChatModel`.
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
Expand Down
44 changes: 19 additions & 25 deletions packages/workflow-executor/src/adapters/agent-client-agent-port.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { AgentPort } from '../ports/agent-port';
import type { ActionRef, CollectionRef, RecordData } from '../types/record';
import type { CollectionSchema } from '../types/record';
import type { RemoteAgentClient, SelectOptions } from '@forestadmin/agent-client';

import { RecordNotFoundError } from '../errors';
Expand Down Expand Up @@ -36,68 +36,62 @@ function extractRecordId(

export default class AgentClientAgentPort implements AgentPort {
private readonly client: RemoteAgentClient;
private readonly collectionRefs: Record<string, CollectionRef>;
private readonly collectionSchemas: Record<string, CollectionSchema>;

constructor(params: {
client: RemoteAgentClient;
collectionRefs: Record<string, CollectionRef>;
collectionSchemas: Record<string, CollectionSchema>;
}) {
this.client = params.client;
this.collectionRefs = params.collectionRefs;
this.collectionSchemas = params.collectionSchemas;
}

async getRecord(collectionName: string, recordId: Array<string | number>): Promise<RecordData> {
const ref = this.getCollectionRef(collectionName);
async getRecord(collectionName: string, recordId: Array<string | number>, fieldNames?: string[]) {
const schema = this.resolveSchema(collectionName);
const records = await this.client.collection(collectionName).list<Record<string, unknown>>({
filters: buildPkFilter(ref.primaryKeyFields, recordId),
filters: buildPkFilter(schema.primaryKeyFields, recordId),
pagination: { size: 1, number: 1 },
...(fieldNames?.length && { fields: fieldNames }),
});

if (records.length === 0) {
throw new RecordNotFoundError(collectionName, encodePk(recordId));
}

return { ...ref, recordId, values: records[0] };
return { collectionName, recordId, values: records[0] };
}

async updateRecord(
collectionName: string,
recordId: Array<string | number>,
values: Record<string, unknown>,
): Promise<RecordData> {
const ref = this.getCollectionRef(collectionName);
) {
const updatedRecord = await this.client
.collection(collectionName)
.update<Record<string, unknown>>(encodePk(recordId), values);

return { ...ref, recordId, values: updatedRecord };
return { collectionName, recordId, values: updatedRecord };
}

async getRelatedData(
collectionName: string,
recordId: Array<string | number>,
relationName: string,
): Promise<RecordData[]> {
const relatedRef = this.getCollectionRef(relationName);
) {
const relatedSchema = this.resolveSchema(relationName);

const records = await this.client
.collection(collectionName)
.relation(relationName, encodePk(recordId))
.list<Record<string, unknown>>();

return records.map(record => ({
...relatedRef,
recordId: extractRecordId(relatedRef.primaryKeyFields, record),
collectionName: relatedSchema.collectionName,
recordId: extractRecordId(relatedSchema.primaryKeyFields, record),
values: record,
}));
}

async getActions(collectionName: string): Promise<ActionRef[]> {
const ref = this.collectionRefs[collectionName];

return ref ? ref.actions : [];
}

async executeAction(
collectionName: string,
actionName: string,
Expand All @@ -111,10 +105,10 @@ export default class AgentClientAgentPort implements AgentPort {
return action.execute();
}

private getCollectionRef(collectionName: string): CollectionRef {
const ref = this.collectionRefs[collectionName];
private resolveSchema(collectionName: string): CollectionSchema {
const schema = this.collectionSchemas[collectionName];

if (!ref) {
if (!schema) {
return {
collectionName,
collectionDisplayName: collectionName,
Expand All @@ -124,6 +118,6 @@ export default class AgentClientAgentPort implements AgentPort {
};
}

return ref;
return schema;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { McpConfiguration, WorkflowPort } from '../ports/workflow-port';
import type { PendingStepExecution } from '../types/execution';
import type { CollectionRef } from '../types/record';
import type { StepHistory } from '../types/step-history';
import type { CollectionSchema } from '../types/record';
import type { StepOutcome } from '../types/step-outcome';
import type { HttpOptions } from '@forestadmin/forestadmin-client';

import { ServerUtils } from '@forestadmin/forestadmin-client';
Expand All @@ -10,7 +10,7 @@ import { ServerUtils } from '@forestadmin/forestadmin-client';
const ROUTES = {
pendingStepExecutions: '/liana/v1/workflow-step-executions/pending',
updateStepExecution: (runId: string) => `/liana/v1/workflow-step-executions/${runId}/complete`,
collectionRef: (collectionName: string) => `/liana/v1/collections/${collectionName}`,
collectionSchema: (collectionName: string) => `/liana/v1/collections/${collectionName}`,
mcpServerConfigs: '/liana/mcp-server-configs-with-details',
};

Expand All @@ -29,21 +29,21 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
);
}

async updateStepExecution(runId: string, stepHistory: StepHistory): Promise<void> {
async updateStepExecution(runId: string, stepOutcome: StepOutcome): Promise<void> {
await ServerUtils.query(
this.options,
'post',
ROUTES.updateStepExecution(runId),
{},
stepHistory,
stepOutcome,
);
}

async getCollectionRef(collectionName: string): Promise<CollectionRef> {
return ServerUtils.query<CollectionRef>(
async getCollectionSchema(collectionName: string): Promise<CollectionSchema> {
return ServerUtils.query<CollectionSchema>(
this.options,
'get',
ROUTES.collectionRef(collectionName),
ROUTES.collectionSchema(collectionName),
);
}

Expand Down
18 changes: 18 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,21 @@ export class RecordNotFoundError extends WorkflowExecutorError {
super(`Record not found: collection "${collectionName}", id "${recordId}"`);
}
}

export class NoRecordsError extends WorkflowExecutorError {
constructor() {
super('No records available');
}
}

export class NoReadableFieldsError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(`No readable fields on record from collection "${collectionName}"`);
}
}

export class NoResolvedFieldsError extends WorkflowExecutorError {
constructor(fieldNames: string[]) {
super(`None of the requested fields could be resolved: ${fieldNames.join(', ')}`);
}
}
42 changes: 23 additions & 19 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import type { ExecutionContext, StepExecutionResult } from '../types/execution';
import type { StepDefinition } from '../types/step-definition';
import type { StepExecutionData } from '../types/step-execution-data';
import type { StepHistory } from '../types/step-history';
import type { StepOutcome } from '../types/step-outcome';
import type { AIMessage, BaseMessage } from '@langchain/core/messages';
import type { DynamicStructuredTool } from '@langchain/core/tools';

import { SystemMessage } from '@langchain/core/messages';

import { MalformedToolCallError, MissingToolCallError } from '../errors';
import { isExecutedStepOnExecutor } from '../types/step-execution-data';

export default abstract class BaseStepExecutor<
TStep extends StepDefinition = StepDefinition,
THistory extends StepHistory = StepHistory,
> {
protected readonly context: ExecutionContext;
export default abstract class BaseStepExecutor<TStep extends StepDefinition = StepDefinition> {
protected readonly context: ExecutionContext<TStep>;

constructor(context: ExecutionContext) {
constructor(context: ExecutionContext<TStep>) {
this.context = context;
}

abstract execute(step: TStep, stepHistory: THistory): Promise<StepExecutionResult>;
abstract execute(): Promise<StepExecutionResult>;

/**
* Returns a SystemMessage array summarizing previously executed steps.
Expand All @@ -35,35 +33,41 @@ export default abstract class BaseStepExecutor<

/**
* Builds a text summary of previously executed steps for AI prompts.
* Correlates history entries (step + stepHistory pairs) with executionParams
* from the RunStore (matched by stepHistory.stepIndex).
* When no executionParams is available, falls back to StepHistory details.
* Correlates history entries (step + stepOutcome pairs) with execution data
* from the RunStore (matched by stepOutcome.stepIndex).
* When no execution data is available, falls back to StepOutcome details.
*/
private async summarizePreviousSteps(): Promise<string> {
const allStepExecutions = await this.context.runStore.getStepExecutions();

return this.context.history
.map(({ step, stepHistory }) => {
const execution = allStepExecutions.find(e => e.stepIndex === stepHistory.stepIndex);
.map(({ stepDefinition, stepOutcome }) => {
const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex);

return this.buildStepSummary(step, stepHistory, execution);
return this.buildStepSummary(stepDefinition, stepOutcome, execution);
})
.join('\n\n');
}

private buildStepSummary(
step: StepDefinition,
stepHistory: StepHistory,
stepOutcome: StepOutcome,
execution: StepExecutionData | undefined,
): string {
const prompt = step.prompt ?? '(no prompt)';
const header = `Step "${step.id}" (index ${stepHistory.stepIndex}):`;
const header = `Step "${stepOutcome.stepId}" (index ${stepOutcome.stepIndex}):`;
const lines = [header, ` Prompt: ${prompt}`];

if (execution?.executionParams) {
lines.push(` Result: ${JSON.stringify(execution.executionParams)}`);
if (isExecutedStepOnExecutor(execution)) {
if (execution.executionParams !== undefined) {
lines.push(` Input: ${JSON.stringify(execution.executionParams)}`);
}

if (execution.executionResult) {
lines.push(` Output: ${JSON.stringify(execution.executionResult)}`);
}
} else {
const { stepId, stepIndex, type, ...historyDetails } = stepHistory;
const { stepId, stepIndex, type, ...historyDetails } = stepOutcome;
lines.push(` History: ${JSON.stringify(historyDetails)}`);
}

Expand Down
40 changes: 25 additions & 15 deletions packages/workflow-executor/src/executors/condition-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { StepExecutionResult } from '../types/execution';
import type { ConditionStepDefinition } from '../types/step-definition';
import type { ConditionStepHistory } from '../types/step-history';

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { DynamicStructuredTool } from '@langchain/core/tools';
Expand Down Expand Up @@ -36,14 +35,10 @@ const GATEWAY_SYSTEM_PROMPT = `You are an AI agent selecting the correct option
- If selecting null: explain why options don't match the question
- Do not refer to yourself as "I" in the response, use a passive formulation instead.`;

export default class ConditionStepExecutor extends BaseStepExecutor<
ConditionStepDefinition,
ConditionStepHistory
> {
async execute(
step: ConditionStepDefinition,
stepHistory: ConditionStepHistory,
): Promise<StepExecutionResult> {
export default class ConditionStepExecutor extends BaseStepExecutor<ConditionStepDefinition> {
async execute(): Promise<StepExecutionResult> {
const { stepDefinition: step } = this.context;

const tool = new DynamicStructuredTool({
name: 'choose-gateway-option',
description:
Expand All @@ -58,7 +53,7 @@ export default class ConditionStepExecutor extends BaseStepExecutor<
.nullable()
.describe('The chosen option, or null if no option clearly answers the question.'),
}),
func: async input => JSON.stringify(input),
func: undefined,
});

const messages = [
Expand All @@ -73,8 +68,10 @@ export default class ConditionStepExecutor extends BaseStepExecutor<
args = await this.invokeWithTool<GatewayToolArgs>(messages, tool);
} catch (error: unknown) {
return {
stepHistory: {
...stepHistory,
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'error',
error: (error as Error).message,
},
Expand All @@ -85,17 +82,30 @@ export default class ConditionStepExecutor extends BaseStepExecutor<

await this.context.runStore.saveStepExecution({
type: 'condition',
stepIndex: stepHistory.stepIndex,
stepIndex: this.context.stepIndex,
executionParams: { answer: selectedOption, reasoning },
executionResult: selectedOption ? { answer: selectedOption } : undefined,
});

if (!selectedOption) {
return { stepHistory: { ...stepHistory, status: 'manual-decision' } };
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'manual-decision',
},
};
}

return {
stepHistory: { ...stepHistory, status: 'success', selectedOption },
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'success',
selectedOption,
},
};
}
}
Loading
Loading