Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "fishbone",
"displayName": "Fishbone / Ishikawa analysis",
"description": "Create interactive fishbone diagrams for a systematic defect/failure analysis.",
"version": "1.38.4",
"version": "1.51.1",
"license": "CC-BY-NC-SA-4.0",
"publisher": "mbehr1",
"author": {
Expand Down
55 changes: 55 additions & 0 deletions src/agents/core/AgentManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

import { EventBus } from '../core/EventBus';
import { EventType } from './types';
import * as vscode from 'vscode';

/**
* AgentManager (Agentic/Event-Driven)
*
* Orchestrates the event-driven workflow for all agents.
* Publishes events for each workflow step and handles errors robustly.
*
* Responsibilities:
* - Orchestrate agentic event flow for a given task
* - Publish SEQUENCE, FILTER, QUERY, SUMMARY, and UPDATE events
* - Handle and emit ERROR events for each step
* - Log all actions for traceability
*/
export class AgentManager {
/**
* Orchestrate the event-driven workflow for a given task.
* Publishes events for each step and handles errors robustly.
*/
async orchestrate(task: any, eventBus: EventBus, log: vscode.LogOutputChannel) {
log.info('AgentManager: orchestrate called', { task });
const correlationId = task?.correlationId;
// Example event-driven workflow with robust error handling:
try {
await eventBus.publish({ type: EventType.SEQUENCE, payload: { sequence: task.dltResult } }, log);
} catch (e) {
await eventBus.publish({ type: EventType.ERROR, payload: { error: { message: `AgentManager SEQUENCE error: ${e}`, eventType: EventType.SEQUENCE, eventPayload: { sequence: task.dltResult }, correlationId } } }, log);
}
try {
await eventBus.publish({ type: EventType.FILTER, payload: { filter: 'default', data: task.dltResult } }, log);
} catch (e) {
await eventBus.publish({ type: EventType.ERROR, payload: { error: { message: `AgentManager FILTER error: ${e}`, eventType: EventType.FILTER, eventPayload: { filter: 'default', data: task.dltResult }, correlationId } } }, log);
}
try {
await eventBus.publish({ type: EventType.QUERY, payload: { query: task.query || '' } }, log);
} catch (e) {
await eventBus.publish({ type: EventType.ERROR, payload: { error: { message: `AgentManager QUERY error: ${e}`, eventType: EventType.QUERY, eventPayload: { query: task.query || '' }, correlationId } } }, log);
}
try {
await eventBus.publish({ type: EventType.SUMMARY, payload: { summary: 'Summary placeholder' } }, log);
} catch (e) {
await eventBus.publish({ type: EventType.ERROR, payload: { error: { message: `AgentManager SUMMARY error: ${e}`, eventType: EventType.SUMMARY, eventPayload: { summary: 'Summary placeholder' }, correlationId } } }, log);
}
try {
await eventBus.publish({ type: EventType.UPDATE, payload: { update: 'Update placeholder' } }, log);
} catch (e) {
await eventBus.publish({ type: EventType.ERROR, payload: { error: { message: `AgentManager UPDATE error: ${e}`, eventType: EventType.UPDATE, eventPayload: { update: 'Update placeholder' }, correlationId } } }, log);
}
log.info('AgentManager: orchestrate finished');
return { summary: 'Summary placeholder' };
}
}
109 changes: 109 additions & 0 deletions src/agents/core/ErrorHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { EventType, Event } from './types';
import { globalEventBus } from './EventBus';
import * as vscode from 'vscode';

export enum ErrorSeverity {
INFO = 'info',
WARNING = 'warning',
CRITICAL = 'critical',
}

export interface AgentError {
message: string;
code?: string;
severity: ErrorSeverity;
cause?: any;
agentId?: string;
timestamp: number;
}

/**
* ErrorHandler (Agentic/Event-Driven)
*
* Centralized error handling for all agents and workflows.
* Classifies, logs, emits, and recovers from errors in an agentic context.
*
* Responsibilities:
* - Classify errors by severity and agent
* - Log errors and emit ERROR events to the EventBus
* - Attempt recovery or escalation as needed
*/
export class ErrorHandler {
/**
* Handle an error: classify, log, emit event, and attempt recovery.
*/
handle(error: any, agentId: string | undefined, log: vscode.LogOutputChannel) {
const agentError: AgentError = this.classify(error, agentId);
log.error(`[${agentError.severity}] Error: ${agentError.message}`, agentError);
// Emit error event for agentic workflows
const event: Event<EventType.ERROR> = {
type: EventType.ERROR,
payload: { error: agentError },
};
globalEventBus.publish(event, log);
this.recover(agentError, log);
}

/**
* Classify an error by severity, message, and agent.
* Returns a structured AgentError for logging and event emission.
*/
classify(error: any, agentId?: string): AgentError {
// Simple classification logic (customize as needed)
let severity = ErrorSeverity.WARNING;
let message = '';
if (typeof error === 'string') {
message = error;
} else if (error instanceof Error) {
message = error.message;
} else if (error && error.message) {
message = error.message;
} else {
message = JSON.stringify(error);
}
if (message.match(/critical|fatal|unrecoverable/i)) {
severity = ErrorSeverity.CRITICAL;
} else if (message.match(/warn|timeout|retry/i)) {
severity = ErrorSeverity.WARNING;
} else {
severity = ErrorSeverity.INFO;
}
return {
message,
code: error.code || undefined,
severity,
cause: error,
agentId,
timestamp: Date.now(),
};
}

recover(agentError: AgentError, log: vscode.LogOutputChannel) {
// Recovery logic based on severity
switch (agentError.severity) {
case ErrorSeverity.INFO:
// No action needed for informational errors
break;
case ErrorSeverity.WARNING:
// Attempt a retry: emit a RETRY event if agentId is available
if (agentError.agentId) {
const retryEvent: Event = {
type: 'RETRY' as EventType, // Add RETRY to EventType if not present
payload: { error: agentError },
};
log.info('[Recovery] RETRY event emitted', retryEvent);
globalEventBus.publish(retryEvent, log);
}
break;
case ErrorSeverity.CRITICAL:
// Escalate: emit an ESCALATE event if agentId is available
const escalateEvent: Event = {
type: 'ESCALATE' as EventType, // Add ESCALATE to EventType if not present
payload: { error: agentError },
};
log.error('[Recovery] ESCALATE event emitted', escalateEvent);
globalEventBus.publish(escalateEvent, log);
break;
}
}
}
99 changes: 99 additions & 0 deletions src/agents/core/EventBus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// EventBus.ts

import { Event, EventType } from './types';
import { ErrorHandler, ErrorSeverity } from './ErrorHandler';

// Strongly typed async event handler that receives log
import * as vscode from 'vscode';
type AsyncEventHandler = (event: Event, log: vscode.LogOutputChannel) => Promise<void> | void;

interface HandlerEntry {
handler: AsyncEventHandler;
once: boolean;
}


export class EventBus {
private handlers: Map<EventType, HandlerEntry[]> = new Map();
private errorHandler: ErrorHandler;

constructor() {
this.errorHandler = new ErrorHandler();
}

/**
* Subscribe to an event type. Returns an unsubscribe function.
*/

subscribe(type: EventType, handler: AsyncEventHandler): () => void {
if (!this.handlers.has(type)) {
this.handlers.set(type, []);
}
const entry: HandlerEntry = { handler, once: false };
this.handlers.get(type)!.push(entry);
return () => {
const typeHandlers = this.handlers.get(type)!;
const idx = typeHandlers.indexOf(entry);
if (idx !== -1) { typeHandlers.splice(idx, 1); }
};
}

/**
* Subscribe to an event type, but only for the next event (one-time listener).
*/
once(type: EventType, handler: AsyncEventHandler): void {
if (!this.handlers.has(type)) {
this.handlers.set(type, []);
}
this.handlers.get(type)!.push({ handler, once: true });
}

/**
* Publish an event asynchronously. All handlers are awaited, errors are isolated.
*/
async publish(event: Event, log: vscode.LogOutputChannel): Promise<void> {
log.debug(`[EventBus] publish called: event.type='${event.type}' payload=${JSON.stringify(event.payload)}`);
const entries = this.handlers.get(event.type) || [];
// Copy to avoid mutation during iteration
const toRemove: HandlerEntry[] = [];
await Promise.all(entries.map(async entry => {
try {
await entry.handler(event, log);
} catch (error) {
// Enhanced error handling: include event context and correlation ID if present
const correlationId = (event as any)?.payload?.correlationId;
this.errorHandler.handle(
{
error,
eventType: event.type,
eventPayload: event.payload,
correlationId,
},
undefined,
log
);
// Optionally, emit an error event for critical errors (if ErrorHandler supports escalation)
// If error is critical, broadcast to ERROR event listeners
// (Assumes ErrorHandler.classify returns severity)
const classified = this.errorHandler.classify(error);
if (classified.severity === ErrorSeverity.CRITICAL) {
const errorEvent: Event = {
type: EventType.ERROR,
payload: { error: { ...classified, correlationId } },
};
// Avoid infinite loop: only emit if not already an ERROR event
if (event.type !== EventType.ERROR) {
await this.publish(errorEvent, log);
}
}
}
if (entry.once) { toRemove.push(entry); }
}));
// Remove one-time listeners
if (toRemove.length) {
this.handlers.set(event.type, entries.filter(e => !toRemove.includes(e)));
}
}
}

export const globalEventBus = new EventBus();
63 changes: 63 additions & 0 deletions src/agents/core/FeedbackLoop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

import { Event, EventType, AgentType } from './types';
import { globalEventBus } from './EventBus';
import * as vscode from 'vscode';

export interface FeedbackCriteria {
minScore?: number;
maxRetries?: number;
customRule?: (result: any) => boolean;
}

/**
* FeedbackLoop (Agentic/Event-Driven)
*
* Provides feedback and refinement logic for agent tasks.
* Tracks retries, emits UPDATE and ERROR events, and enables robust agentic workflows.
*
* Responsibilities:
* - Refine agent tasks based on result and feedback criteria
* - Track and limit retries for each agent
* - Emit UPDATE events for further refinement
* - Emit ERROR events when max retries are exceeded
*/
export class FeedbackLoop {
private retryCounts: Map<string, number> = new Map();

/**
* Refine agent task based on result and feedback criteria.
* Optionally triggers new events for further refinement.
* Emits ERROR if max retries are exceeded.
*/
async refine(agentId: string, result: any, criteria: FeedbackCriteria, log: vscode.LogOutputChannel): Promise<void> {
let shouldRefine = false;
// Check if result score is below minimum
if (criteria.minScore !== undefined && typeof result.score === 'number') {
shouldRefine = result.score < criteria.minScore;
}
// Check custom rule if provided
if (criteria.customRule && !shouldRefine) {
shouldRefine = !criteria.customRule(result);
}
if (shouldRefine) {
// Track retries for this agent
const retries = (this.retryCounts.get(agentId) || 0) + 1;
this.retryCounts.set(agentId, retries);
// If max retries exceeded, emit ERROR event
if (criteria.maxRetries !== undefined && retries > criteria.maxRetries) {
log.warn(`FeedbackLoop: Max retries exceeded for agent ${agentId}`);
await globalEventBus.publish({
type: EventType.ERROR,
payload: { error: `Max retries exceeded for agent ${agentId}` },
}, log);
return;
}
log.info(`FeedbackLoop: Refine triggered for agent ${agentId}, retries=${retries}`);
// Emit UPDATE event for agent to adjust parameters
await globalEventBus.publish({
type: EventType.UPDATE,
payload: { update: { action: 'refine', retries, lastResult: result } },
}, log);
}
}
}
Loading
Loading