diff --git a/packages/workflow-executor/package.json b/packages/workflow-executor/package.json index 3c838da93..fafc832c5 100644 --- a/packages/workflow-executor/package.json +++ b/packages/workflow-executor/package.json @@ -23,6 +23,7 @@ "test": "jest" }, "dependencies": { + "@forestadmin/agent-client": "1.4.13", "@langchain/core": "1.1.33", "zod": "4.3.6" } diff --git a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts new file mode 100644 index 000000000..23015e8bc --- /dev/null +++ b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts @@ -0,0 +1,129 @@ +import type { AgentPort } from '../ports/agent-port'; +import type { ActionRef, CollectionRef, RecordData } from '../types/record'; +import type { RemoteAgentClient, SelectOptions } from '@forestadmin/agent-client'; + +import { RecordNotFoundError } from '../errors'; + +function buildPkFilter( + primaryKeyFields: string[], + recordId: Array, +): SelectOptions['filters'] { + if (primaryKeyFields.length === 1) { + return { field: primaryKeyFields[0], operator: 'Equal', value: recordId[0] }; + } + + return { + aggregator: 'And', + conditions: primaryKeyFields.map((field, i) => ({ + field, + operator: 'Equal', + value: recordId[i], + })), + }; +} + +// agent-client methods (update, relation, action) still expect the pipe-encoded string format +function encodePk(recordId: Array): string { + return recordId.map(v => String(v)).join('|'); +} + +function extractRecordId( + primaryKeyFields: string[], + record: Record, +): Array { + return primaryKeyFields.map(field => record[field] as string | number); +} + +export default class AgentClientAgentPort implements AgentPort { + private readonly client: RemoteAgentClient; + private readonly collectionRefs: Record; + + constructor(params: { + client: RemoteAgentClient; + collectionRefs: Record; + }) { + this.client = params.client; + this.collectionRefs = params.collectionRefs; + } + + async getRecord(collectionName: string, recordId: Array): Promise { + const ref = this.getCollectionRef(collectionName); + const records = await this.client.collection(collectionName).list>({ + filters: buildPkFilter(ref.primaryKeyFields, recordId), + pagination: { size: 1, number: 1 }, + }); + + if (records.length === 0) { + throw new RecordNotFoundError(collectionName, encodePk(recordId)); + } + + return { ...ref, recordId, values: records[0] }; + } + + async updateRecord( + collectionName: string, + recordId: Array, + values: Record, + ): Promise { + const ref = this.getCollectionRef(collectionName); + const updatedRecord = await this.client + .collection(collectionName) + .update>(encodePk(recordId), values); + + return { ...ref, recordId, values: updatedRecord }; + } + + async getRelatedData( + collectionName: string, + recordId: Array, + relationName: string, + ): Promise { + const relatedRef = this.getCollectionRef(relationName); + + const records = await this.client + .collection(collectionName) + .relation(relationName, encodePk(recordId)) + .list>(); + + return records.map(record => ({ + ...relatedRef, + recordId: extractRecordId(relatedRef.primaryKeyFields, record), + values: record, + })); + } + + async getActions(collectionName: string): Promise { + const ref = this.collectionRefs[collectionName]; + + return ref ? ref.actions : []; + } + + async executeAction( + collectionName: string, + actionName: string, + recordIds: Array[], + ): Promise { + const encodedIds = recordIds.map(id => encodePk(id)); + const action = await this.client + .collection(collectionName) + .action(actionName, { recordIds: encodedIds }); + + return action.execute(); + } + + private getCollectionRef(collectionName: string): CollectionRef { + const ref = this.collectionRefs[collectionName]; + + if (!ref) { + return { + collectionName, + collectionDisplayName: collectionName, + primaryKeyFields: ['id'], + fields: [], + actions: [], + }; + } + + return ref; + } +} diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 3a853e949..d735977d4 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -21,3 +21,9 @@ export class MalformedToolCallError extends WorkflowExecutorError { this.toolName = toolName; } } + +export class RecordNotFoundError extends WorkflowExecutorError { + constructor(collectionName: string, recordId: string) { + super(`Record not found: collection "${collectionName}", id "${recordId}"`); + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 9d570f572..c434071d8 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -19,7 +19,7 @@ export type { StepExecutionData, } from './types/step-execution-data'; -export type { RecordFieldRef, RecordRef, RecordData } from './types/record'; +export type { RecordFieldRef, ActionRef, CollectionRef, RecordData } from './types/record'; export type { StepRecord, @@ -33,6 +33,12 @@ export type { AgentPort } from './ports/agent-port'; export type { McpConfiguration, WorkflowPort } from './ports/workflow-port'; export type { RunStore } from './ports/run-store'; -export { WorkflowExecutorError, MissingToolCallError, MalformedToolCallError } from './errors'; +export { + WorkflowExecutorError, + MissingToolCallError, + MalformedToolCallError, + RecordNotFoundError, +} from './errors'; export { default as BaseStepExecutor } from './executors/base-step-executor'; export { default as ConditionStepExecutor } from './executors/condition-step-executor'; +export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port'; diff --git a/packages/workflow-executor/src/ports/agent-port.ts b/packages/workflow-executor/src/ports/agent-port.ts index 5d4f431c7..6a588f1f2 100644 --- a/packages/workflow-executor/src/ports/agent-port.ts +++ b/packages/workflow-executor/src/ports/agent-port.ts @@ -1,19 +1,23 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { RecordData } from '../types/record'; +import type { ActionRef, RecordData } from '../types/record'; export interface AgentPort { - getRecord(collectionName: string, recordId: string): Promise; + getRecord(collectionName: string, recordId: Array): Promise; updateRecord( collectionName: string, - recordId: string, + recordId: Array, values: Record, ): Promise; getRelatedData( collectionName: string, - recordId: string, + recordId: Array, relationName: string, ): Promise; - getActions(collectionName: string): Promise; - executeAction(collectionName: string, actionName: string, recordIds: string[]): Promise; + getActions(collectionName: string): Promise; + executeAction( + collectionName: string, + actionName: string, + recordIds: Array[], + ): Promise; } diff --git a/packages/workflow-executor/src/ports/workflow-port.ts b/packages/workflow-executor/src/ports/workflow-port.ts index 806bb3398..c36ea41d8 100644 --- a/packages/workflow-executor/src/ports/workflow-port.ts +++ b/packages/workflow-executor/src/ports/workflow-port.ts @@ -1,7 +1,7 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ import type { PendingStepExecution } from '../types/execution'; -import type { RecordRef } from '../types/record'; +import type { CollectionRef } from '../types/record'; import type { StepHistory } from '../types/step-history'; /** Placeholder -- will be typed as McpConfiguration from @forestadmin/ai-proxy/mcp-client once added as dependency. */ @@ -10,6 +10,6 @@ export type McpConfiguration = unknown; export interface WorkflowPort { getPendingStepExecutions(): Promise; completeStepExecution(runId: string, stepHistory: StepHistory): Promise; - getCollectionRef(collectionName: string): Promise; + getCollectionRef(collectionName: string): Promise; getMcpServerConfigs(): Promise; } diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index e983aad4b..d2524403c 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -1,6 +1,6 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { RecordRef } from './record'; +import type { CollectionRef } from './record'; import type { StepDefinition } from './step-definition'; import type { StepHistory } from './step-history'; import type { AgentPort } from '../ports/agent-port'; @@ -20,7 +20,7 @@ export interface PendingStepExecution { readonly step: StepDefinition; readonly stepHistory: StepHistory; readonly previousSteps: ReadonlyArray; - readonly availableRecords: ReadonlyArray; + readonly availableRecords: ReadonlyArray; readonly userInput?: UserInput; } diff --git a/packages/workflow-executor/src/types/record.ts b/packages/workflow-executor/src/types/record.ts index 9610da056..14064fcb1 100644 --- a/packages/workflow-executor/src/types/record.ts +++ b/packages/workflow-executor/src/types/record.ts @@ -8,13 +8,20 @@ export interface RecordFieldRef { referencedCollectionName?: string; } -export interface RecordRef { - recordId: string; +export interface ActionRef { + name: string; + displayName: string; +} + +export interface CollectionRef { collectionName: string; collectionDisplayName: string; + primaryKeyFields: string[]; fields: RecordFieldRef[]; + actions: ActionRef[]; } -export interface RecordData extends RecordRef { +export interface RecordData extends CollectionRef { + recordId: Array; values: Record; } diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index 5b5549c87..e2d46eaf4 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -1,6 +1,6 @@ /** @draft Types derived from the workflow-executor spec -- subject to change. */ -import type { RecordRef } from './record'; +import type { CollectionRef } from './record'; interface BaseStepExecutionData { stepIndex: number; @@ -17,7 +17,7 @@ export interface AiTaskStepExecutionData extends BaseStepExecutionData { executionParams?: Record; executionResult?: Record; toolConfirmationInterruption?: Record; - selectedRecordRef?: RecordRef; + selectedRecord?: CollectionRef; } export type StepExecutionData = ConditionStepExecutionData | AiTaskStepExecutionData; diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts new file mode 100644 index 000000000..878990787 --- /dev/null +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -0,0 +1,241 @@ +import type { CollectionRef } from '../../src/types/record'; +import type { RemoteAgentClient } from '@forestadmin/agent-client'; + +import AgentClientAgentPort from '../../src/adapters/agent-client-agent-port'; +import { RecordNotFoundError } from '../../src/errors'; + +function createMockClient() { + const mockAction = { execute: jest.fn() }; + const mockRelation = { list: jest.fn() }; + const mockCollection = { + list: jest.fn(), + update: jest.fn(), + relation: jest.fn().mockReturnValue(mockRelation), + action: jest.fn().mockResolvedValue(mockAction), + }; + + const client = { + collection: jest.fn().mockReturnValue(mockCollection), + } as unknown as jest.Mocked; + + return { client, mockCollection, mockRelation, mockAction }; +} + +describe('AgentClientAgentPort', () => { + let client: jest.Mocked; + let mockCollection: ReturnType['mockCollection']; + let mockRelation: ReturnType['mockRelation']; + let mockAction: ReturnType['mockAction']; + let collectionRefs: Record; + let port: AgentClientAgentPort; + + beforeEach(() => { + jest.clearAllMocks(); + + ({ client, mockCollection, mockRelation, mockAction } = createMockClient()); + + collectionRefs = { + users: { + collectionName: 'users', + collectionDisplayName: 'Users', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'id', displayName: 'id', type: 'Number', isRelationship: false }, + { fieldName: 'name', displayName: 'name', type: 'String', isRelationship: false }, + ], + actions: [ + { name: 'sendEmail', displayName: 'Send Email' }, + { name: 'archive', displayName: 'Archive' }, + ], + }, + orders: { + collectionName: 'orders', + collectionDisplayName: 'Orders', + primaryKeyFields: ['tenantId', 'orderId'], + fields: [ + { fieldName: 'tenantId', displayName: 'Tenant', type: 'Number', isRelationship: false }, + { fieldName: 'orderId', displayName: 'Order', type: 'Number', isRelationship: false }, + ], + actions: [], + }, + posts: { + collectionName: 'posts', + collectionDisplayName: 'Posts', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'id', displayName: 'id', type: 'Number', isRelationship: false }, + { fieldName: 'title', displayName: 'title', type: 'String', isRelationship: false }, + ], + actions: [], + }, + }; + + port = new AgentClientAgentPort({ client, collectionRefs }); + }); + + describe('getRecord', () => { + it('should return a RecordData for a simple PK', async () => { + mockCollection.list.mockResolvedValue([{ id: 42, name: 'Alice' }]); + + const result = await port.getRecord('users', [42]); + + expect(mockCollection.list).toHaveBeenCalledWith({ + filters: { field: 'id', operator: 'Equal', value: 42 }, + pagination: { size: 1, number: 1 }, + }); + expect(result).toEqual({ + recordId: [42], + collectionName: 'users', + collectionDisplayName: 'Users', + primaryKeyFields: ['id'], + fields: collectionRefs.users.fields, + actions: collectionRefs.users.actions, + values: { id: 42, name: 'Alice' }, + }); + }); + + it('should build a composite And filter for composite PKs', async () => { + mockCollection.list.mockResolvedValue([{ tenantId: 1, orderId: 2 }]); + + await port.getRecord('orders', [1, 2]); + + expect(mockCollection.list).toHaveBeenCalledWith({ + filters: { + aggregator: 'And', + conditions: [ + { field: 'tenantId', operator: 'Equal', value: 1 }, + { field: 'orderId', operator: 'Equal', value: 2 }, + ], + }, + pagination: { size: 1, number: 1 }, + }); + }); + + it('should throw a RecordNotFoundError when no record is found', async () => { + mockCollection.list.mockResolvedValue([]); + + await expect(port.getRecord('users', [999])).rejects.toThrow(RecordNotFoundError); + }); + + it('should fallback to pk field "id" when collection is unknown', async () => { + mockCollection.list.mockResolvedValue([{ id: 1 }]); + + const result = await port.getRecord('unknown', [1]); + + expect(mockCollection.list).toHaveBeenCalledWith( + expect.objectContaining({ + filters: { field: 'id', operator: 'Equal', value: 1 }, + }), + ); + expect(result.collectionName).toBe('unknown'); + expect(result.fields).toEqual([]); + }); + }); + + describe('updateRecord', () => { + it('should call update with pipe-encoded id and return a RecordData', async () => { + mockCollection.update.mockResolvedValue({ id: 42, name: 'Bob' }); + + const result = await port.updateRecord('users', [42], { name: 'Bob' }); + + expect(mockCollection.update).toHaveBeenCalledWith('42', { name: 'Bob' }); + expect(result).toEqual({ + recordId: [42], + collectionName: 'users', + collectionDisplayName: 'Users', + primaryKeyFields: ['id'], + fields: collectionRefs.users.fields, + actions: collectionRefs.users.actions, + values: { id: 42, name: 'Bob' }, + }); + }); + + it('should encode composite PK to pipe format for update', async () => { + mockCollection.update.mockResolvedValue({ tenantId: 1, orderId: 2 }); + + await port.updateRecord('orders', [1, 2], { status: 'done' }); + + expect(mockCollection.update).toHaveBeenCalledWith('1|2', { status: 'done' }); + }); + }); + + describe('getRelatedData', () => { + it('should return RecordData[] with recordId extracted from PK fields', async () => { + mockRelation.list.mockResolvedValue([ + { id: 10, title: 'Post A' }, + { id: 11, title: 'Post B' }, + ]); + + const result = await port.getRelatedData('users', [42], 'posts'); + + expect(mockCollection.relation).toHaveBeenCalledWith('posts', '42'); + expect(result).toEqual([ + { + recordId: [10], + collectionName: 'posts', + collectionDisplayName: 'Posts', + primaryKeyFields: ['id'], + fields: collectionRefs.posts.fields, + actions: collectionRefs.posts.actions, + values: { id: 10, title: 'Post A' }, + }, + { + recordId: [11], + collectionName: 'posts', + collectionDisplayName: 'Posts', + primaryKeyFields: ['id'], + fields: collectionRefs.posts.fields, + actions: collectionRefs.posts.actions, + values: { id: 11, title: 'Post B' }, + }, + ]); + }); + + it('should fallback to relationName when no CollectionRef exists', async () => { + mockRelation.list.mockResolvedValue([{ id: 1 }]); + + const result = await port.getRelatedData('users', [42], 'unknownRelation'); + + expect(result[0].collectionName).toBe('unknownRelation'); + expect(result[0].recordId).toEqual([1]); + }); + + it('should return an empty array when no related data exists', async () => { + mockRelation.list.mockResolvedValue([]); + + expect(await port.getRelatedData('users', [42], 'posts')).toEqual([]); + }); + }); + + describe('getActions', () => { + it('should return ActionRef[] from CollectionRef', async () => { + expect(await port.getActions('users')).toEqual([ + { name: 'sendEmail', displayName: 'Send Email' }, + { name: 'archive', displayName: 'Archive' }, + ]); + }); + + it('should return an empty array for an unknown collection', async () => { + expect(await port.getActions('unknown')).toEqual([]); + }); + }); + + describe('executeAction', () => { + it('should encode recordIds to pipe format and call execute', async () => { + mockAction.execute.mockResolvedValue({ success: 'done' }); + + const result = await port.executeAction('users', 'sendEmail', [[1], [2]]); + + expect(mockCollection.action).toHaveBeenCalledWith('sendEmail', { recordIds: ['1', '2'] }); + expect(result).toEqual({ success: 'done' }); + }); + + it('should propagate errors from action execution', async () => { + mockAction.execute.mockRejectedValue(new Error('Action failed')); + + await expect(port.executeAction('users', 'sendEmail', [[1]])).rejects.toThrow( + 'Action failed', + ); + }); + }); +});