diff --git a/CLAUDE.md b/CLAUDE.md index 238598e1f..138e9daaf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -104,6 +104,11 @@ yarn workspace @forestadmin/agent test 5. Are edge cases handled? 6. Is the naming clear and consistent? +## Git Workflow + +The **main working branch** for workflow-executor development is `feat/prd-214-setup-workflow-executor-package`. +All feature branches for this area should be based on and PRs targeted at this branch (not `main`). + ## Linear Tickets ### MCP Setup diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 2be0522bc..bff9a8414 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -42,10 +42,10 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ── ``` src/ -├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError +├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError ├── types/ # Core type definitions (@draft) │ ├── step-definition.ts # StepType enum + step definition interfaces -│ ├── step-history.ts # Step outcome tracking types +│ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator) │ ├── step-execution-data.ts # Runtime state for in-progress steps │ ├── record.ts # Record references and data types │ └── execution.ts # Top-level execution types (context, results) @@ -55,7 +55,8 @@ src/ │ └── run-store.ts # Interface for persisting run state (scoped to a run) ├── executors/ # Step executor implementations │ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers) -│ └── condition-step-executor.ts # AI-powered condition step (chooses among options) +│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options) +│ └── read-record-step-executor.ts # AI-powered record field reading step └── index.ts # Barrel exports ``` @@ -63,7 +64,7 @@ src/ - **Pull-based** — The executor polls for pending steps via a port interface (`WorkflowPort.getPendingStepExecutions`; polling loop not yet implemented). - **Atomic** — Each step executes in isolation. A run store (scoped per run) maintains continuity between steps. -- **Privacy** — Zero client data leaves the client's infrastructure. `StepHistory` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only). +- **Privacy** — Zero client data leaves the client's infrastructure. `StepOutcome` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only). - **Ports (IO injection)** — All external IO goes through injected port interfaces, keeping the core pure and testable. - **AI integration** — Uses `@langchain/core` (`BaseChatModel`, `DynamicStructuredTool`) for AI-powered steps. `ExecutionContext.model` is a `BaseChatModel`. - **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once. diff --git a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts index 23015e8bc..cf8949a1a 100644 --- a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts +++ b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts @@ -1,5 +1,5 @@ import type { AgentPort } from '../ports/agent-port'; -import type { ActionRef, CollectionRef, RecordData } from '../types/record'; +import type { CollectionSchema } from '../types/record'; import type { RemoteAgentClient, SelectOptions } from '@forestadmin/agent-client'; import { RecordNotFoundError } from '../errors'; @@ -36,49 +36,49 @@ function extractRecordId( export default class AgentClientAgentPort implements AgentPort { private readonly client: RemoteAgentClient; - private readonly collectionRefs: Record; + private readonly collectionSchemas: Record; constructor(params: { client: RemoteAgentClient; - collectionRefs: Record; + collectionSchemas: Record; }) { this.client = params.client; - this.collectionRefs = params.collectionRefs; + this.collectionSchemas = params.collectionSchemas; } - async getRecord(collectionName: string, recordId: Array): Promise { - const ref = this.getCollectionRef(collectionName); + async getRecord(collectionName: string, recordId: Array, fieldNames?: string[]) { + const schema = this.resolveSchema(collectionName); const records = await this.client.collection(collectionName).list>({ - filters: buildPkFilter(ref.primaryKeyFields, recordId), + filters: buildPkFilter(schema.primaryKeyFields, recordId), pagination: { size: 1, number: 1 }, + ...(fieldNames?.length && { fields: fieldNames }), }); if (records.length === 0) { throw new RecordNotFoundError(collectionName, encodePk(recordId)); } - return { ...ref, recordId, values: records[0] }; + return { collectionName, recordId, values: records[0] }; } async updateRecord( collectionName: string, recordId: Array, values: Record, - ): Promise { - const ref = this.getCollectionRef(collectionName); + ) { const updatedRecord = await this.client .collection(collectionName) .update>(encodePk(recordId), values); - return { ...ref, recordId, values: updatedRecord }; + return { collectionName, recordId, values: updatedRecord }; } async getRelatedData( collectionName: string, recordId: Array, relationName: string, - ): Promise { - const relatedRef = this.getCollectionRef(relationName); + ) { + const relatedSchema = this.resolveSchema(relationName); const records = await this.client .collection(collectionName) @@ -86,18 +86,12 @@ export default class AgentClientAgentPort implements AgentPort { .list>(); return records.map(record => ({ - ...relatedRef, - recordId: extractRecordId(relatedRef.primaryKeyFields, record), + collectionName: relatedSchema.collectionName, + recordId: extractRecordId(relatedSchema.primaryKeyFields, record), values: record, })); } - async getActions(collectionName: string): Promise { - const ref = this.collectionRefs[collectionName]; - - return ref ? ref.actions : []; - } - async executeAction( collectionName: string, actionName: string, @@ -111,10 +105,10 @@ export default class AgentClientAgentPort implements AgentPort { return action.execute(); } - private getCollectionRef(collectionName: string): CollectionRef { - const ref = this.collectionRefs[collectionName]; + private resolveSchema(collectionName: string): CollectionSchema { + const schema = this.collectionSchemas[collectionName]; - if (!ref) { + if (!schema) { return { collectionName, collectionDisplayName: collectionName, @@ -124,6 +118,6 @@ export default class AgentClientAgentPort implements AgentPort { }; } - return ref; + return schema; } } diff --git a/packages/workflow-executor/src/adapters/forest-server-workflow-port.ts b/packages/workflow-executor/src/adapters/forest-server-workflow-port.ts index e804e01cf..16037570b 100644 --- a/packages/workflow-executor/src/adapters/forest-server-workflow-port.ts +++ b/packages/workflow-executor/src/adapters/forest-server-workflow-port.ts @@ -1,7 +1,7 @@ import type { McpConfiguration, WorkflowPort } from '../ports/workflow-port'; import type { PendingStepExecution } from '../types/execution'; -import type { CollectionRef } from '../types/record'; -import type { StepHistory } from '../types/step-history'; +import type { CollectionSchema } from '../types/record'; +import type { StepOutcome } from '../types/step-outcome'; import type { HttpOptions } from '@forestadmin/forestadmin-client'; import { ServerUtils } from '@forestadmin/forestadmin-client'; @@ -10,7 +10,7 @@ import { ServerUtils } from '@forestadmin/forestadmin-client'; const ROUTES = { pendingStepExecutions: '/liana/v1/workflow-step-executions/pending', updateStepExecution: (runId: string) => `/liana/v1/workflow-step-executions/${runId}/complete`, - collectionRef: (collectionName: string) => `/liana/v1/collections/${collectionName}`, + collectionSchema: (collectionName: string) => `/liana/v1/collections/${collectionName}`, mcpServerConfigs: '/liana/mcp-server-configs-with-details', }; @@ -29,21 +29,21 @@ export default class ForestServerWorkflowPort implements WorkflowPort { ); } - async updateStepExecution(runId: string, stepHistory: StepHistory): Promise { + async updateStepExecution(runId: string, stepOutcome: StepOutcome): Promise { await ServerUtils.query( this.options, 'post', ROUTES.updateStepExecution(runId), {}, - stepHistory, + stepOutcome, ); } - async getCollectionRef(collectionName: string): Promise { - return ServerUtils.query( + async getCollectionSchema(collectionName: string): Promise { + return ServerUtils.query( this.options, 'get', - ROUTES.collectionRef(collectionName), + ROUTES.collectionSchema(collectionName), ); } diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index d735977d4..b835c391f 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -27,3 +27,21 @@ export class RecordNotFoundError extends WorkflowExecutorError { super(`Record not found: collection "${collectionName}", id "${recordId}"`); } } + +export class NoRecordsError extends WorkflowExecutorError { + constructor() { + super('No records available'); + } +} + +export class NoReadableFieldsError extends WorkflowExecutorError { + constructor(collectionName: string) { + super(`No readable fields on record from collection "${collectionName}"`); + } +} + +export class NoResolvedFieldsError extends WorkflowExecutorError { + constructor(fieldNames: string[]) { + super(`None of the requested fields could be resolved: ${fieldNames.join(', ')}`); + } +} diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 6c4fc7694..ed9de5cb3 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,25 +1,23 @@ import type { ExecutionContext, StepExecutionResult } from '../types/execution'; import type { StepDefinition } from '../types/step-definition'; import type { StepExecutionData } from '../types/step-execution-data'; -import type { StepHistory } from '../types/step-history'; +import type { StepOutcome } from '../types/step-outcome'; import type { AIMessage, BaseMessage } from '@langchain/core/messages'; import type { DynamicStructuredTool } from '@langchain/core/tools'; import { SystemMessage } from '@langchain/core/messages'; import { MalformedToolCallError, MissingToolCallError } from '../errors'; +import { isExecutedStepOnExecutor } from '../types/step-execution-data'; -export default abstract class BaseStepExecutor< - TStep extends StepDefinition = StepDefinition, - THistory extends StepHistory = StepHistory, -> { - protected readonly context: ExecutionContext; +export default abstract class BaseStepExecutor { + protected readonly context: ExecutionContext; - constructor(context: ExecutionContext) { + constructor(context: ExecutionContext) { this.context = context; } - abstract execute(step: TStep, stepHistory: THistory): Promise; + abstract execute(): Promise; /** * Returns a SystemMessage array summarizing previously executed steps. @@ -35,35 +33,41 @@ export default abstract class BaseStepExecutor< /** * Builds a text summary of previously executed steps for AI prompts. - * Correlates history entries (step + stepHistory pairs) with executionParams - * from the RunStore (matched by stepHistory.stepIndex). - * When no executionParams is available, falls back to StepHistory details. + * Correlates history entries (step + stepOutcome pairs) with execution data + * from the RunStore (matched by stepOutcome.stepIndex). + * When no execution data is available, falls back to StepOutcome details. */ private async summarizePreviousSteps(): Promise { const allStepExecutions = await this.context.runStore.getStepExecutions(); return this.context.history - .map(({ step, stepHistory }) => { - const execution = allStepExecutions.find(e => e.stepIndex === stepHistory.stepIndex); + .map(({ stepDefinition, stepOutcome }) => { + const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); - return this.buildStepSummary(step, stepHistory, execution); + return this.buildStepSummary(stepDefinition, stepOutcome, execution); }) .join('\n\n'); } private buildStepSummary( step: StepDefinition, - stepHistory: StepHistory, + stepOutcome: StepOutcome, execution: StepExecutionData | undefined, ): string { const prompt = step.prompt ?? '(no prompt)'; - const header = `Step "${step.id}" (index ${stepHistory.stepIndex}):`; + const header = `Step "${stepOutcome.stepId}" (index ${stepOutcome.stepIndex}):`; const lines = [header, ` Prompt: ${prompt}`]; - if (execution?.executionParams) { - lines.push(` Result: ${JSON.stringify(execution.executionParams)}`); + if (isExecutedStepOnExecutor(execution)) { + if (execution.executionParams !== undefined) { + lines.push(` Input: ${JSON.stringify(execution.executionParams)}`); + } + + if (execution.executionResult) { + lines.push(` Output: ${JSON.stringify(execution.executionResult)}`); + } } else { - const { stepId, stepIndex, type, ...historyDetails } = stepHistory; + const { stepId, stepIndex, type, ...historyDetails } = stepOutcome; lines.push(` History: ${JSON.stringify(historyDetails)}`); } diff --git a/packages/workflow-executor/src/executors/condition-step-executor.ts b/packages/workflow-executor/src/executors/condition-step-executor.ts index b90d47ad8..ee4a60f83 100644 --- a/packages/workflow-executor/src/executors/condition-step-executor.ts +++ b/packages/workflow-executor/src/executors/condition-step-executor.ts @@ -1,6 +1,5 @@ import type { StepExecutionResult } from '../types/execution'; import type { ConditionStepDefinition } from '../types/step-definition'; -import type { ConditionStepHistory } from '../types/step-history'; import { HumanMessage, SystemMessage } from '@langchain/core/messages'; import { DynamicStructuredTool } from '@langchain/core/tools'; @@ -36,14 +35,10 @@ const GATEWAY_SYSTEM_PROMPT = `You are an AI agent selecting the correct option - If selecting null: explain why options don't match the question - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; -export default class ConditionStepExecutor extends BaseStepExecutor< - ConditionStepDefinition, - ConditionStepHistory -> { - async execute( - step: ConditionStepDefinition, - stepHistory: ConditionStepHistory, - ): Promise { +export default class ConditionStepExecutor extends BaseStepExecutor { + async execute(): Promise { + const { stepDefinition: step } = this.context; + const tool = new DynamicStructuredTool({ name: 'choose-gateway-option', description: @@ -58,7 +53,7 @@ export default class ConditionStepExecutor extends BaseStepExecutor< .nullable() .describe('The chosen option, or null if no option clearly answers the question.'), }), - func: async input => JSON.stringify(input), + func: undefined, }); const messages = [ @@ -73,8 +68,10 @@ export default class ConditionStepExecutor extends BaseStepExecutor< args = await this.invokeWithTool(messages, tool); } catch (error: unknown) { return { - stepHistory: { - ...stepHistory, + stepOutcome: { + type: 'condition', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, status: 'error', error: (error as Error).message, }, @@ -85,17 +82,30 @@ export default class ConditionStepExecutor extends BaseStepExecutor< await this.context.runStore.saveStepExecution({ type: 'condition', - stepIndex: stepHistory.stepIndex, + stepIndex: this.context.stepIndex, executionParams: { answer: selectedOption, reasoning }, executionResult: selectedOption ? { answer: selectedOption } : undefined, }); if (!selectedOption) { - return { stepHistory: { ...stepHistory, status: 'manual-decision' } }; + return { + stepOutcome: { + type: 'condition', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'manual-decision', + }, + }; } return { - stepHistory: { ...stepHistory, status: 'success', selectedOption }, + stepOutcome: { + type: 'condition', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'success', + selectedOption, + }, }; } } diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts new file mode 100644 index 000000000..e164e1690 --- /dev/null +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -0,0 +1,229 @@ +import type { StepExecutionResult } from '../types/execution'; +import type { CollectionSchema, RecordRef } from '../types/record'; +import type { AiTaskStepDefinition } from '../types/step-definition'; +import type { + FieldReadResult, + LoadRelatedRecordStepExecutionData, +} from '../types/step-execution-data'; + +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; + +import { + NoReadableFieldsError, + NoRecordsError, + NoResolvedFieldsError, + WorkflowExecutorError, +} from '../errors'; +import BaseStepExecutor from './base-step-executor'; + +const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request. +Select the field(s) that best answer the request. You can read one field or multiple fields at once. + +Important rules: +- Be precise: only read fields that are directly relevant to the request. +- Final answer is definitive, you won't receive any other input from the user. +- Do not refer to yourself as "I" in the response, use a passive formulation instead.`; + +export default class ReadRecordStepExecutor extends BaseStepExecutor { + private readonly schemaCache = new Map(); + + async execute(): Promise { + const { stepDefinition: step } = this.context; + const records = await this.getAvailableRecordRefs(); + + let selectedRecordRef: RecordRef; + let schema: CollectionSchema; + let fieldResults: FieldReadResult[]; + + try { + selectedRecordRef = await this.selectRecordRef(records, step.prompt); + schema = await this.getCollectionSchema(selectedRecordRef.collectionName); + const selectedDisplayNames = await this.selectFields(schema, step.prompt); + const resolvedFieldNames = selectedDisplayNames + .map( + name => + schema.fields.find(f => f.fieldName === name || f.displayName === name)?.fieldName, + ) + .filter((name): name is string => name !== undefined); + + if (resolvedFieldNames.length === 0) { + throw new NoResolvedFieldsError(selectedDisplayNames); + } + + const recordData = await this.context.agentPort.getRecord( + selectedRecordRef.collectionName, + selectedRecordRef.recordId, + resolvedFieldNames, + ); + fieldResults = this.formatFieldResults(recordData.values, schema, selectedDisplayNames); + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; + } + + await this.context.runStore.saveStepExecution({ + type: 'read-record', + stepIndex: this.context.stepIndex, + executionParams: { fieldNames: fieldResults.map(f => f.fieldName) }, + executionResult: { fields: fieldResults }, + selectedRecordRef, + }); + + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'success', + }, + }; + } + + private async selectFields( + schema: CollectionSchema, + prompt: string | undefined, + ): Promise { + const tool = this.buildReadFieldTool(schema); + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage(READ_RECORD_SYSTEM_PROMPT), + new SystemMessage( + `The selected record belongs to the "${schema.collectionDisplayName}" collection.`, + ), + new HumanMessage(`**Request**: ${prompt ?? 'Read the relevant fields.'}`), + ]; + + const args = await this.invokeWithTool<{ fieldNames: string[] }>(messages, tool); + + return args.fieldNames; + } + + private async selectRecordRef( + records: RecordRef[], + prompt: string | undefined, + ): Promise { + if (records.length === 0) throw new NoRecordsError(); + if (records.length === 1) return records[0]; + + const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r))); + const identifierTuple = identifiers as [string, ...string[]]; + + const tool = new DynamicStructuredTool({ + name: 'select-record', + description: 'Select the most relevant record for this workflow step.', + schema: z.object({ + recordIdentifier: z.enum(identifierTuple), + }), + func: undefined, + }); + + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage( + 'You are an AI agent selecting the most relevant record for a workflow step.\n' + + 'Choose the record whose collection best matches the user request.\n' + + 'Pay attention to the collection name of each record.', + ), + new HumanMessage(prompt ?? 'Select the most relevant record.'), + ]; + + const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>( + messages, + tool, + ); + + const selectedIndex = identifiers.indexOf(recordIdentifier); + + if (selectedIndex === -1) { + throw new WorkflowExecutorError( + `AI selected record "${recordIdentifier}" which does not match any available record`, + ); + } + + return records[selectedIndex]; + } + + private buildReadFieldTool(schema: CollectionSchema): DynamicStructuredTool { + const nonRelationFields = schema.fields.filter(f => !f.isRelationship); + + if (nonRelationFields.length === 0) { + throw new NoReadableFieldsError(schema.collectionName); + } + + const displayNames = nonRelationFields.map(f => f.displayName) as [string, ...string[]]; + + return new DynamicStructuredTool({ + name: 'read-selected-record-fields', + description: 'Read one or more fields from the selected record.', + schema: z.object({ + // z.string() (not z.enum) intentionally: an invalid field name in the array + // does not fail the whole tool call — per-field errors are handled in formatFieldResults. + // This matches the frontend implementation (ISO frontend). + fieldNames: z + .array(z.string()) + .describe( + `Names of the fields to read, possible values are: ${displayNames + .map(n => `"${n}"`) + .join(', ')}`, + ), + }), + func: undefined, + }); + } + + private formatFieldResults( + values: Record, + schema: CollectionSchema, + fieldNames: string[], + ): FieldReadResult[] { + return fieldNames.map(name => { + const field = schema.fields.find(f => f.fieldName === name || f.displayName === name); + + if (!field) return { error: `Field not found: ${name}`, fieldName: name, displayName: name }; + + return { + value: values[field.fieldName], + fieldName: field.fieldName, + displayName: field.displayName, + }; + }); + } + + private async getAvailableRecordRefs(): Promise { + const stepExecutions = await this.context.runStore.getStepExecutions(); + const relatedRecords = stepExecutions + .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') + .map(e => e.record); + + return [this.context.baseRecordRef, ...relatedRecords]; + } + + private async getCollectionSchema(collectionName: string): Promise { + const cached = this.schemaCache.get(collectionName); + if (cached) return cached; + + const schema = await this.context.workflowPort.getCollectionSchema(collectionName); + this.schemaCache.set(collectionName, schema); + + return schema; + } + + private async toRecordIdentifier(record: RecordRef): Promise { + const schema = await this.getCollectionSchema(record.collectionName); + + return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`; + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 2918b36c4..16c054cfd 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -1,6 +1,5 @@ export { StepType } from './types/step-definition'; export type { - StepCategory, ConditionStepDefinition, AiTaskStepDefinition, StepDefinition, @@ -8,21 +7,35 @@ export type { export type { StepStatus, - ConditionStepHistory, - AiTaskStepHistory, - StepHistory, -} from './types/step-history'; + ConditionStepOutcome, + AiTaskStepOutcome, + StepOutcome, +} from './types/step-outcome'; export type { + FieldReadSuccess, + FieldReadError, + FieldReadResult, ConditionStepExecutionData, + ReadRecordStepExecutionData, AiTaskStepExecutionData, + LoadRelatedRecordStepExecutionData, + ExecutedStepExecutionData, StepExecutionData, } from './types/step-execution-data'; -export type { RecordFieldRef, ActionRef, CollectionRef, RecordData } from './types/record'; +export { isExecutedStepOnExecutor } from './types/step-execution-data'; export type { - StepRecord, + FieldSchema, + ActionSchema, + CollectionSchema, + RecordRef, + RecordData, +} from './types/record'; + +export type { + Step, UserInput, PendingStepExecution, StepExecutionResult, @@ -38,8 +51,12 @@ export { MissingToolCallError, MalformedToolCallError, RecordNotFoundError, + NoRecordsError, + NoReadableFieldsError, + NoResolvedFieldsError, } from './errors'; export { default as BaseStepExecutor } from './executors/base-step-executor'; export { default as ConditionStepExecutor } from './executors/condition-step-executor'; +export { default as ReadRecordStepExecutor } from './executors/read-record-step-executor'; export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port'; export { default as ForestServerWorkflowPort } from './adapters/forest-server-workflow-port'; diff --git a/packages/workflow-executor/src/ports/agent-port.ts b/packages/workflow-executor/src/ports/agent-port.ts index 6a588f1f2..a0964e250 100644 --- a/packages/workflow-executor/src/ports/agent-port.ts +++ b/packages/workflow-executor/src/ports/agent-port.ts @@ -1,9 +1,13 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { ActionRef, RecordData } from '../types/record'; +import type { RecordData } from '../types/record'; export interface AgentPort { - getRecord(collectionName: string, recordId: Array): Promise; + getRecord( + collectionName: string, + recordId: Array, + fieldNames?: string[], + ): Promise; updateRecord( collectionName: string, recordId: Array, @@ -14,7 +18,6 @@ export interface AgentPort { recordId: Array, relationName: string, ): Promise; - getActions(collectionName: string): Promise; executeAction( collectionName: string, actionName: string, diff --git a/packages/workflow-executor/src/ports/run-store.ts b/packages/workflow-executor/src/ports/run-store.ts index 212ab1408..6b899da84 100644 --- a/packages/workflow-executor/src/ports/run-store.ts +++ b/packages/workflow-executor/src/ports/run-store.ts @@ -1,13 +1,8 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { RecordData } from '../types/record'; import type { StepExecutionData } from '../types/step-execution-data'; export interface RunStore { - getRecords(): Promise; - getRecord(collectionName: string, recordId: string): Promise; - saveRecord(record: RecordData): Promise; getStepExecutions(): Promise; - getStepExecution(stepIndex: number): Promise; saveStepExecution(stepExecution: StepExecutionData): Promise; } diff --git a/packages/workflow-executor/src/ports/workflow-port.ts b/packages/workflow-executor/src/ports/workflow-port.ts index 93951f6f0..392473a95 100644 --- a/packages/workflow-executor/src/ports/workflow-port.ts +++ b/packages/workflow-executor/src/ports/workflow-port.ts @@ -1,15 +1,15 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ import type { PendingStepExecution } from '../types/execution'; -import type { CollectionRef } from '../types/record'; -import type { StepHistory } from '../types/step-history'; +import type { CollectionSchema } from '../types/record'; +import type { StepOutcome } from '../types/step-outcome'; /** Placeholder -- will be typed as McpConfiguration from @forestadmin/ai-proxy/mcp-client once added as dependency. */ export type McpConfiguration = unknown; export interface WorkflowPort { getPendingStepExecutions(): Promise; - updateStepExecution(runId: string, stepHistory: StepHistory): Promise; - getCollectionRef(collectionName: string): Promise; + updateStepExecution(runId: string, stepOutcome: StepOutcome): Promise; + getCollectionSchema(collectionName: string): Promise; getMcpServerConfigs(): Promise; } diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index d2524403c..406d1e4f0 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -1,39 +1,44 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { CollectionRef } from './record'; +import type { RecordRef } from './record'; import type { StepDefinition } from './step-definition'; -import type { StepHistory } from './step-history'; +import type { StepOutcome } from './step-outcome'; import type { AgentPort } from '../ports/agent-port'; import type { RunStore } from '../ports/run-store'; import type { WorkflowPort } from '../ports/workflow-port'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -export interface StepRecord { - step: StepDefinition; - stepHistory: StepHistory; +export interface Step { + stepDefinition: StepDefinition; + stepOutcome: StepOutcome; } export type UserInput = { type: 'confirmation'; confirmed: boolean }; export interface PendingStepExecution { readonly runId: string; - readonly step: StepDefinition; - readonly stepHistory: StepHistory; - readonly previousSteps: ReadonlyArray; - readonly availableRecords: ReadonlyArray; + readonly stepId: string; + readonly stepIndex: number; + readonly baseRecordRef: RecordRef; + readonly stepDefinition: StepDefinition; + readonly previousSteps: ReadonlyArray; readonly userInput?: UserInput; } export interface StepExecutionResult { - stepHistory: StepHistory; + stepOutcome: StepOutcome; } -export interface ExecutionContext { +export interface ExecutionContext { readonly runId: string; + readonly stepId: string; + readonly stepIndex: number; + readonly baseRecordRef: RecordRef; + readonly stepDefinition: TStep; readonly model: BaseChatModel; readonly agentPort: AgentPort; readonly workflowPort: WorkflowPort; readonly runStore: RunStore; - readonly history: ReadonlyArray>; + readonly history: ReadonlyArray>; readonly remoteTools: readonly unknown[]; } diff --git a/packages/workflow-executor/src/types/record.ts b/packages/workflow-executor/src/types/record.ts index 14064fcb1..b5070c39f 100644 --- a/packages/workflow-executor/src/types/record.ts +++ b/packages/workflow-executor/src/types/record.ts @@ -1,27 +1,35 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -export interface RecordFieldRef { +// -- Schema types (structure of a collection — source: WorkflowPort) -- + +export interface FieldSchema { fieldName: string; displayName: string; - type: string; isRelationship: boolean; - referencedCollectionName?: string; } -export interface ActionRef { +export interface ActionSchema { name: string; displayName: string; } -export interface CollectionRef { +export interface CollectionSchema { collectionName: string; collectionDisplayName: string; primaryKeyFields: string[]; - fields: RecordFieldRef[]; - actions: ActionRef[]; + fields: FieldSchema[]; + actions: ActionSchema[]; } -export interface RecordData extends CollectionRef { +// -- Record types (data — source: AgentPort/RunStore) -- + +/** Lightweight pointer to a specific record. */ +export interface RecordRef { + collectionName: string; recordId: Array; - values: Record; + /** Index of the workflow step that loaded this record. */ + stepIndex: number; } + +/** A record with its loaded field values — no stepIndex (agent doesn't know about steps). */ +export type RecordData = Omit & { values: Record }; diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index dffae8c31..ca23e5b41 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -9,34 +9,22 @@ export enum StepType { } interface BaseStepDefinition { - id: string; type: StepType; + prompt?: string; aiConfigName?: string; } export interface ConditionStepDefinition extends BaseStepDefinition { type: StepType.Condition; options: [string, ...string[]]; - prompt?: string; } export interface AiTaskStepDefinition extends BaseStepDefinition { - type: - | StepType.ReadRecord - | StepType.UpdateRecord - | StepType.TriggerAction - | StepType.LoadRelatedRecord; + type: Exclude; recordSourceStepId?: string; - prompt?: string; automaticCompletion?: boolean; allowedTools?: string[]; remoteToolsSourceId?: string; } export type StepDefinition = ConditionStepDefinition | AiTaskStepDefinition; - -/** - * Coarse categorization of steps. StepType has 5 fine-grained values; - * StepCategory collapses the 4 non-condition types into 'ai-task'. - */ -export type StepCategory = 'condition' | 'ai-task'; diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index e2d46eaf4..eb022a273 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -1,23 +1,78 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { CollectionRef } from './record'; +import type { RecordRef } from './record'; + +// -- Base -- interface BaseStepExecutionData { stepIndex: number; } +// -- Condition -- + export interface ConditionStepExecutionData extends BaseStepExecutionData { type: 'condition'; - executionParams?: { answer: string | null; reasoning?: string }; - executionResult?: { answer: string }; + executionParams: { answer: string | null; reasoning?: string }; + executionResult: { answer: string }; +} + +// -- Read Record -- + +interface FieldReadBase { + fieldName: string; + displayName: string; +} + +export interface FieldReadSuccess extends FieldReadBase { + value: unknown; +} + +export interface FieldReadError extends FieldReadBase { + error: string; +} + +export type FieldReadResult = FieldReadSuccess | FieldReadError; + +export interface ReadRecordStepExecutionData extends BaseStepExecutionData { + type: 'read-record'; + executionParams: { fieldNames: string[] }; + executionResult: { fields: FieldReadResult[] }; + selectedRecordRef: RecordRef; } +// -- Generic AI Task (fallback for untyped steps) -- + export interface AiTaskStepExecutionData extends BaseStepExecutionData { type: 'ai-task'; executionParams?: Record; executionResult?: Record; toolConfirmationInterruption?: Record; - selectedRecord?: CollectionRef; } -export type StepExecutionData = ConditionStepExecutionData | AiTaskStepExecutionData; +// -- Load Related Record -- + +export interface LoadRelatedRecordStepExecutionData extends BaseStepExecutionData { + type: 'load-related-record'; + record: RecordRef; +} + +// -- Union -- + +export type StepExecutionData = + | ConditionStepExecutionData + | ReadRecordStepExecutionData + | AiTaskStepExecutionData + | LoadRelatedRecordStepExecutionData; + +export type ExecutedStepExecutionData = + | ConditionStepExecutionData + | ReadRecordStepExecutionData + | AiTaskStepExecutionData; + +// TODO: this condition should change when load-related-record gets its own executor +// and produces executionParams/executionResult like other steps. +export function isExecutedStepOnExecutor( + data: StepExecutionData | undefined, +): data is ExecutedStepExecutionData { + return !!data && data.type !== 'load-related-record'; +} diff --git a/packages/workflow-executor/src/types/step-history.ts b/packages/workflow-executor/src/types/step-outcome.ts similarity index 77% rename from packages/workflow-executor/src/types/step-history.ts rename to packages/workflow-executor/src/types/step-outcome.ts index bf9b66b61..9a564748e 100644 --- a/packages/workflow-executor/src/types/step-history.ts +++ b/packages/workflow-executor/src/types/step-outcome.ts @@ -12,27 +12,27 @@ export type AiTaskStepStatus = BaseStepStatus | 'awaiting-input'; export type StepStatus = ConditionStepStatus | AiTaskStepStatus; /** - * StepHistory is sent to the orchestrator — it must NEVER contain client data. + * StepOutcome is sent to the orchestrator — it must NEVER contain client data. * Any privacy-sensitive information (e.g. AI reasoning) must stay in * StepExecutionData (persisted in the RunStore, client-side only). */ -interface BaseStepHistory { +interface BaseStepOutcome { stepId: string; stepIndex: number; /** Present when status is 'error'. */ error?: string; } -export interface ConditionStepHistory extends BaseStepHistory { +export interface ConditionStepOutcome extends BaseStepOutcome { type: 'condition'; status: ConditionStepStatus; /** Present when status is 'success'. */ selectedOption?: string; } -export interface AiTaskStepHistory extends BaseStepHistory { +export interface AiTaskStepOutcome extends BaseStepOutcome { type: 'ai-task'; status: AiTaskStepStatus; } -export type StepHistory = ConditionStepHistory | AiTaskStepHistory; +export type StepOutcome = ConditionStepOutcome | AiTaskStepOutcome; diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts index 878990787..b564eeaf5 100644 --- a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -1,4 +1,4 @@ -import type { CollectionRef } from '../../src/types/record'; +import type { CollectionSchema } from '../../src/types/record'; import type { RemoteAgentClient } from '@forestadmin/agent-client'; import AgentClientAgentPort from '../../src/adapters/agent-client-agent-port'; @@ -26,7 +26,7 @@ describe('AgentClientAgentPort', () => { let mockCollection: ReturnType['mockCollection']; let mockRelation: ReturnType['mockRelation']; let mockAction: ReturnType['mockAction']; - let collectionRefs: Record; + let collectionSchemas: Record; let port: AgentClientAgentPort; beforeEach(() => { @@ -34,14 +34,14 @@ describe('AgentClientAgentPort', () => { ({ client, mockCollection, mockRelation, mockAction } = createMockClient()); - collectionRefs = { + collectionSchemas = { users: { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ - { fieldName: 'id', displayName: 'id', type: 'Number', isRelationship: false }, - { fieldName: 'name', displayName: 'name', type: 'String', isRelationship: false }, + { fieldName: 'id', displayName: 'id', isRelationship: false }, + { fieldName: 'name', displayName: 'name', isRelationship: false }, ], actions: [ { name: 'sendEmail', displayName: 'Send Email' }, @@ -53,8 +53,8 @@ describe('AgentClientAgentPort', () => { collectionDisplayName: 'Orders', primaryKeyFields: ['tenantId', 'orderId'], fields: [ - { fieldName: 'tenantId', displayName: 'Tenant', type: 'Number', isRelationship: false }, - { fieldName: 'orderId', displayName: 'Order', type: 'Number', isRelationship: false }, + { fieldName: 'tenantId', displayName: 'Tenant', isRelationship: false }, + { fieldName: 'orderId', displayName: 'Order', isRelationship: false }, ], actions: [], }, @@ -63,14 +63,14 @@ describe('AgentClientAgentPort', () => { collectionDisplayName: 'Posts', primaryKeyFields: ['id'], fields: [ - { fieldName: 'id', displayName: 'id', type: 'Number', isRelationship: false }, - { fieldName: 'title', displayName: 'title', type: 'String', isRelationship: false }, + { fieldName: 'id', displayName: 'id', isRelationship: false }, + { fieldName: 'title', displayName: 'title', isRelationship: false }, ], actions: [], }, }; - port = new AgentClientAgentPort({ client, collectionRefs }); + port = new AgentClientAgentPort({ client, collectionSchemas }); }); describe('getRecord', () => { @@ -84,12 +84,8 @@ describe('AgentClientAgentPort', () => { pagination: { size: 1, number: 1 }, }); expect(result).toEqual({ - recordId: [42], collectionName: 'users', - collectionDisplayName: 'Users', - primaryKeyFields: ['id'], - fields: collectionRefs.users.fields, - actions: collectionRefs.users.actions, + recordId: [42], values: { id: 42, name: 'Alice' }, }); }); @@ -117,6 +113,40 @@ describe('AgentClientAgentPort', () => { await expect(port.getRecord('users', [999])).rejects.toThrow(RecordNotFoundError); }); + it('should pass fields to list when fieldNames is provided', async () => { + mockCollection.list.mockResolvedValue([{ id: 42, name: 'Alice' }]); + + await port.getRecord('users', [42], ['id', 'name']); + + expect(mockCollection.list).toHaveBeenCalledWith({ + filters: { field: 'id', operator: 'Equal', value: 42 }, + pagination: { size: 1, number: 1 }, + fields: ['id', 'name'], + }); + }); + + it('should not pass fields to list when fieldNames is an empty array', async () => { + mockCollection.list.mockResolvedValue([{ id: 42, name: 'Alice' }]); + + await port.getRecord('users', [42], []); + + expect(mockCollection.list).toHaveBeenCalledWith({ + filters: { field: 'id', operator: 'Equal', value: 42 }, + pagination: { size: 1, number: 1 }, + }); + }); + + it('should not pass fields to list when fieldNames is undefined', async () => { + mockCollection.list.mockResolvedValue([{ id: 42, name: 'Alice' }]); + + await port.getRecord('users', [42]); + + expect(mockCollection.list).toHaveBeenCalledWith({ + filters: { field: 'id', operator: 'Equal', value: 42 }, + pagination: { size: 1, number: 1 }, + }); + }); + it('should fallback to pk field "id" when collection is unknown', async () => { mockCollection.list.mockResolvedValue([{ id: 1 }]); @@ -128,7 +158,6 @@ describe('AgentClientAgentPort', () => { }), ); expect(result.collectionName).toBe('unknown'); - expect(result.fields).toEqual([]); }); }); @@ -140,12 +169,8 @@ describe('AgentClientAgentPort', () => { expect(mockCollection.update).toHaveBeenCalledWith('42', { name: 'Bob' }); expect(result).toEqual({ - recordId: [42], collectionName: 'users', - collectionDisplayName: 'Users', - primaryKeyFields: ['id'], - fields: collectionRefs.users.fields, - actions: collectionRefs.users.actions, + recordId: [42], values: { id: 42, name: 'Bob' }, }); }); @@ -171,27 +196,19 @@ describe('AgentClientAgentPort', () => { expect(mockCollection.relation).toHaveBeenCalledWith('posts', '42'); expect(result).toEqual([ { - recordId: [10], collectionName: 'posts', - collectionDisplayName: 'Posts', - primaryKeyFields: ['id'], - fields: collectionRefs.posts.fields, - actions: collectionRefs.posts.actions, + recordId: [10], values: { id: 10, title: 'Post A' }, }, { - recordId: [11], collectionName: 'posts', - collectionDisplayName: 'Posts', - primaryKeyFields: ['id'], - fields: collectionRefs.posts.fields, - actions: collectionRefs.posts.actions, + recordId: [11], values: { id: 11, title: 'Post B' }, }, ]); }); - it('should fallback to relationName when no CollectionRef exists', async () => { + it('should fallback to relationName when no CollectionSchema exists', async () => { mockRelation.list.mockResolvedValue([{ id: 1 }]); const result = await port.getRelatedData('users', [42], 'unknownRelation'); @@ -207,19 +224,6 @@ describe('AgentClientAgentPort', () => { }); }); - describe('getActions', () => { - it('should return ActionRef[] from CollectionRef', async () => { - expect(await port.getActions('users')).toEqual([ - { name: 'sendEmail', displayName: 'Send Email' }, - { name: 'archive', displayName: 'Archive' }, - ]); - }); - - it('should return an empty array for an unknown collection', async () => { - expect(await port.getActions('unknown')).toEqual([]); - }); - }); - describe('executeAction', () => { it('should encode recordIds to pipe format and call execute', async () => { mockAction.execute.mockResolvedValue({ success: 'done' }); diff --git a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts index ff37147e7..9e69a04ea 100644 --- a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts +++ b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts @@ -1,6 +1,6 @@ import type { PendingStepExecution } from '../../src/types/execution'; -import type { CollectionRef } from '../../src/types/record'; -import type { StepHistory } from '../../src/types/step-history'; +import type { CollectionSchema } from '../../src/types/record'; +import type { StepOutcome } from '../../src/types/step-outcome'; import { ServerUtils } from '@forestadmin/forestadmin-client'; @@ -39,9 +39,9 @@ describe('ForestServerWorkflowPort', () => { }); describe('updateStepExecution', () => { - it('should post step history to the complete route', async () => { + it('should post step outcome to the complete route', async () => { mockQuery.mockResolvedValue(undefined); - const stepHistory: StepHistory = { + const stepOutcome: StepOutcome = { type: 'condition', stepId: 'step-1', stepIndex: 0, @@ -49,33 +49,33 @@ describe('ForestServerWorkflowPort', () => { selectedOption: 'optionA', }; - await port.updateStepExecution('run-42', stepHistory); + await port.updateStepExecution('run-42', stepOutcome); expect(mockQuery).toHaveBeenCalledWith( options, 'post', '/liana/v1/workflow-step-executions/run-42/complete', {}, - stepHistory, + stepOutcome, ); }); }); - describe('getCollectionRef', () => { - it('should fetch the collection ref by name', async () => { - const collectionRef: CollectionRef = { + describe('getCollectionSchema', () => { + it('should fetch the collection schema by name', async () => { + const collectionSchema: CollectionSchema = { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [], actions: [], }; - mockQuery.mockResolvedValue(collectionRef); + mockQuery.mockResolvedValue(collectionSchema); - const result = await port.getCollectionRef('users'); + const result = await port.getCollectionSchema('users'); expect(mockQuery).toHaveBeenCalledWith(options, 'get', '/liana/v1/collections/users'); - expect(result).toEqual(collectionRef); + expect(result).toEqual(collectionSchema); }); }); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 73f5e716b..4d79c03cf 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -1,8 +1,9 @@ import type { RunStore } from '../../src/ports/run-store'; import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution'; +import type { RecordRef } from '../../src/types/record'; import type { StepDefinition } from '../../src/types/step-definition'; import type { StepExecutionData } from '../../src/types/step-execution-data'; -import type { StepHistory } from '../../src/types/step-history'; +import type { StepOutcome } from '../../src/types/step-outcome'; import type { BaseMessage, SystemMessage } from '@langchain/core/messages'; import type { DynamicStructuredTool } from '@langchain/core/tools'; @@ -30,15 +31,14 @@ class TestableExecutor extends BaseStepExecutor { function makeHistoryEntry( overrides: { stepId?: string; stepIndex?: number; prompt?: string } = {}, -): { step: StepDefinition; stepHistory: StepHistory } { +): { stepDefinition: StepDefinition; stepOutcome: StepOutcome } { return { - step: { - id: overrides.stepId ?? 'step-1', + stepDefinition: { type: StepType.Condition, options: ['A', 'B'], prompt: overrides.prompt ?? 'Pick one', }, - stepHistory: { + stepOutcome: { type: 'condition', stepId: overrides.stepId ?? 'step-1', stepIndex: overrides.stepIndex ?? 0, @@ -49,11 +49,7 @@ function makeHistoryEntry( function makeMockRunStore(stepExecutions: StepExecutionData[] = []): RunStore { return { - getRecords: jest.fn().mockResolvedValue([]), - getRecord: jest.fn().mockResolvedValue(null), - saveRecord: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue(stepExecutions), - getStepExecution: jest.fn().mockResolvedValue(null), saveStepExecution: jest.fn().mockResolvedValue(undefined), }; } @@ -61,6 +57,18 @@ function makeMockRunStore(stepExecutions: StepExecutionData[] = []): RunStore { function makeContext(overrides: Partial = {}): ExecutionContext { return { runId: 'run-1', + stepId: 'step-0', + stepIndex: 0, + baseRecordRef: { + collectionName: 'customers', + recordId: [1], + stepIndex: 0, + } as RecordRef, + stepDefinition: { + type: StepType.Condition, + options: ['A', 'B'], + prompt: 'Pick one', + }, model: {} as ExecutionContext['model'], agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], @@ -100,22 +108,24 @@ describe('BaseStepExecutor', () => { expect(result).toContain('Step "cond-1"'); expect(result).toContain('Prompt: Approve?'); - expect(result).toContain('Result: {"answer":"Yes","reasoning":"Order is valid"}'); + expect(result).toContain('Input: {"answer":"Yes","reasoning":"Order is valid"}'); + expect(result).toContain('Output: {"answer":"Yes"}'); }); - it('falls back to History when step has no executionParams in RunStore', async () => { + it('uses Input for matched steps and History for unmatched steps', async () => { const executor = new TestableExecutor( makeContext({ history: [ makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 }), makeHistoryEntry({ stepId: 'cond-2', stepIndex: 1, prompt: 'Second?' }), ], + // Only step 1 has an execution entry — step 0 has no match runStore: makeMockRunStore([ - { type: 'condition', stepIndex: 0 }, { type: 'condition', stepIndex: 1, executionParams: { answer: 'No', reasoning: 'Clearly no' }, + executionResult: { answer: 'No' }, }, ]), }), @@ -128,7 +138,8 @@ describe('BaseStepExecutor', () => { expect(result).toContain('Step "cond-1"'); expect(result).toContain('History: {"status":"success"}'); expect(result).toContain('Step "cond-2"'); - expect(result).toContain('Result: {"answer":"No","reasoning":"Clearly no"}'); + expect(result).toContain('Input: {"answer":"No","reasoning":"Clearly no"}'); + expect(result).toContain('Output: {"answer":"No"}'); }); it('falls back to History when no matching step execution in RunStore', async () => { @@ -143,6 +154,7 @@ describe('BaseStepExecutor', () => { type: 'condition', stepIndex: 1, executionParams: { answer: 'B', reasoning: 'Option B fits' }, + executionResult: { answer: 'B' }, }, ]), }), @@ -155,7 +167,8 @@ describe('BaseStepExecutor', () => { expect(result).toContain('Step "orphan"'); expect(result).toContain('History: {"status":"success"}'); expect(result).toContain('Step "matched"'); - expect(result).toContain('Result: {"answer":"B","reasoning":"Option B fits"}'); + expect(result).toContain('Input: {"answer":"B","reasoning":"Option B fits"}'); + expect(result).toContain('Output: {"answer":"B"}'); }); it('includes selectedOption in History for condition steps', async () => { @@ -164,7 +177,7 @@ describe('BaseStepExecutor', () => { stepIndex: 0, prompt: 'Approved?', }); - (entry.stepHistory as { selectedOption?: string }).selectedOption = 'Yes'; + (entry.stepOutcome as { selectedOption?: string }).selectedOption = 'Yes'; const executor = new TestableExecutor( makeContext({ @@ -187,8 +200,8 @@ describe('BaseStepExecutor', () => { stepIndex: 0, prompt: 'Do something', }); - entry.stepHistory.status = 'error'; - (entry.stepHistory as { error?: string }).error = 'AI could not match an option'; + entry.stepOutcome.status = 'error'; + (entry.stepOutcome as { error?: string }).error = 'AI could not match an option'; const executor = new TestableExecutor( makeContext({ @@ -206,9 +219,17 @@ describe('BaseStepExecutor', () => { }); it('includes status in History for ai-task steps without RunStore data', async () => { - const entry: { step: StepDefinition; stepHistory: StepHistory } = { - step: { id: 'ai-step', type: StepType.ReadRecord, prompt: 'Run task' }, - stepHistory: { type: 'ai-task', stepId: 'ai-step', stepIndex: 0, status: 'awaiting-input' }, + const entry: { stepDefinition: StepDefinition; stepOutcome: StepOutcome } = { + stepDefinition: { + type: StepType.ReadRecord, + prompt: 'Run task', + }, + stepOutcome: { + type: 'ai-task', + stepId: 'ai-step', + stepIndex: 0, + status: 'awaiting-input', + }, }; const executor = new TestableExecutor( @@ -226,17 +247,25 @@ describe('BaseStepExecutor', () => { expect(result).toContain('History: {"status":"awaiting-input"}'); }); - it('uses Result when RunStore has executionParams, History otherwise', async () => { + it('uses Input when RunStore has executionParams, History otherwise', async () => { const condEntry = makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approved?', }); - (condEntry.stepHistory as { selectedOption?: string }).selectedOption = 'Yes'; - - const aiEntry: { step: StepDefinition; stepHistory: StepHistory } = { - step: { id: 'read-customer', type: StepType.ReadRecord, prompt: 'Read name' }, - stepHistory: { type: 'ai-task', stepId: 'read-customer', stepIndex: 1, status: 'success' }, + (condEntry.stepOutcome as { selectedOption?: string }).selectedOption = 'Yes'; + + const aiEntry: { stepDefinition: StepDefinition; stepOutcome: StepOutcome } = { + stepDefinition: { + type: StepType.ReadRecord, + prompt: 'Read name', + }, + stepOutcome: { + type: 'ai-task', + stepId: 'read-customer', + stepIndex: 1, + status: 'success', + }, }; const executor = new TestableExecutor( @@ -259,12 +288,12 @@ describe('BaseStepExecutor', () => { expect(result).toContain('Step "cond-1"'); expect(result).toContain('History: {"status":"success","selectedOption":"Yes"}'); expect(result).toContain('Step "read-customer"'); - expect(result).toContain('Result: {"answer":"John Doe"}'); + expect(result).toContain('Input: {"answer":"John Doe"}'); }); - it('prefers RunStore executionParams over History fallback', async () => { + it('prefers RunStore execution data over History fallback', async () => { const entry = makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Pick one' }); - (entry.stepHistory as { selectedOption?: string }).selectedOption = 'A'; + (entry.stepOutcome as { selectedOption?: string }).selectedOption = 'A'; const executor = new TestableExecutor( makeContext({ @@ -274,6 +303,7 @@ describe('BaseStepExecutor', () => { type: 'condition', stepIndex: 0, executionParams: { answer: 'A', reasoning: 'Best fit' }, + executionResult: { answer: 'A' }, }, ]), }), @@ -283,13 +313,49 @@ describe('BaseStepExecutor', () => { .buildPreviousStepsMessages() .then(msgs => msgs[0]?.content ?? ''); - expect(result).toContain('Result: {"answer":"A","reasoning":"Best fit"}'); + expect(result).toContain('Input: {"answer":"A","reasoning":"Best fit"}'); + expect(result).toContain('Output: {"answer":"A"}'); expect(result).not.toContain('History:'); }); + it('omits Input line when executionParams is undefined', async () => { + const entry: { stepDefinition: StepDefinition; stepOutcome: StepOutcome } = { + stepDefinition: { + type: StepType.ReadRecord, + prompt: 'Do something', + }, + stepOutcome: { + type: 'ai-task', + stepId: 'ai-step', + stepIndex: 0, + status: 'success', + }, + }; + + const executor = new TestableExecutor( + makeContext({ + history: [entry], + runStore: makeMockRunStore([ + { + type: 'ai-task', + stepIndex: 0, + }, + ]), + }), + ); + + const result = await executor + .buildPreviousStepsMessages() + .then(msgs => msgs[0]?.content ?? ''); + + expect(result).toContain('Step "ai-step"'); + expect(result).toContain('Prompt: Do something'); + expect(result).not.toContain('Input:'); + }); + it('shows "(no prompt)" when step has no prompt', async () => { const entry = makeHistoryEntry({ stepIndex: 0 }); - entry.step.prompt = undefined; + entry.stepDefinition.prompt = undefined; const executor = new TestableExecutor( makeContext({ @@ -299,6 +365,7 @@ describe('BaseStepExecutor', () => { type: 'condition', stepIndex: 0, executionParams: { answer: 'A', reasoning: 'Only option' }, + executionResult: { answer: 'A' }, }, ]), }), diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index ba7fe7f34..b41cf35bc 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -1,14 +1,14 @@ import type { RunStore } from '../../src/ports/run-store'; import type { ExecutionContext } from '../../src/types/execution'; +import type { RecordRef } from '../../src/types/record'; import type { ConditionStepDefinition } from '../../src/types/step-definition'; -import type { ConditionStepHistory } from '../../src/types/step-history'; +import type { ConditionStepOutcome } from '../../src/types/step-outcome'; import ConditionStepExecutor from '../../src/executors/condition-step-executor'; import { StepType } from '../../src/types/step-definition'; function makeStep(overrides: Partial = {}): ConditionStepDefinition { return { - id: 'cond-1', type: StepType.Condition, options: ['Approve', 'Reject'], prompt: 'Should we approve this?', @@ -16,23 +16,9 @@ function makeStep(overrides: Partial = {}): ConditionSt }; } -function makeStepHistory(overrides: Partial = {}): ConditionStepHistory { - return { - type: 'condition', - stepId: 'cond-1', - stepIndex: 0, - status: 'success', - ...overrides, - }; -} - function makeMockRunStore(overrides: Partial = {}): RunStore { return { - getRecords: jest.fn().mockResolvedValue([]), - getRecord: jest.fn().mockResolvedValue(null), - saveRecord: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), - getStepExecution: jest.fn().mockResolvedValue(null), saveStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; @@ -50,9 +36,19 @@ function makeMockModel(toolCallArgs?: Record) { return { model, bindTools, invoke }; } -function makeContext(overrides: Partial = {}): ExecutionContext { +function makeContext( + overrides: Partial> = {}, +): ExecutionContext { return { runId: 'run-1', + stepId: 'cond-1', + stepIndex: 0, + baseRecordRef: { + collectionName: 'customers', + recordId: [1], + stepIndex: 0, + } as RecordRef, + stepDefinition: makeStep(), model: makeMockModel().model, agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], @@ -64,24 +60,6 @@ function makeContext(overrides: Partial = {}): ExecutionContex } describe('ConditionStepExecutor', () => { - describe('immutability', () => { - it('does not mutate the input stepHistory', async () => { - const mockModel = makeMockModel({ - option: 'Reject', - reasoning: 'Incomplete', - question: 'Approve?', - }); - const stepHistory = makeStepHistory(); - const executor = new ConditionStepExecutor(makeContext({ model: mockModel.model })); - - const result = await executor.execute(makeStep(), stepHistory); - - expect(result.stepHistory).not.toBe(stepHistory); - expect(stepHistory.status).toBe('success'); - expect(stepHistory.selectedOption).toBeUndefined(); - }); - }); - describe('AI decision', () => { it('calls AI and returns selected option on success', async () => { const mockModel = makeMockModel({ @@ -96,10 +74,10 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(makeStep(), makeStepHistory()); + const result = await executor.execute(); - expect(result.stepHistory.status).toBe('success'); - expect((result.stepHistory as ConditionStepHistory).selectedOption).toBe('Reject'); + expect(result.stepOutcome.status).toBe('success'); + expect((result.stepOutcome as ConditionStepOutcome).selectedOption).toBe('Reject'); expect(mockModel.bindTools).toHaveBeenCalledWith( [expect.objectContaining({ name: 'choose-gateway-option' })], @@ -120,13 +98,15 @@ describe('ConditionStepExecutor', () => { reasoning: 'Looks good', question: 'Should we?', }); - const executor = new ConditionStepExecutor(makeContext({ model: mockModel.model })); - - await executor.execute( - makeStep({ options: ['Approve', 'Reject', 'Defer'] }), - makeStepHistory(), + const executor = new ConditionStepExecutor( + makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ options: ['Approve', 'Reject', 'Defer'] }), + }), ); + await executor.execute(); + const tool = mockModel.bindTools.mock.calls[0][0][0]; expect(tool.name).toBe('choose-gateway-option'); expect(tool.schema.parse({ option: 'Approve', reasoning: 'r', question: 'q' })).toBeTruthy(); @@ -143,13 +123,13 @@ describe('ConditionStepExecutor', () => { reasoning: 'Looks good', question: 'Should we approve?', }); - const context = makeContext({ model: mockModel.model }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ prompt: 'Custom prompt for this step' }), + }); const executor = new ConditionStepExecutor(context); - await executor.execute( - makeStep({ prompt: 'Custom prompt for this step' }), - makeStepHistory(), - ); + await executor.execute(); const messages = mockModel.invoke.mock.calls[0][0]; expect(messages).toHaveLength(2); @@ -164,10 +144,13 @@ describe('ConditionStepExecutor', () => { reasoning: 'Default', question: 'Approve?', }); - const context = makeContext({ model: mockModel.model }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ prompt: undefined }), + }); const executor = new ConditionStepExecutor(context); - await executor.execute(makeStep({ prompt: undefined }), makeStepHistory()); + await executor.execute(); const messages = mockModel.invoke.mock.calls[0][0]; const humanMessage = messages[messages.length - 1]; @@ -181,7 +164,6 @@ describe('ConditionStepExecutor', () => { question: 'Final approval?', }); const runStore = makeMockRunStore({ - getStepExecution: jest.fn().mockResolvedValue(null), getStepExecutions: jest.fn().mockResolvedValue([ { type: 'condition', @@ -195,13 +177,12 @@ describe('ConditionStepExecutor', () => { runStore, history: [ { - step: { - id: 'prev-step', + stepDefinition: { type: StepType.Condition, options: ['Yes', 'No'], prompt: 'Previous question', }, - stepHistory: { + stepOutcome: { type: 'condition', stepId: 'prev-step', stepIndex: 0, @@ -210,12 +191,13 @@ describe('ConditionStepExecutor', () => { }, ], }); - const executor = new ConditionStepExecutor(context); + const executor = new ConditionStepExecutor({ + ...context, + stepId: 'cond-2', + stepIndex: 1, + }); - await executor.execute( - makeStep({ id: 'cond-2' }), - makeStepHistory({ stepId: 'cond-2', stepIndex: 1 }), - ); + await executor.execute(); const messages = mockModel.invoke.mock.calls[0][0]; expect(messages).toHaveLength(3); @@ -240,11 +222,11 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(makeStep(), makeStepHistory()); + const result = await executor.execute(); - expect(result.stepHistory.status).toBe('manual-decision'); - expect(result.stepHistory.error).toBeUndefined(); - expect((result.stepHistory as ConditionStepHistory).selectedOption).toBeUndefined(); + expect(result.stepOutcome.status).toBe('manual-decision'); + expect(result.stepOutcome.error).toBeUndefined(); + expect((result.stepOutcome as ConditionStepOutcome).selectedOption).toBeUndefined(); expect(runStore.saveStepExecution).toHaveBeenCalledWith({ type: 'condition', stepIndex: 0, @@ -268,10 +250,10 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(makeStep(), makeStepHistory()); + const result = await executor.execute(); - expect(result.stepHistory.status).toBe('error'); - expect(result.stepHistory.error).toBe( + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( 'AI returned a malformed tool call for "choose-gateway-option": JSON parse error', ); expect(runStore.saveStepExecution).not.toHaveBeenCalled(); @@ -287,10 +269,10 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(makeStep(), makeStepHistory()); + const result = await executor.execute(); - expect(result.stepHistory.status).toBe('error'); - expect(result.stepHistory.error).toBe('API timeout'); + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('API timeout'); }); it('lets run store errors propagate', async () => { @@ -304,7 +286,7 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(makeContext({ model: mockModel.model, runStore })); - await expect(executor.execute(makeStep(), makeStepHistory())).rejects.toThrow('Storage full'); + await expect(executor.execute()).rejects.toThrow('Storage full'); }); }); }); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts new file mode 100644 index 000000000..743560066 --- /dev/null +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -0,0 +1,778 @@ +import type { AgentPort } from '../../src/ports/agent-port'; +import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; +import type { ExecutionContext } from '../../src/types/execution'; +import type { CollectionSchema, RecordRef } from '../../src/types/record'; +import type { AiTaskStepDefinition } from '../../src/types/step-definition'; + +import { NoRecordsError, RecordNotFoundError } from '../../src/errors'; +import ReadRecordStepExecutor from '../../src/executors/read-record-step-executor'; +import { StepType } from '../../src/types/step-definition'; + +function makeStep(overrides: Partial = {}): AiTaskStepDefinition { + return { + type: StepType.ReadRecord, + prompt: 'Read the customer email', + ...overrides, + }; +} + +function makeRecordRef(overrides: Partial = {}): RecordRef { + return { + collectionName: 'customers', + recordId: [42], + stepIndex: 0, + ...overrides, + }; +} + +function makeMockAgentPort( + recordsByCollection: Record }> = { + customers: { values: { email: 'john@example.com', name: 'John Doe', orders: null } }, + }, +): AgentPort { + return { + getRecord: jest + .fn() + .mockImplementation((collectionName: string) => + Promise.resolve(recordsByCollection[collectionName] ?? { values: {} }), + ), + updateRecord: jest.fn(), + getRelatedData: jest.fn(), + executeAction: jest.fn(), + } as unknown as AgentPort; +} + +function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { + return { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { fieldName: 'name', displayName: 'Full Name', isRelationship: false }, + { fieldName: 'orders', displayName: 'Orders', isRelationship: true }, + ], + actions: [], + ...overrides, + }; +} + +function makeMockRunStore(overrides: Partial = {}): RunStore { + return { + getStepExecutions: jest.fn().mockResolvedValue([]), + saveStepExecution: jest.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +function makeMockWorkflowPort( + schemasByCollection: Record = { + customers: makeCollectionSchema(), + }, +): WorkflowPort { + return { + getPendingStepExecutions: jest.fn().mockResolvedValue([]), + updateStepExecution: jest.fn().mockResolvedValue(undefined), + getCollectionSchema: jest + .fn() + .mockImplementation((name: string) => + Promise.resolve( + schemasByCollection[name] ?? makeCollectionSchema({ collectionName: name }), + ), + ), + getMcpServerConfigs: jest.fn().mockResolvedValue([]), + }; +} + +function makeMockModel( + toolCallArgs?: Record, + toolName = 'read-selected-record-fields', +) { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: toolCallArgs ? [{ name: toolName, args: toolCallArgs, id: 'call_1' }] : undefined, + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + return { model, bindTools, invoke }; +} + +function makeContext( + overrides: Partial> = {}, +): ExecutionContext { + return { + runId: 'run-1', + stepId: 'read-1', + stepIndex: 0, + baseRecordRef: makeRecordRef(), + stepDefinition: makeStep(), + model: makeMockModel({ fieldNames: ['email'] }).model, + agentPort: makeMockAgentPort(), + workflowPort: makeMockWorkflowPort(), + runStore: makeMockRunStore(), + history: [], + remoteTools: [], + ...overrides, + }; +} + +describe('ReadRecordStepExecutor', () => { + describe('single record, single field', () => { + it('reads a single field and returns success', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'read-record', + stepIndex: 0, + executionParams: { fieldNames: ['email'] }, + executionResult: { + fields: [{ value: 'john@example.com', fieldName: 'email', displayName: 'Email' }], + }, + }), + ); + }); + }); + + describe('single record, multiple fields', () => { + it('reads multiple fields in one call and returns success', async () => { + const mockModel = makeMockModel({ fieldNames: ['email', 'name'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionParams: { fieldNames: ['email', 'name'] }, + executionResult: { + fields: [ + { value: 'john@example.com', fieldName: 'email', displayName: 'Email' }, + { value: 'John Doe', fieldName: 'name', displayName: 'Full Name' }, + ], + }, + }), + ); + }); + }); + + describe('field resolution by displayName', () => { + it('resolves fields by displayName', async () => { + const mockModel = makeMockModel({ fieldNames: ['Full Name'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionParams: { fieldNames: ['name'] }, + executionResult: { + fields: [{ value: 'John Doe', fieldName: 'name', displayName: 'Full Name' }], + }, + }), + ); + }); + }); + + describe('getRecord receives resolved field names', () => { + it('passes resolved field names (not display names) to getRecord', async () => { + const mockModel = makeMockModel({ fieldNames: ['Full Name', 'Email'] }); + const agentPort = makeMockAgentPort(); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, agentPort, runStore }); + const executor = new ReadRecordStepExecutor(context); + + await executor.execute(); + + expect(agentPort.getRecord).toHaveBeenCalledWith('customers', [42], ['name', 'email']); + }); + + it('passes only resolved field names when some fields are unresolved', async () => { + const mockModel = makeMockModel({ fieldNames: ['Email', 'nonexistent'] }); + const agentPort = makeMockAgentPort(); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, agentPort, runStore }); + const executor = new ReadRecordStepExecutor(context); + + await executor.execute(); + + expect(agentPort.getRecord).toHaveBeenCalledWith('customers', [42], ['email']); + }); + + it('returns error when no fields can be resolved', async () => { + const mockModel = makeMockModel({ fieldNames: ['nonexistent', 'unknown'] }); + const agentPort = makeMockAgentPort(); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, agentPort, runStore }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'None of the requested fields could be resolved: nonexistent, unknown', + ); + expect(agentPort.getRecord).not.toHaveBeenCalled(); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('field not found', () => { + it('returns error per field without failing globally', async () => { + const mockModel = makeMockModel({ fieldNames: ['email', 'nonexistent'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionResult: { + fields: [ + { value: 'john@example.com', fieldName: 'email', displayName: 'Email' }, + { + error: 'Field not found: nonexistent', + fieldName: 'nonexistent', + displayName: 'nonexistent', + }, + ], + }, + }), + ); + }); + }); + + describe('relationship fields excluded', () => { + it('excludes relationship fields from tool schema', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + await executor.execute(); + + const tool = mockModel.bindTools.mock.calls[0][0][0]; + expect(tool.name).toBe('read-selected-record-fields'); + + // Valid field names (displayNames and fieldNames) should be accepted in an array + expect(tool.schema.parse({ fieldNames: ['Email'] })).toBeTruthy(); + expect(tool.schema.parse({ fieldNames: ['Full Name'] })).toBeTruthy(); + expect(tool.schema.parse({ fieldNames: ['email'] })).toBeTruthy(); + expect(tool.schema.parse({ fieldNames: ['email', 'name'] })).toBeTruthy(); + + // Schema accepts any strings (per-field errors handled in readFieldValues, ISO frontend) + expect(tool.schema.parse({ fieldNames: ['Orders'] })).toBeTruthy(); + + // But rejects non-array values + expect(() => tool.schema.parse({ fieldNames: 'email' })).toThrow(); + }); + }); + + describe('no records available', () => { + it('returns error when no records are available', () => { + const error = new NoRecordsError(); + + expect(error).toBeInstanceOf(NoRecordsError); + expect(error.message).toBe('No records available'); + }); + }); + + describe('no readable fields', () => { + it('returns error when all fields are relationships', async () => { + const schema = makeCollectionSchema({ + fields: [{ fieldName: 'orders', displayName: 'Orders', isRelationship: true }], + }); + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore(); + const workflowPort = makeMockWorkflowPort({ customers: schema }); + const context = makeContext({ model: mockModel.model, runStore, workflowPort }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'No readable fields on record from collection "customers"', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('multi-record AI selection', () => { + it('uses AI to select among multiple records then reads fields', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + // First call: select-record, second call: read-selected-record-fields + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-record', + args: { recordIdentifier: 'Step 1 - Customers #42' }, + id: 'call_1', + }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'read-selected-record-fields', + args: { fieldNames: ['email'] }, + id: 'call_2', + }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 2, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(bindTools).toHaveBeenCalledTimes(2); + + // First call: select-record tool + const selectTool = bindTools.mock.calls[0][0][0]; + expect(selectTool.name).toBe('select-record'); + + // Second call: read-selected-record-fields tool + const readTool = bindTools.mock.calls[1][0][0]; + expect(readTool.name).toBe('read-selected-record-fields'); + + // Record selection includes previous steps context + system prompt + user prompt + const selectMessages = invoke.mock.calls[0][0]; + expect(selectMessages).toHaveLength(2); + expect(selectMessages[0].content).toContain('selecting the most relevant record'); + expect(selectMessages[1].content).toContain('Read the customer email'); + + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionResult: { + fields: [{ value: 'john@example.com', fieldName: 'email', displayName: 'Email' }], + }, + selectedRecordRef: expect.objectContaining({ + recordId: [42], + collectionName: 'customers', + }), + }), + ); + }); + + it('reads fields from the second record when AI selects it', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-record', + args: { recordIdentifier: 'Step 2 - Orders #99' }, + id: 'call_1', + }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'call_2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 2, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const agentPort = makeMockAgentPort({ + orders: { values: { total: 150 } }, + }); + const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionResult: { + fields: [{ value: 150, fieldName: 'total', displayName: 'Total' }], + }, + selectedRecordRef: expect.objectContaining({ + recordId: [99], + collectionName: 'orders', + }), + }), + ); + }); + + it('includes step index in select-record tool schema when records have stepIndex', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 3 }); + const relatedRecord = makeRecordRef({ + stepIndex: 5, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-record', + args: { recordIdentifier: 'Step 3 - Customers #42' }, + id: 'call_1', + }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['email'] }, id: 'call_2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 5, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const executor = new ReadRecordStepExecutor( + makeContext({ baseRecordRef, model, runStore, workflowPort }), + ); + + await executor.execute(); + + const selectTool = bindTools.mock.calls[0][0][0]; + const schemaShape = selectTool.schema.shape; + // Enum values should include step-prefixed identifiers + expect(schemaShape.recordIdentifier.options).toEqual([ + 'Step 3 - Customers #42', + 'Step 5 - Orders #99', + ]); + }); + }); + + describe('AI record selection failure', () => { + it('returns error when AI selects a non-existent record identifier', async () => { + const baseRecordRef = makeRecordRef(); + const relatedRecord = makeRecordRef({ + stepIndex: 1, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + const invoke = jest.fn().mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'NonExistent #999' }, id: 'call_1' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 1, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'AI selected record "NonExistent #999" which does not match any available record', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('agentPort.getRecord error', () => { + it('returns error when agentPort.getRecord throws a WorkflowExecutorError', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.getRecord as jest.Mock).mockRejectedValue( + new RecordNotFoundError('customers', '42'), + ); + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ model: mockModel.model, runStore, agentPort }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('Record not found: collection "customers", id "42"'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('lets infrastructure errors propagate', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.getRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const context = makeContext({ model: mockModel.model, agentPort }); + const executor = new ReadRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection refused'); + }); + }); + + describe('model error', () => { + it('lets non-WorkflowExecutorError propagate from AI invocation', async () => { + const invoke = jest.fn().mockRejectedValue(new Error('API timeout')); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + }); + const executor = new ReadRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('API timeout'); + }); + }); + + describe('malformed tool call', () => { + it('returns error status on malformed tool call', async () => { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: [], + invalid_tool_calls: [ + { name: 'read-selected-record-fields', args: '{bad json', error: 'JSON parse error' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'AI returned a malformed tool call for "read-selected-record-fields": JSON parse error', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('returns error status when AI returns no tool call at all', async () => { + const invoke = jest.fn().mockResolvedValue({ tool_calls: [] }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new ReadRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('AI did not return a tool call'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('RunStore error propagation', () => { + it('lets saveStepExecution errors propagate', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + saveStepExecution: jest.fn().mockRejectedValue(new Error('Storage full')), + }); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Storage full'); + }); + + it('lets getStepExecutions errors propagate', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockRejectedValue(new Error('Connection lost')), + }); + const context = makeContext({ model: mockModel.model, runStore }); + const executor = new ReadRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection lost'); + }); + }); + + describe('previous steps context', () => { + it('includes previous steps summary in read-field messages', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'Yes', reasoning: 'Approved' }, + }, + ]), + }); + const context = makeContext({ + model: mockModel.model, + runStore, + history: [ + { + stepDefinition: { + type: StepType.Condition, + options: ['Yes', 'No'], + prompt: 'Should we proceed?', + }, + stepOutcome: { + type: 'condition', + stepId: 'prev-step', + stepIndex: 0, + status: 'success', + }, + }, + ], + }); + const executor = new ReadRecordStepExecutor({ + ...context, + stepId: 'read-2', + stepIndex: 1, + }); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + // previous steps summary + system prompt + collection info + human message = 4 + expect(messages).toHaveLength(4); + expect(messages[0].content).toContain('Should we proceed?'); + expect(messages[0].content).toContain('"answer":"Yes"'); + expect(messages[1].content).toContain('reading fields from a record'); + }); + }); + + describe('default prompt', () => { + it('uses default prompt when step.prompt is undefined', async () => { + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ prompt: undefined }), + }); + const executor = new ReadRecordStepExecutor(context); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + const humanMessage = messages[messages.length - 1]; + expect(humanMessage.content).toBe('**Request**: Read the relevant fields.'); + }); + }); + + describe('saveStepExecution arguments', () => { + it('saves executionParams, executionResult, and selectedRecord', async () => { + const mockModel = makeMockModel({ fieldNames: ['email', 'name'] }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + runStore, + stepIndex: 3, + }); + const executor = new ReadRecordStepExecutor(context); + + await executor.execute(); + + expect(runStore.saveStepExecution).toHaveBeenCalledWith({ + type: 'read-record', + stepIndex: 3, + executionParams: { fieldNames: ['email', 'name'] }, + executionResult: { + fields: [ + { value: 'john@example.com', fieldName: 'email', displayName: 'Email' }, + { value: 'John Doe', fieldName: 'name', displayName: 'Full Name' }, + ], + }, + selectedRecordRef: { + collectionName: 'customers', + recordId: [42], + stepIndex: 0, + }, + }); + }); + }); +});