diff --git a/docs/superpowers/plans/2026-04-24-entity-channel-connectors.md b/docs/superpowers/plans/2026-04-24-entity-channel-connectors.md new file mode 100644 index 0000000000..3226d15933 --- /dev/null +++ b/docs/superpowers/plans/2026-04-24-entity-channel-connectors.md @@ -0,0 +1,2415 @@ +# Entity Channel Connectors Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Connect entities to external messaging platforms (Telegram, Discord) with bidirectional communication, configured at the entity type definition level via `defineEntity()`. + +**Architecture:** New `channels` property on entity definitions declares platform bindings. A server-side `ChannelManager` orchestrates composable `PlatformAdapter` instances (one per platform). Inbound platform messages auto-spawn entity instances and wake them with normalized `channel_message` events. Agent output is implicitly forwarded back via `DraftStreamLoop` (streaming edit-in-place) or send-on-complete. A `HealthMonitor` auto-restarts failed adapters with exponential backoff. + +**Tech Stack:** TypeScript, Vitest, grammy (Telegram Bot API), discord.js (Discord Gateway + REST), Drizzle ORM (Postgres schema), pino (logging) + +**Spec:** `docs/superpowers/specs/2026-04-24-entity-channel-connectors-design.md` + +--- + +## File Structure + +### `packages/agents-runtime` + +| File | Responsibility | +|------|---------------| +| `src/channels/types.ts` | `ChannelConfig`, `TelegramChannelConfig`, `DiscordChannelConfig`, `ChannelMessageWakeEvent`, `PlatformAdapter` sub-interfaces, `OutboundOpts`, `MessageRef`, `DraftStreamLoop` | +| `src/channels/index.ts` | Re-exports types + config factories | +| `src/channels/telegram.ts` | `telegram()` config factory | +| `src/channels/discord.ts` | `discord()` config factory | +| `src/types.ts` | Add `channelSend` to `HandlerContext`, add `PendingChannelSend` type | +| `src/define-entity.ts` | Accept `channels` field in `EntityDefinition` | +| `src/context-factory.ts` | Wire `ctx.channelSend()` into handler context | +| `test/channels/channel-config.test.ts` | Unit tests for config factories | + +### `packages/agents-server` + +| File | Responsibility | +|------|---------------| +| `src/channels/channel-manager.ts` | `ChannelManager` — adapter lifecycle, inbound routing, outbound watching | +| `src/channels/draft-stream-loop.ts` | `DraftStreamLoop` — throttled edit-in-place streaming | +| `src/channels/health-monitor.ts` | `HealthMonitor` — stale detection, exponential backoff restart | +| `src/channels/telegram/telegram-adapter.ts` | `TelegramAdapter` — webhook gateway + outbound via grammy | +| `src/channels/discord/discord-adapter.ts` | `DiscordAdapter` — WebSocket gateway + REST outbound via discord.js | +| `src/db/schema.ts` | Add `channels` column to `entity_types` table | +| `src/db/migrations/add-channels-column.ts` | SQL migration for `channels` JSONB column | +| `src/electric-agents-types.ts` | Add `channels` to `ElectricAgentsEntityType` | +| `src/electric-agents-manager.ts` | Notify `ChannelManager` on entity type registration | +| `src/electric-agents-routes.ts` | Add webhook route for Telegram | +| `src/server.ts` | Wire `ChannelManager` into server startup/shutdown | +| `test/channels/channel-manager.test.ts` | Integration tests for channel routing | +| `test/channels/draft-stream-loop.test.ts` | Unit tests for streaming loop | +| `test/channels/health-monitor.test.ts` | Unit tests for health monitoring | +| `test/channels/telegram-adapter.test.ts` | Unit tests for Telegram adapter | +| `test/channels/discord-adapter.test.ts` | Unit tests for Discord adapter | + +--- + +## Task 1: Channel Types & Config Factories (`agents-runtime`) + +**Files:** +- Create: `packages/agents-runtime/src/channels/types.ts` +- Create: `packages/agents-runtime/src/channels/telegram.ts` +- Create: `packages/agents-runtime/src/channels/discord.ts` +- Create: `packages/agents-runtime/src/channels/index.ts` +- Create: `packages/agents-runtime/test/channels/channel-config.test.ts` + +### Steps + +- [ ] **Step 1: Write failing tests for channel config types and factories** + +```typescript +// packages/agents-runtime/test/channels/channel-config.test.ts +import { describe, it, expect } from 'vitest' +import { telegram, discord } from '../src/channels/index.js' +import type { + ChannelConfig, + TelegramChannelConfig, + DiscordChannelConfig, +} from '../src/channels/types.js' + +describe('telegram config factory', () => { + it('creates config with defaults', () => { + const config = telegram({ mode: 'direct' }) + expect(config).toEqual({ + platform: 'telegram', + mode: 'direct', + allowedChatIds: [], + forwardAgentOutput: true, + }) + }) + + it('accepts all options', () => { + const config = telegram({ + mode: 'group', + allowedChatIds: ['123', '456'], + forwardAgentOutput: false, + }) + expect(config).toEqual({ + platform: 'telegram', + mode: 'group', + allowedChatIds: ['123', '456'], + forwardAgentOutput: false, + }) + }) + + it('rejects invalid mode', () => { + // @ts-expect-error - invalid mode + expect(() => telegram({ mode: 'invalid' })).toThrow('Invalid telegram mode') + }) +}) + +describe('discord config factory', () => { + it('creates config with defaults', () => { + const config = discord({ mode: 'channel' }) + expect(config).toEqual({ + platform: 'discord', + mode: 'channel', + allowedChatIds: [], + forwardAgentOutput: true, + }) + }) + + it('accepts thread mode', () => { + const config = discord({ mode: 'thread' }) + expect(config.mode).toBe('thread') + }) + + it('accepts direct mode', () => { + const config = discord({ mode: 'direct' }) + expect(config.mode).toBe('direct') + }) + + it('rejects invalid mode', () => { + // @ts-expect-error - invalid mode + expect(() => discord({ mode: 'invalid' })).toThrow('Invalid discord mode') + }) +}) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd packages/agents-runtime && npx vitest run test/channels/channel-config.test.ts` +Expected: FAIL — modules not found + +- [ ] **Step 3: Implement channel types** + +```typescript +// packages/agents-runtime/src/channels/types.ts + +// --- Channel Config --- + +export interface ChannelConfig { + platform: string + mode: string + allowedChatIds: string[] + forwardAgentOutput: boolean +} + +export interface TelegramChannelConfig extends ChannelConfig { + platform: 'telegram' + mode: 'direct' | 'group' +} + +export interface DiscordChannelConfig extends ChannelConfig { + platform: 'discord' + mode: 'direct' | 'channel' | 'thread' +} + +// --- Wake Event --- + +export interface ChannelMessageSource { + platform: string + chatId: string + messageId: string + threadId?: string +} + +export interface ChannelMessageSender { + id: string + username?: string +} + +export interface ChannelMessageWakeEvent { + type: 'channel_message' + source: ChannelMessageSource + sender: ChannelMessageSender + payload: { + text: string + replyTo?: string + channelData?: Record + } +} + +// --- Pending Channel Send --- + +export interface PendingChannelSend { + platform: string + chatId: string + text: string + threadId?: string + replyToId?: string + channelData?: Record +} + +// --- Platform Adapter Interfaces --- + +export interface GatewayContext { + configs: Map + abortSignal: AbortSignal + log: Logger + onMessage: ( + entityType: string, + event: ChannelMessageWakeEvent + ) => Promise +} + +export interface GatewayAdapter { + start(ctx: GatewayContext): Promise + stop(): Promise +} + +export interface MessageRef { + messageId: string + chatId: string +} + +export interface OutboundOpts { + threadId?: string + replyToId?: string + channelData?: Record +} + +export interface OutboundAdapter { + sendText( + chatId: string, + text: string, + opts?: OutboundOpts + ): Promise + editText?(ref: MessageRef, text: string): Promise +} + +export interface StreamingAdapter { + mode: 'partial' | 'off' +} + +export interface ThreadingAdapter { + supportsThreads: boolean +} + +export interface GroupAdapter { + supportsGroups: boolean +} + +export interface PlatformAdapter { + platform: string + gateway: GatewayAdapter + outbound: OutboundAdapter + streaming?: StreamingAdapter + threading?: ThreadingAdapter + groups?: GroupAdapter +} + +// Logger type to avoid hard dependency on pino +export interface Logger { + info(msg: string, ...args: unknown[]): void + warn(msg: string, ...args: unknown[]): void + error(msg: string, ...args: unknown[]): void + debug(msg: string, ...args: unknown[]): void +} +``` + +- [ ] **Step 4: Implement telegram config factory** + +```typescript +// packages/agents-runtime/src/channels/telegram.ts +import type { TelegramChannelConfig } from './types.js' + +const VALID_MODES = ['direct', 'group'] as const + +type TelegramMode = (typeof VALID_MODES)[number] + +interface TelegramConfigInput { + mode: TelegramMode + allowedChatIds?: string[] + forwardAgentOutput?: boolean +} + +export function telegram(input: TelegramConfigInput): TelegramChannelConfig { + if (!VALID_MODES.includes(input.mode)) { + throw new Error( + `Invalid telegram mode: "${input.mode}". Must be one of: ${VALID_MODES.join(', ')}` + ) + } + return { + platform: 'telegram', + mode: input.mode, + allowedChatIds: input.allowedChatIds ?? [], + forwardAgentOutput: input.forwardAgentOutput ?? true, + } +} +``` + +- [ ] **Step 5: Implement discord config factory** + +```typescript +// packages/agents-runtime/src/channels/discord.ts +import type { DiscordChannelConfig } from './types.js' + +const VALID_MODES = ['direct', 'channel', 'thread'] as const + +type DiscordMode = (typeof VALID_MODES)[number] + +interface DiscordConfigInput { + mode: DiscordMode + allowedChatIds?: string[] + forwardAgentOutput?: boolean +} + +export function discord(input: DiscordConfigInput): DiscordChannelConfig { + if (!VALID_MODES.includes(input.mode)) { + throw new Error( + `Invalid discord mode: "${input.mode}". Must be one of: ${VALID_MODES.join(', ')}` + ) + } + return { + platform: 'discord', + mode: input.mode, + allowedChatIds: input.allowedChatIds ?? [], + forwardAgentOutput: input.forwardAgentOutput ?? true, + } +} +``` + +- [ ] **Step 6: Create index re-export** + +```typescript +// packages/agents-runtime/src/channels/index.ts +export { telegram } from './telegram.js' +export { discord } from './discord.js' +export type { + ChannelConfig, + TelegramChannelConfig, + DiscordChannelConfig, + ChannelMessageWakeEvent, + ChannelMessageSource, + ChannelMessageSender, + PendingChannelSend, + PlatformAdapter, + GatewayAdapter, + GatewayContext, + OutboundAdapter, + OutboundOpts, + MessageRef, + StreamingAdapter, + ThreadingAdapter, + GroupAdapter, + Logger, +} from './types.js' +``` + +- [ ] **Step 7: Run tests to verify they pass** + +Run: `cd packages/agents-runtime && npx vitest run test/channels/channel-config.test.ts` +Expected: All 6 tests PASS + +- [ ] **Step 8: Commit** + +```bash +git add packages/agents-runtime/src/channels/ packages/agents-runtime/test/channels/ +git commit -m "feat(agents-runtime): add channel config types and factories for telegram/discord" +``` + +--- + +## Task 2: Extend `EntityDefinition` and `HandlerContext` (`agents-runtime`) + +**Files:** +- Modify: `packages/agents-runtime/src/types.ts` (lines 238-244, 607-667, 669-680) +- Modify: `packages/agents-runtime/src/define-entity.ts` (lines 41-43) +- Modify: `packages/agents-runtime/src/context-factory.ts` (lines 182-537) + +### Steps + +- [ ] **Step 1: Add `channels` to `EntityDefinition` in `types.ts`** + +In `packages/agents-runtime/src/types.ts`, add the import at the top: + +```typescript +import type { ChannelConfig, PendingChannelSend } from './channels/types.js' +``` + +Then modify the `EntityDefinition` interface (around line 669) to add `channels`: + +```typescript +export interface EntityDefinition { + description?: string + channels?: ChannelConfig[] + state?: Record + actions?: ( + collections: Record + ) => Record) => void> + creationSchema?: StandardJSONSchemaV1 + inboxSchemas?: Record + outputSchemas?: Record + + handler: (ctx: HandlerContext, wake: WakeEvent) => void | Promise +} +``` + +- [ ] **Step 2: Add `channelSend` to `HandlerContext` in `types.ts`** + +In `packages/agents-runtime/src/types.ts`, add `channelSend` to the `HandlerContext` interface (around line 660, before the closing brace): + +```typescript + channelSend: ( + platform: string, + opts: { + chatId: string + text: string + threadId?: string + replyToId?: string + channelData?: Record + } + ) => void +``` + +- [ ] **Step 3: Re-export channel types from `types.ts`** + +At the bottom of `packages/agents-runtime/src/types.ts`, add: + +```typescript +export type { + ChannelConfig, + PendingChannelSend, + ChannelMessageWakeEvent, +} from './channels/types.js' +``` + +- [ ] **Step 4: Wire `channelSend` into `context-factory.ts`** + +In `packages/agents-runtime/src/context-factory.ts`, add `PendingChannelSend` to the imports from `./channels/types.js`: + +```typescript +import type { PendingChannelSend } from './channels/types.js' +``` + +Add to `HandlerContextConfig` (around line 140): + +```typescript + enqueueChannelSend: (send: PendingChannelSend) => void +``` + +In the `createHandlerContext` function body (around line 490, near the `send` implementation), add: + +```typescript + channelSend: (platform, opts) => { + config.enqueueChannelSend({ + platform, + chatId: opts.chatId, + text: opts.text, + threadId: opts.threadId, + replyToId: opts.replyToId, + channelData: opts.channelData, + }) + }, +``` + +- [ ] **Step 5: Verify existing tests still pass** + +Run: `cd packages/agents-runtime && npx vitest run` +Expected: All existing tests PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add packages/agents-runtime/src/types.ts packages/agents-runtime/src/define-entity.ts packages/agents-runtime/src/context-factory.ts +git commit -m "feat(agents-runtime): add channels to EntityDefinition and channelSend to HandlerContext" +``` + +--- + +## Task 3: Add `channels` export to `agents-runtime` package + +**Files:** +- Modify: `packages/agents-runtime/package.json` (lines 18-40) +- Modify: `packages/agents-runtime/src/index.ts` + +### Steps + +- [ ] **Step 1: Add `./channels` subpath export to `package.json`** + +In `packages/agents-runtime/package.json`, add a new export entry alongside the existing `"."` and `"./react"` entries: + +```json +"./channels": { + "import": { + "types": "./dist/channels/index.d.ts", + "default": "./dist/channels/index.js" + }, + "require": { + "types": "./dist/channels/index.d.cts", + "default": "./dist/channels/index.cjs" + } +} +``` + +- [ ] **Step 2: Re-export channel types from main index** + +In `packages/agents-runtime/src/index.ts`, add: + +```typescript +export type { + ChannelConfig, + ChannelMessageWakeEvent, + PendingChannelSend, +} from './channels/types.js' +``` + +- [ ] **Step 3: Build to verify exports resolve** + +Run: `cd packages/agents-runtime && npx tsup` (or the project's build command) +Expected: Build succeeds, `dist/channels/` directory created + +- [ ] **Step 4: Commit** + +```bash +git add packages/agents-runtime/package.json packages/agents-runtime/src/index.ts +git commit -m "feat(agents-runtime): export channels subpath" +``` + +--- + +## Task 4: Database Schema — Add `channels` Column (`agents-server`) + +**Files:** +- Modify: `packages/agents-server/src/db/schema.ts` (lines 17-27) +- Modify: `packages/agents-server/src/electric-agents-types.ts` (lines 77-87) + +### Steps + +- [ ] **Step 1: Add `channels` to `entity_types` table in `schema.ts`** + +In `packages/agents-server/src/db/schema.ts`, add the `channels` column to the `entity_types` table definition (around line 24, after `state_schemas`): + +```sql +channels jsonb +``` + +The exact code depends on whether schema is defined via Drizzle or raw SQL. Match the pattern used by the existing `state_schemas` column. + +- [ ] **Step 2: Add `channels` to `ElectricAgentsEntityType` in `electric-agents-types.ts`** + +In `packages/agents-server/src/electric-agents-types.ts`, add the `channels` field to `ElectricAgentsEntityType` (around line 83, after `state_schemas`): + +```typescript +export interface ElectricAgentsEntityType { + name: string + description: string + creation_schema?: Record + inbox_schemas?: Record> + state_schemas?: Record> + channels?: Array<{ + platform: string + mode: string + allowedChatIds?: string[] + forwardAgentOutput: boolean + }> + serve_endpoint?: string + revision: number + created_at: string + updated_at: string +} +``` + +- [ ] **Step 3: Create migration for `channels` column** + +Create a migration file at `packages/agents-server/src/db/migrations/` following the existing migration pattern. The migration should: + +```sql +ALTER TABLE entity_types ADD COLUMN IF NOT EXISTS channels jsonb; +``` + +Check how existing migrations are structured in that directory and follow the same pattern. + +- [ ] **Step 4: Verify migration runs without error** + +Run the test suite or start the server to verify the migration applies cleanly: + +Run: `cd packages/agents-server && npx vitest run test/entrypoint.test.ts` +Expected: Server starts and migrations apply without error + +- [ ] **Step 5: Commit** + +```bash +git add packages/agents-server/src/db/schema.ts packages/agents-server/src/electric-agents-types.ts packages/agents-server/src/db/migrations/ +git commit -m "feat(agents-server): add channels column to entity_types schema" +``` + +--- + +## Task 5: DraftStreamLoop (`agents-server`) + +**Files:** +- Create: `packages/agents-server/src/channels/draft-stream-loop.ts` +- Create: `packages/agents-server/test/channels/draft-stream-loop.test.ts` + +### Steps + +- [ ] **Step 1: Write failing tests for DraftStreamLoop** + +```typescript +// packages/agents-server/test/channels/draft-stream-loop.test.ts +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { createDraftStreamLoop } from '../src/channels/draft-stream-loop.js' + +describe('DraftStreamLoop', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('sends initial message on first delta', async () => { + const sendText = vi.fn().mockResolvedValue({ messageId: 'msg1', chatId: 'chat1' }) + const editText = vi.fn().mockResolvedValue(undefined) + + const loop = createDraftStreamLoop({ + chatId: 'chat1', + sendText, + editText, + throttleMs: 500, + }) + + loop.onDelta('Hello') + await loop.flush() + + expect(sendText).toHaveBeenCalledWith('chat1', 'Hello', undefined) + }) + + it('accumulates deltas and edits on flush', async () => { + const sendText = vi.fn().mockResolvedValue({ messageId: 'msg1', chatId: 'chat1' }) + const editText = vi.fn().mockResolvedValue(undefined) + + const loop = createDraftStreamLoop({ + chatId: 'chat1', + sendText, + editText, + throttleMs: 500, + }) + + loop.onDelta('Hello ') + // Flush the initial send + await vi.advanceTimersByTimeAsync(500) + + loop.onDelta('world') + await loop.flush() + + expect(sendText).toHaveBeenCalledTimes(1) + expect(editText).toHaveBeenCalledWith( + { messageId: 'msg1', chatId: 'chat1' }, + 'Hello world' + ) + }) + + it('throttles edits to avoid rate limiting', async () => { + const sendText = vi.fn().mockResolvedValue({ messageId: 'msg1', chatId: 'chat1' }) + const editText = vi.fn().mockResolvedValue(undefined) + + const loop = createDraftStreamLoop({ + chatId: 'chat1', + sendText, + editText, + throttleMs: 500, + }) + + loop.onDelta('a') + await vi.advanceTimersByTimeAsync(500) + + loop.onDelta('b') + loop.onDelta('c') + loop.onDelta('d') + + // Only one edit should fire per throttle window + await vi.advanceTimersByTimeAsync(500) + expect(editText).toHaveBeenCalledTimes(1) + expect(editText).toHaveBeenCalledWith( + { messageId: 'msg1', chatId: 'chat1' }, + 'abcd' + ) + }) + + it('falls back to send-on-complete when editText is not provided', async () => { + const sendText = vi.fn().mockResolvedValue({ messageId: 'msg1', chatId: 'chat1' }) + + const loop = createDraftStreamLoop({ + chatId: 'chat1', + sendText, + throttleMs: 500, + }) + + loop.onDelta('Hello ') + loop.onDelta('world') + await loop.flush() + + expect(sendText).toHaveBeenCalledTimes(1) + expect(sendText).toHaveBeenCalledWith('chat1', 'Hello world', undefined) + }) +}) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd packages/agents-server && npx vitest run test/channels/draft-stream-loop.test.ts` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement DraftStreamLoop** + +```typescript +// packages/agents-server/src/channels/draft-stream-loop.ts +import type { MessageRef, OutboundOpts } from '@electric-ax/agents-runtime/channels' + +export interface DraftStreamLoopConfig { + chatId: string + sendText: ( + chatId: string, + text: string, + opts?: OutboundOpts + ) => Promise + editText?: (ref: MessageRef, text: string) => Promise + throttleMs: number + opts?: OutboundOpts +} + +export interface DraftStreamLoop { + onDelta(text: string): void + flush(): Promise +} + +export function createDraftStreamLoop( + config: DraftStreamLoopConfig +): DraftStreamLoop { + let accumulated = '' + let messageRef: MessageRef | null = null + let dirty = false + let timer: ReturnType | null = null + let pendingEdit: Promise | null = null + + async function doEdit(): Promise { + if (!dirty) return + dirty = false + + if (!messageRef) { + messageRef = await config.sendText( + config.chatId, + accumulated, + config.opts + ) + } else if (config.editText) { + await config.editText(messageRef, accumulated) + } + } + + function scheduleEdit(): void { + if (timer !== null) return + timer = setTimeout(async () => { + timer = null + pendingEdit = doEdit() + await pendingEdit + pendingEdit = null + }, config.throttleMs) + } + + return { + onDelta(text: string): void { + accumulated += text + dirty = true + if (config.editText) { + scheduleEdit() + } + }, + + async flush(): Promise { + if (timer !== null) { + clearTimeout(timer) + timer = null + } + if (pendingEdit) { + await pendingEdit + } + dirty = true + await doEdit() + }, + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/draft-stream-loop.test.ts` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add packages/agents-server/src/channels/draft-stream-loop.ts packages/agents-server/test/channels/draft-stream-loop.test.ts +git commit -m "feat(agents-server): add DraftStreamLoop for streaming agent output to platforms" +``` + +--- + +## Task 6: HealthMonitor (`agents-server`) + +**Files:** +- Create: `packages/agents-server/src/channels/health-monitor.ts` +- Create: `packages/agents-server/test/channels/health-monitor.test.ts` + +### Steps + +- [ ] **Step 1: Write failing tests for HealthMonitor** + +```typescript +// packages/agents-server/test/channels/health-monitor.test.ts +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { HealthMonitor } from '../src/channels/health-monitor.js' + +describe('HealthMonitor', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('does not restart during startup grace period', async () => { + const restart = vi.fn().mockResolvedValue(undefined) + const monitor = new HealthMonitor({ + checkIntervalMs: 1000, + staleThresholdMs: 500, + startupGraceMs: 5000, + backoff: { initialMs: 100, maxMs: 1000, factor: 2, jitter: 0 }, + maxRestartAttempts: 10, + }) + + monitor.register('telegram', restart) + monitor.start() + + await vi.advanceTimersByTimeAsync(2000) + expect(restart).not.toHaveBeenCalled() + + monitor.stop() + }) + + it('restarts after stale threshold exceeded', async () => { + const restart = vi.fn().mockResolvedValue(undefined) + const monitor = new HealthMonitor({ + checkIntervalMs: 1000, + staleThresholdMs: 500, + startupGraceMs: 0, + backoff: { initialMs: 100, maxMs: 1000, factor: 2, jitter: 0 }, + maxRestartAttempts: 10, + }) + + monitor.register('telegram', restart) + monitor.start() + + // No heartbeat for > staleThresholdMs + await vi.advanceTimersByTimeAsync(1500) + expect(restart).toHaveBeenCalledTimes(1) + + monitor.stop() + }) + + it('does not restart when heartbeats are fresh', async () => { + const restart = vi.fn().mockResolvedValue(undefined) + const monitor = new HealthMonitor({ + checkIntervalMs: 1000, + staleThresholdMs: 2000, + startupGraceMs: 0, + backoff: { initialMs: 100, maxMs: 1000, factor: 2, jitter: 0 }, + maxRestartAttempts: 10, + }) + + monitor.register('telegram', restart) + monitor.start() + + // Keep heartbeating + await vi.advanceTimersByTimeAsync(500) + monitor.heartbeat('telegram') + await vi.advanceTimersByTimeAsync(500) + monitor.heartbeat('telegram') + await vi.advanceTimersByTimeAsync(500) + + expect(restart).not.toHaveBeenCalled() + + monitor.stop() + }) + + it('stops after max restart attempts', async () => { + let callCount = 0 + const restart = vi.fn().mockImplementation(async () => { + callCount++ + }) + const monitor = new HealthMonitor({ + checkIntervalMs: 100, + staleThresholdMs: 50, + startupGraceMs: 0, + backoff: { initialMs: 10, maxMs: 100, factor: 1, jitter: 0 }, + maxRestartAttempts: 3, + }) + + monitor.register('telegram', restart) + monitor.start() + + // Advance enough for all restart attempts + await vi.advanceTimersByTimeAsync(10000) + expect(callCount).toBeLessThanOrEqual(3) + + monitor.stop() + }) +}) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd packages/agents-server && npx vitest run test/channels/health-monitor.test.ts` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement HealthMonitor** + +```typescript +// packages/agents-server/src/channels/health-monitor.ts + +export interface HealthMonitorConfig { + checkIntervalMs: number + staleThresholdMs: number + startupGraceMs: number + backoff: { + initialMs: number + maxMs: number + factor: number + jitter: number + } + maxRestartAttempts: number +} + +interface AdapterEntry { + restart: () => Promise + lastEventAt: number + restartAttempts: number + currentBackoffMs: number + lastRestartAt: number +} + +export const DEFAULT_HEALTH_CONFIG: HealthMonitorConfig = { + checkIntervalMs: 300_000, + staleThresholdMs: 120_000, + startupGraceMs: 60_000, + backoff: { + initialMs: 5_000, + maxMs: 300_000, + factor: 2, + jitter: 0.1, + }, + maxRestartAttempts: 10, +} + +export class HealthMonitor { + private config: HealthMonitorConfig + private adapters = new Map() + private timer: ReturnType | null = null + private startedAt = 0 + + constructor(config: Partial = {}) { + this.config = { ...DEFAULT_HEALTH_CONFIG, ...config } + } + + register(platform: string, restart: () => Promise): void { + this.adapters.set(platform, { + restart, + lastEventAt: Date.now(), + restartAttempts: 0, + currentBackoffMs: this.config.backoff.initialMs, + lastRestartAt: 0, + }) + } + + unregister(platform: string): void { + this.adapters.delete(platform) + } + + heartbeat(platform: string): void { + const entry = this.adapters.get(platform) + if (entry) { + entry.lastEventAt = Date.now() + entry.restartAttempts = 0 + entry.currentBackoffMs = this.config.backoff.initialMs + } + } + + start(): void { + this.startedAt = Date.now() + this.timer = setInterval(() => this.check(), this.config.checkIntervalMs) + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer) + this.timer = null + } + } + + private async check(): Promise { + const now = Date.now() + if (now - this.startedAt < this.config.startupGraceMs) return + + for (const [platform, entry] of this.adapters) { + const staleDuration = now - entry.lastEventAt + if (staleDuration < this.config.staleThresholdMs) continue + if (entry.restartAttempts >= this.config.maxRestartAttempts) continue + + const timeSinceLastRestart = now - entry.lastRestartAt + if (timeSinceLastRestart < entry.currentBackoffMs) continue + + entry.restartAttempts++ + entry.lastRestartAt = now + + try { + await entry.restart() + entry.lastEventAt = now + } catch { + // Restart failed — backoff will handle retry + } + + const jitter = + 1 + (Math.random() * 2 - 1) * this.config.backoff.jitter + entry.currentBackoffMs = Math.min( + entry.currentBackoffMs * this.config.backoff.factor * jitter, + this.config.backoff.maxMs + ) + } + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/health-monitor.test.ts` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add packages/agents-server/src/channels/health-monitor.ts packages/agents-server/test/channels/health-monitor.test.ts +git commit -m "feat(agents-server): add HealthMonitor with exponential backoff for platform adapters" +``` + +--- + +## Task 7: TelegramAdapter (`agents-server`) + +**Files:** +- Create: `packages/agents-server/src/channels/telegram/telegram-adapter.ts` +- Create: `packages/agents-server/test/channels/telegram-adapter.test.ts` + +### Steps + +- [ ] **Step 1: Add grammy dependency** + +Run: `cd packages/agents-server && pnpm add grammy` + +- [ ] **Step 2: Write failing tests for TelegramAdapter** + +```typescript +// packages/agents-server/test/channels/telegram-adapter.test.ts +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { TelegramAdapter } from '../src/channels/telegram/telegram-adapter.js' +import type { + GatewayContext, + ChannelMessageWakeEvent, +} from '@electric-ax/agents-runtime/channels' + +// Mock grammy Bot +vi.mock('grammy', () => { + const handlers: Record = {} + return { + Bot: vi.fn().mockImplementation(() => ({ + on: vi.fn((event: string, handler: Function) => { + handlers[event] = handler + }), + api: { + setWebhook: vi.fn().mockResolvedValue(true), + deleteWebhook: vi.fn().mockResolvedValue(true), + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + }, + handleUpdate: vi.fn(async (update: any) => { + if (update.message && handlers['message:text']) { + await handlers['message:text']({ + message: update.message, + chat: { id: update.message.chat.id }, + from: update.message.from, + }) + } + }), + _handlers: handlers, + })), + webhookCallback: vi.fn().mockReturnValue( + async (req: any, res: any) => {} + ), + } +}) + +describe('TelegramAdapter', () => { + let adapter: TelegramAdapter + + beforeEach(() => { + adapter = new TelegramAdapter('test-token', 'https://example.com') + }) + + it('has platform set to telegram', () => { + expect(adapter.platform).toBe('telegram') + }) + + it('has streaming support', () => { + expect(adapter.streaming).toBeDefined() + expect(adapter.streaming!.mode).toBe('partial') + }) + + describe('outbound', () => { + it('sends text messages', async () => { + // Start the adapter to initialize the bot + const onMessage = vi.fn() + await adapter.gateway.start({ + configs: new Map([['test-type', [{ platform: 'telegram', mode: 'direct', allowedChatIds: [], forwardAgentOutput: true }]]]), + abortSignal: new AbortController().signal, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + onMessage, + }) + + const ref = await adapter.outbound.sendText('12345', 'Hello!') + expect(ref.chatId).toBe('12345') + expect(ref.messageId).toBeDefined() + }) + + it('edits text messages', async () => { + const onMessage = vi.fn() + await adapter.gateway.start({ + configs: new Map([['test-type', [{ platform: 'telegram', mode: 'direct', allowedChatIds: [], forwardAgentOutput: true }]]]), + abortSignal: new AbortController().signal, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + onMessage, + }) + + const ref = { messageId: '42', chatId: '12345' } + await adapter.outbound.editText!(ref, 'Updated text') + // Should not throw + }) + }) +}) +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `cd packages/agents-server && npx vitest run test/channels/telegram-adapter.test.ts` +Expected: FAIL — module not found + +- [ ] **Step 4: Implement TelegramAdapter** + +```typescript +// packages/agents-server/src/channels/telegram/telegram-adapter.ts +import { Bot } from 'grammy' +import type { + PlatformAdapter, + GatewayAdapter, + GatewayContext, + OutboundAdapter, + OutboundOpts, + MessageRef, + StreamingAdapter, + ChannelMessageWakeEvent, +} from '@electric-ax/agents-runtime/channels' + +export class TelegramAdapter implements PlatformAdapter { + readonly platform = 'telegram' + readonly gateway: GatewayAdapter + readonly outbound: OutboundAdapter + readonly streaming: StreamingAdapter = { mode: 'partial' } + + private bot: Bot + private webhookBaseUrl: string + private ctx: GatewayContext | null = null + + constructor(botToken: string, webhookBaseUrl: string) { + this.bot = new Bot(botToken) + this.webhookBaseUrl = webhookBaseUrl + + this.gateway = { + start: async (ctx: GatewayContext) => { + this.ctx = ctx + + this.bot.on('message:text', async (grammyCtx) => { + const chatId = String(grammyCtx.chat.id) + const messageId = String(grammyCtx.message!.message_id) + + // Find which entity type handles this chat + const entityType = this.resolveEntityType(chatId, ctx) + if (!entityType) { + ctx.log.debug( + `No entity type bound to telegram chat ${chatId}, ignoring` + ) + return + } + + const event: ChannelMessageWakeEvent = { + type: 'channel_message', + source: { + platform: 'telegram', + chatId, + messageId, + }, + sender: { + id: String(grammyCtx.from?.id ?? ''), + username: grammyCtx.from?.username, + }, + payload: { + text: grammyCtx.message!.text!, + channelData: { + chat_type: grammyCtx.chat.type, + }, + }, + } + + await ctx.onMessage(entityType, event) + }) + + const webhookUrl = `${this.webhookBaseUrl}/_electric/channels/telegram/webhook` + await this.bot.api.setWebhook(webhookUrl) + ctx.log.info(`Telegram webhook registered at ${webhookUrl}`) + }, + + stop: async () => { + await this.bot.api.deleteWebhook() + this.ctx = null + }, + } + + this.outbound = { + sendText: async ( + chatId: string, + text: string, + opts?: OutboundOpts + ): Promise => { + const result = await this.bot.api.sendMessage(Number(chatId), text, { + reply_to_message_id: opts?.replyToId + ? Number(opts.replyToId) + : undefined, + ...(opts?.channelData as Record), + }) + return { + messageId: String(result.message_id), + chatId, + } + }, + + editText: async (ref: MessageRef, text: string): Promise => { + await this.bot.api.editMessageText( + Number(ref.chatId), + Number(ref.messageId), + text + ) + }, + } + } + + /** + * Handle raw Telegram webhook update. + * Called by the HTTP route handler. + */ + async handleWebhookUpdate(update: unknown): Promise { + await this.bot.handleUpdate(update as any) + } + + private resolveEntityType( + chatId: string, + ctx: GatewayContext + ): string | null { + for (const [entityType, configs] of ctx.configs) { + for (const config of configs) { + if (config.platform !== 'telegram') continue + if ( + config.allowedChatIds.length > 0 && + !config.allowedChatIds.includes(chatId) + ) { + continue + } + return entityType + } + } + return null + } +} +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/telegram-adapter.test.ts` +Expected: All tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add packages/agents-server/src/channels/telegram/ packages/agents-server/test/channels/telegram-adapter.test.ts packages/agents-server/package.json pnpm-lock.yaml +git commit -m "feat(agents-server): add TelegramAdapter with webhook gateway and outbound" +``` + +--- + +## Task 8: DiscordAdapter (`agents-server`) + +**Files:** +- Create: `packages/agents-server/src/channels/discord/discord-adapter.ts` +- Create: `packages/agents-server/test/channels/discord-adapter.test.ts` + +### Steps + +- [ ] **Step 1: Add discord.js dependency** + +Run: `cd packages/agents-server && pnpm add discord.js` + +- [ ] **Step 2: Write failing tests for DiscordAdapter** + +```typescript +// packages/agents-server/test/channels/discord-adapter.test.ts +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { DiscordAdapter } from '../src/channels/discord/discord-adapter.js' +import type { GatewayContext } from '@electric-ax/agents-runtime/channels' + +// Mock discord.js +vi.mock('discord.js', () => { + const handlers: Record = {} + return { + Client: vi.fn().mockImplementation(() => ({ + on: vi.fn((event: string, handler: Function) => { + handlers[event] = handler + }), + login: vi.fn().mockResolvedValue('token'), + destroy: vi.fn().mockResolvedValue(undefined), + channels: { + fetch: vi.fn().mockResolvedValue({ + send: vi.fn().mockResolvedValue({ id: 'msg123' }), + }), + }, + user: { id: 'bot-user-id' }, + _handlers: handlers, + })), + GatewayIntentBits: { + Guilds: 1, + GuildMessages: 2, + MessageContent: 4, + DirectMessages: 8, + }, + Partials: { + Channel: 0, + }, + } +}) + +describe('DiscordAdapter', () => { + let adapter: DiscordAdapter + + beforeEach(() => { + adapter = new DiscordAdapter('test-token', 'test-app-id') + }) + + it('has platform set to discord', () => { + expect(adapter.platform).toBe('discord') + }) + + it('has streaming support', () => { + expect(adapter.streaming).toBeDefined() + expect(adapter.streaming!.mode).toBe('partial') + }) + + it('has threading support', () => { + expect(adapter.threading).toBeDefined() + expect(adapter.threading!.supportsThreads).toBe(true) + }) + + describe('outbound', () => { + it('sends text messages', async () => { + const onMessage = vi.fn() + await adapter.gateway.start({ + configs: new Map([['test-type', [{ platform: 'discord', mode: 'channel', allowedChatIds: [], forwardAgentOutput: true }]]]), + abortSignal: new AbortController().signal, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + onMessage, + }) + + const ref = await adapter.outbound.sendText('channel123', 'Hello Discord!') + expect(ref.chatId).toBe('channel123') + expect(ref.messageId).toBe('msg123') + }) + }) +}) +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `cd packages/agents-server && npx vitest run test/channels/discord-adapter.test.ts` +Expected: FAIL — module not found + +- [ ] **Step 4: Implement DiscordAdapter** + +```typescript +// packages/agents-server/src/channels/discord/discord-adapter.ts +import { Client, GatewayIntentBits, Partials } from 'discord.js' +import type { + PlatformAdapter, + GatewayAdapter, + GatewayContext, + OutboundAdapter, + OutboundOpts, + MessageRef, + StreamingAdapter, + ThreadingAdapter, + ChannelMessageWakeEvent, +} from '@electric-ax/agents-runtime/channels' + +export class DiscordAdapter implements PlatformAdapter { + readonly platform = 'discord' + readonly gateway: GatewayAdapter + readonly outbound: OutboundAdapter + readonly streaming: StreamingAdapter = { mode: 'partial' } + readonly threading: ThreadingAdapter = { supportsThreads: true } + + private client: Client + private botToken: string + private ctx: GatewayContext | null = null + + constructor(botToken: string, _appId: string) { + this.botToken = botToken + this.client = new Client({ + intents: [ + GatewayIntentBits.Guilds, + GatewayIntentBits.GuildMessages, + GatewayIntentBits.MessageContent, + GatewayIntentBits.DirectMessages, + ], + partials: [Partials.Channel], + }) + + this.gateway = { + start: async (ctx: GatewayContext) => { + this.ctx = ctx + + this.client.on('messageCreate', async (message) => { + // Ignore bot's own messages + if (message.author.id === this.client.user?.id) return + if (message.author.bot) return + + const chatId = message.channelId + const entityType = this.resolveEntityType(chatId, ctx) + if (!entityType) { + ctx.log.debug( + `No entity type bound to discord channel ${chatId}, ignoring` + ) + return + } + + const event: ChannelMessageWakeEvent = { + type: 'channel_message', + source: { + platform: 'discord', + chatId, + messageId: message.id, + threadId: message.thread?.id, + }, + sender: { + id: message.author.id, + username: message.author.username, + }, + payload: { + text: message.content, + replyTo: message.reference?.messageId ?? undefined, + channelData: { + guildId: message.guildId, + isDM: message.channel.isDMBased(), + }, + }, + } + + await ctx.onMessage(entityType, event) + }) + + await this.client.login(this.botToken) + ctx.log.info('Discord gateway connected') + }, + + stop: async () => { + this.client.destroy() + this.ctx = null + }, + } + + this.outbound = { + sendText: async ( + chatId: string, + text: string, + opts?: OutboundOpts + ): Promise => { + const channel = await this.client.channels.fetch(chatId) + if (!channel || !('send' in channel)) { + throw new Error(`Cannot send to Discord channel ${chatId}`) + } + + const message = await (channel as any).send({ + content: text, + reply: opts?.replyToId + ? { messageReference: opts.replyToId } + : undefined, + }) + + return { + messageId: message.id, + chatId, + } + }, + + editText: async (ref: MessageRef, text: string): Promise => { + const channel = await this.client.channels.fetch(ref.chatId) + if (!channel || !('messages' in channel)) { + throw new Error(`Cannot edit in Discord channel ${ref.chatId}`) + } + + const message = await (channel as any).messages.fetch(ref.messageId) + await message.edit(text) + }, + } + } + + private resolveEntityType( + chatId: string, + ctx: GatewayContext + ): string | null { + for (const [entityType, configs] of ctx.configs) { + for (const config of configs) { + if (config.platform !== 'discord') continue + if ( + config.allowedChatIds.length > 0 && + !config.allowedChatIds.includes(chatId) + ) { + continue + } + return entityType + } + } + return null + } +} +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/discord-adapter.test.ts` +Expected: All tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add packages/agents-server/src/channels/discord/ packages/agents-server/test/channels/discord-adapter.test.ts packages/agents-server/package.json pnpm-lock.yaml +git commit -m "feat(agents-server): add DiscordAdapter with Gateway WebSocket and REST outbound" +``` + +--- + +## Task 9: ChannelManager (`agents-server`) + +**Files:** +- Create: `packages/agents-server/src/channels/channel-manager.ts` +- Create: `packages/agents-server/test/channels/channel-manager.test.ts` + +### Steps + +- [ ] **Step 1: Write failing tests for ChannelManager** + +```typescript +// packages/agents-server/test/channels/channel-manager.test.ts +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ChannelManager } from '../src/channels/channel-manager.js' +import type { + PlatformAdapter, + GatewayAdapter, + GatewayContext, + OutboundAdapter, + MessageRef, + ChannelMessageWakeEvent, +} from '@electric-ax/agents-runtime/channels' +import { telegram } from '@electric-ax/agents-runtime/channels' + +function createMockAdapter(): PlatformAdapter & { + _onMessage: ((entityType: string, event: ChannelMessageWakeEvent) => Promise) | null +} { + let onMessage: ((entityType: string, event: ChannelMessageWakeEvent) => Promise) | null = null + + return { + platform: 'telegram', + _onMessage: null, + gateway: { + start: vi.fn(async (ctx: GatewayContext) => { + onMessage = ctx.onMessage + }), + stop: vi.fn(async () => {}), + } as GatewayAdapter, + outbound: { + sendText: vi.fn(async (chatId: string, text: string): Promise => ({ + messageId: 'msg1', + chatId, + })), + editText: vi.fn(async () => {}), + } as OutboundAdapter, + get _onMessageFn() { + return onMessage + }, + } +} + +describe('ChannelManager', () => { + let manager: ChannelManager + let mockAdapter: ReturnType + let mockSpawn: Ret.MockedFunction + let mockSend: vi.MockedFunction + + beforeEach(() => { + mockAdapter = createMockAdapter() + mockSpawn = vi.fn().mockResolvedValue({ url: '/bot/telegram-123' }) + mockSend = vi.fn().mockResolvedValue(undefined) + + manager = new ChannelManager({ + adapters: [mockAdapter], + spawnEntity: mockSpawn, + sendToEntity: mockSend, + entityExists: vi.fn().mockResolvedValue(false), + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + }) + + it('starts adapters on start()', async () => { + await manager.start() + expect(mockAdapter.gateway.start).toHaveBeenCalled() + }) + + it('stops adapters on stop()', async () => { + await manager.start() + await manager.stop() + expect(mockAdapter.gateway.stop).toHaveBeenCalled() + }) + + it('registers entity type channel bindings', async () => { + manager.bindEntityType('support-bot', [ + telegram({ mode: 'direct' }), + ]) + + await manager.start() + + const startCall = (mockAdapter.gateway.start as vi.MockedFunction).mock.calls[0][0] as GatewayContext + expect(startCall.configs.get('support-bot')).toHaveLength(1) + expect(startCall.configs.get('support-bot')![0].platform).toBe('telegram') + }) + + it('unbinds entity type channels', async () => { + manager.bindEntityType('support-bot', [ + telegram({ mode: 'direct' }), + ]) + manager.unbindEntityType('support-bot') + + await manager.start() + + const startCall = (mockAdapter.gateway.start as vi.MockedFunction).mock.calls[0][0] as GatewayContext + expect(startCall.configs.has('support-bot')).toBe(false) + }) +}) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd packages/agents-server && npx vitest run test/channels/channel-manager.test.ts` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement ChannelManager** + +```typescript +// packages/agents-server/src/channels/channel-manager.ts +import type { + ChannelConfig, + ChannelMessageWakeEvent, + PlatformAdapter, + PendingChannelSend, + Logger, +} from '@electric-ax/agents-runtime/channels' +import { HealthMonitor } from './health-monitor.js' +import { createDraftStreamLoop } from './draft-stream-loop.js' + +export interface ChannelManagerConfig { + adapters: PlatformAdapter[] + spawnEntity: ( + type: string, + instanceId: string, + args?: Record + ) => Promise<{ url: string }> + sendToEntity: ( + entityUrl: string, + payload: unknown, + opts?: { type?: string } + ) => Promise + entityExists: (entityUrl: string) => Promise + log: Logger +} + +export class ChannelManager { + private config: ChannelManagerConfig + private adapters: Map + private bindings: Map = new Map() + private healthMonitor: HealthMonitor + private abortController: AbortController | null = null + + constructor(config: ChannelManagerConfig) { + this.config = config + this.adapters = new Map( + config.adapters.map((a) => [a.platform, a]) + ) + this.healthMonitor = new HealthMonitor() + } + + bindEntityType(typeName: string, channels: ChannelConfig[]): void { + this.bindings.set(typeName, channels) + } + + unbindEntityType(typeName: string): void { + this.bindings.delete(typeName) + } + + async start(): Promise { + this.abortController = new AbortController() + + // Group configs by platform + const platformConfigs = new Map>() + for (const [entityType, channels] of this.bindings) { + for (const channel of channels) { + if (!platformConfigs.has(channel.platform)) { + platformConfigs.set(channel.platform, new Map()) + } + const typeConfigs = platformConfigs.get(channel.platform)! + if (!typeConfigs.has(entityType)) { + typeConfigs.set(entityType, []) + } + typeConfigs.get(entityType)!.push(channel) + } + } + + // Start each adapter with its configs + for (const [platform, adapter] of this.adapters) { + const configs = platformConfigs.get(platform) ?? new Map() + + // Skip adapters with no bindings + if (configs.size === 0) continue + + await adapter.gateway.start({ + configs, + abortSignal: this.abortController.signal, + log: this.config.log, + onMessage: (entityType, event) => + this.handleInboundMessage(entityType, event), + }) + + this.healthMonitor.register(platform, async () => { + await adapter.gateway.stop() + await adapter.gateway.start({ + configs, + abortSignal: this.abortController!.signal, + log: this.config.log, + onMessage: (entityType, event) => + this.handleInboundMessage(entityType, event), + }) + }) + } + + this.healthMonitor.start() + } + + async stop(): Promise { + this.healthMonitor.stop() + + if (this.abortController) { + this.abortController.abort() + } + + for (const adapter of this.adapters.values()) { + await adapter.gateway.stop() + } + } + + /** + * Send a message to a platform channel. + * Used for explicit ctx.channelSend() calls. + */ + async sendToChannel(send: PendingChannelSend): Promise { + const adapter = this.adapters.get(send.platform) + if (!adapter) { + this.config.log.error( + `No adapter for platform "${send.platform}"` + ) + return + } + + await adapter.outbound.sendText(send.chatId, send.text, { + threadId: send.threadId, + replyToId: send.replyToId, + channelData: send.channelData, + }) + } + + /** + * Forward agent text output to the originating platform chat. + * Called by the server after a channel-triggered wake produces output. + */ + async forwardAgentOutput( + platform: string, + chatId: string, + text: string, + opts?: { threadId?: string } + ): Promise { + const adapter = this.adapters.get(platform) + if (!adapter) return + + await adapter.outbound.sendText(chatId, text, { + threadId: opts?.threadId, + }) + } + + /** + * Create a DraftStreamLoop for streaming agent output. + */ + createStreamLoop( + platform: string, + chatId: string, + opts?: { threadId?: string } + ) { + const adapter = this.adapters.get(platform) + if (!adapter) return null + + if (!adapter.streaming || adapter.streaming.mode === 'off') { + return null + } + + return createDraftStreamLoop({ + chatId, + sendText: (c, t, o) => adapter.outbound.sendText(c, t, o), + editText: adapter.outbound.editText + ? (ref, t) => adapter.outbound.editText!(ref, t) + : undefined, + throttleMs: 500, + opts: { threadId: opts?.threadId }, + }) + } + + /** + * Report a heartbeat for a platform adapter. + */ + heartbeat(platform: string): void { + this.healthMonitor.heartbeat(platform) + } + + private async handleInboundMessage( + entityType: string, + event: ChannelMessageWakeEvent + ): Promise { + const { platform, chatId } = event.source + const instanceId = `${platform}-${chatId}` + const entityUrl = `/${entityType}/${instanceId}` + + try { + const exists = await this.config.entityExists(entityUrl) + + if (!exists) { + this.config.log.info( + `Auto-spawning entity ${entityUrl} for ${platform} chat ${chatId}` + ) + await this.config.spawnEntity(entityType, instanceId, { + channel: { platform, chatId }, + }) + } + + await this.config.sendToEntity(entityUrl, event, { + type: 'channel_message', + }) + + this.healthMonitor.heartbeat(platform) + } catch (err) { + this.config.log.error( + `Failed to route ${platform} message to ${entityUrl}: ${err}` + ) + } + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/channel-manager.test.ts` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add packages/agents-server/src/channels/channel-manager.ts packages/agents-server/test/channels/channel-manager.test.ts +git commit -m "feat(agents-server): add ChannelManager for inbound routing and outbound forwarding" +``` + +--- + +## Task 10: Wire ChannelManager into Server (`agents-server`) + +**Files:** +- Modify: `packages/agents-server/src/server.ts` (lines 196-416, 419-474) +- Modify: `packages/agents-server/src/electric-agents-manager.ts` (lines 122-174) +- Modify: `packages/agents-server/src/electric-agents-routes.ts` + +### Steps + +- [ ] **Step 1: Add Telegram webhook route to `electric-agents-routes.ts`** + +In `packages/agents-server/src/electric-agents-routes.ts`, add a new route handler for the Telegram webhook. Add this alongside the existing route patterns (around line 26): + +```typescript +// Add to the route matching in handleRequestInner or equivalent: +// Match: /_electric/channels/telegram/webhook +if ( + url.pathname === '/_electric/channels/telegram/webhook' && + method === 'POST' +) { + return this.handleTelegramWebhook(req) +} +``` + +Add the handler method: + +```typescript +private async handleTelegramWebhook(req: Request): Promise { + if (!this.telegramAdapter) { + return new Response('Telegram not configured', { status: 404 }) + } + + try { + const update = await req.json() + await this.telegramAdapter.handleWebhookUpdate(update) + return new Response('ok', { status: 200 }) + } catch (err) { + return new Response('Internal error', { status: 500 }) + } +} +``` + +The routes class will need a reference to the `TelegramAdapter` — pass it via constructor. + +- [ ] **Step 2: Notify ChannelManager on entity type registration in `electric-agents-manager.ts`** + +In `packages/agents-server/src/electric-agents-manager.ts`, add a `channelManager` field and setter: + +```typescript +private channelManager: ChannelManager | null = null + +setChannelManager(channelManager: ChannelManager): void { + this.channelManager = channelManager +} +``` + +In `registerEntityType()` (around line 174, after storing the entity type), add: + +```typescript +if (entityType.channels && entityType.channels.length > 0 && this.channelManager) { + this.channelManager.bindEntityType(entityType.name, entityType.channels) +} +``` + +- [ ] **Step 3: Wire ChannelManager into server startup in `server.ts`** + +In `packages/agents-server/src/server.ts`, in the `start()` method (around line 390, after creating routes): + +```typescript +import { ChannelManager } from './channels/channel-manager.js' +import { TelegramAdapter } from './channels/telegram/telegram-adapter.js' +import { DiscordAdapter } from './channels/discord/discord-adapter.js' + +// In start(): +const adapters: PlatformAdapter[] = [] + +const telegramToken = process.env.TELEGRAM_BOT_TOKEN +const telegramWebhookUrl = process.env.TELEGRAM_WEBHOOK_BASE_URL +if (telegramToken && telegramWebhookUrl) { + adapters.push(new TelegramAdapter(telegramToken, telegramWebhookUrl)) + this.log.info('Telegram adapter configured') +} + +const discordToken = process.env.DISCORD_BOT_TOKEN +const discordAppId = process.env.DISCORD_APP_ID +if (discordToken && discordAppId) { + adapters.push(new DiscordAdapter(discordToken, discordAppId)) + this.log.info('Discord adapter configured') +} + +if (adapters.length > 0) { + this.channelManager = new ChannelManager({ + adapters, + spawnEntity: (type, id, args) => manager.spawn(type, id, { args }), + sendToEntity: (url, payload, opts) => manager.send(url, payload, opts), + entityExists: async (url) => { + try { + const entity = await manager.getEntity(url) + return entity !== null && entity.status !== 'stopped' + } catch { + return false + } + }, + log: this.log, + }) + + manager.setChannelManager(this.channelManager) + + // Bind existing entity types with channels + const entityTypes = await registry.listEntityTypes() + for (const et of entityTypes) { + if (et.channels && et.channels.length > 0) { + this.channelManager.bindEntityType(et.name, et.channels) + } + } + + await this.channelManager.start() +} +``` + +- [ ] **Step 4: Wire ChannelManager into server shutdown** + +In `packages/agents-server/src/server.ts`, in the `stop()` method (around line 420): + +```typescript +if (this.channelManager) { + await this.channelManager.stop() +} +``` + +Add the field to the class: + +```typescript +private channelManager: ChannelManager | null = null +``` + +- [ ] **Step 5: Verify existing tests still pass** + +Run: `cd packages/agents-server && npx vitest run` +Expected: All existing tests PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add packages/agents-server/src/server.ts packages/agents-server/src/electric-agents-manager.ts packages/agents-server/src/electric-agents-routes.ts +git commit -m "feat(agents-server): wire ChannelManager into server startup, routing, and shutdown" +``` + +--- + +## Task 11: Wire Agent Output Forwarding in Wake Processing + +**Files:** +- Modify: `packages/agents-runtime/src/process-wake.ts` (lines 1207-1394) +- Modify: `packages/agents-server/src/server.ts` + +### Steps + +- [ ] **Step 1: Add channel output forwarding after handler execution** + +The wake processing needs to detect when a wake was triggered by a `channel_message` and forward agent text output to the originating platform. + +In the server code that orchestrates wake processing (in `server.ts` or wherever `processWebhookWake` results are consumed), add post-wake forwarding logic: + +```typescript +// After processWebhookWake completes for a channel-triggered wake: +if ( + wakeEvent.type === 'channel_message' && + this.channelManager +) { + const channelEvent = wakeEvent as ChannelMessageWakeEvent + const { platform, chatId, threadId } = channelEvent.source + + // Find the entity type config to check forwardAgentOutput + const entityType = await registry.getEntityType(entity.type) + const channelConfig = entityType?.channels?.find( + (c) => c.platform === platform + ) + + if (channelConfig?.forwardAgentOutput) { + // Read agent text output from the entity's stream events + // produced during this wake + const agentText = collectAgentTextFromWakeResult(wakeResult) + + if (agentText) { + await this.channelManager.forwardAgentOutput( + platform, + chatId, + agentText, + { threadId } + ) + } + } + + // Deliver any explicit ctx.channelSend() calls + const pendingChannelSends = wakeResult.pendingChannelSends ?? [] + for (const send of pendingChannelSends) { + await this.channelManager.sendToChannel(send) + } +} +``` + +The `collectAgentTextFromWakeResult` function extracts concatenated text from `text` events produced during the wake. The exact implementation depends on how `WakeResult` exposes produced events — inspect the actual return type of `processWebhookWake()` and adapt accordingly. + +- [ ] **Step 2: Thread pending channel sends through wake processing** + +In `packages/agents-runtime/src/process-wake.ts`, the wake session needs to collect `PendingChannelSend` items queued by `ctx.channelSend()`: + +Add to the wake session's state tracking (alongside `pendingSends`): + +```typescript +const pendingChannelSends: PendingChannelSend[] = [] + +// Pass enqueueChannelSend into createHandlerContext config: +enqueueChannelSend: (send: PendingChannelSend) => { + pendingChannelSends.push(send) +} +``` + +Include `pendingChannelSends` in the wake result returned to the server. + +- [ ] **Step 3: Verify existing tests still pass** + +Run: `cd packages/agents-server && npx vitest run` +Expected: All existing tests PASS + +- [ ] **Step 4: Commit** + +```bash +git add packages/agents-runtime/src/process-wake.ts packages/agents-server/src/server.ts +git commit -m "feat: wire agent output forwarding and channelSend delivery after wake completion" +``` + +--- + +## Task 12: Integration Test — End-to-End Channel Flow + +**Files:** +- Create: `packages/agents-server/test/channels/channel-integration.test.ts` + +### Steps + +- [ ] **Step 1: Write integration test for full inbound → spawn → outbound flow** + +```typescript +// packages/agents-server/test/channels/channel-integration.test.ts +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ChannelManager } from '../src/channels/channel-manager.js' +import { createDraftStreamLoop } from '../src/channels/draft-stream-loop.js' +import { telegram, discord } from '@electric-ax/agents-runtime/channels' +import type { + PlatformAdapter, + GatewayAdapter, + GatewayContext, + OutboundAdapter, + MessageRef, + ChannelMessageWakeEvent, +} from '@electric-ax/agents-runtime/channels' + +function createTestAdapter(platform: string): PlatformAdapter & { + simulateMessage: (entityType: string, event: ChannelMessageWakeEvent) => Promise + sentMessages: Array<{ chatId: string; text: string }> +} { + let onMessage: ((et: string, ev: ChannelMessageWakeEvent) => Promise) | null = null + const sentMessages: Array<{ chatId: string; text: string }> = [] + + return { + platform, + sentMessages, + gateway: { + start: vi.fn(async (ctx: GatewayContext) => { + onMessage = ctx.onMessage + }), + stop: vi.fn(async () => {}), + }, + outbound: { + sendText: vi.fn(async (chatId: string, text: string): Promise => { + sentMessages.push({ chatId, text }) + return { messageId: `msg-${sentMessages.length}`, chatId } + }), + editText: vi.fn(async () => {}), + }, + streaming: { mode: 'partial' as const }, + async simulateMessage(entityType: string, event: ChannelMessageWakeEvent) { + if (!onMessage) throw new Error('Adapter not started') + await onMessage(entityType, event) + }, + } +} + +describe('Channel Integration', () => { + it('routes telegram message to correct entity type and auto-spawns', async () => { + const telegramAdapter = createTestAdapter('telegram') + const spawnEntity = vi.fn().mockResolvedValue({ url: '/bot/telegram-12345' }) + const sendToEntity = vi.fn().mockResolvedValue(undefined) + const entityExists = vi.fn().mockResolvedValue(false) + + const manager = new ChannelManager({ + adapters: [telegramAdapter], + spawnEntity, + sendToEntity, + entityExists, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + + manager.bindEntityType('bot', [telegram({ mode: 'direct' })]) + await manager.start() + + await telegramAdapter.simulateMessage('bot', { + type: 'channel_message', + source: { platform: 'telegram', chatId: '12345', messageId: 'msg1' }, + sender: { id: 'user1', username: 'alice' }, + payload: { text: 'hello' }, + }) + + expect(spawnEntity).toHaveBeenCalledWith('bot', 'telegram-12345', { + channel: { platform: 'telegram', chatId: '12345' }, + }) + expect(sendToEntity).toHaveBeenCalledWith( + '/bot/telegram-12345', + expect.objectContaining({ type: 'channel_message' }), + { type: 'channel_message' } + ) + }) + + it('does not re-spawn existing entity', async () => { + const telegramAdapter = createTestAdapter('telegram') + const spawnEntity = vi.fn() + const sendToEntity = vi.fn().mockResolvedValue(undefined) + const entityExists = vi.fn().mockResolvedValue(true) + + const manager = new ChannelManager({ + adapters: [telegramAdapter], + spawnEntity, + sendToEntity, + entityExists, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + + manager.bindEntityType('bot', [telegram({ mode: 'direct' })]) + await manager.start() + + await telegramAdapter.simulateMessage('bot', { + type: 'channel_message', + source: { platform: 'telegram', chatId: '12345', messageId: 'msg1' }, + sender: { id: 'user1' }, + payload: { text: 'hello' }, + }) + + expect(spawnEntity).not.toHaveBeenCalled() + expect(sendToEntity).toHaveBeenCalled() + }) + + it('supports multi-platform entity type', async () => { + const telegramAdapter = createTestAdapter('telegram') + const discordAdapter = createTestAdapter('discord') + const spawnEntity = vi.fn().mockResolvedValue({ url: '/bot/telegram-123' }) + const sendToEntity = vi.fn().mockResolvedValue(undefined) + const entityExists = vi.fn().mockResolvedValue(false) + + const manager = new ChannelManager({ + adapters: [telegramAdapter, discordAdapter], + spawnEntity, + sendToEntity, + entityExists, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + + manager.bindEntityType('bot', [ + telegram({ mode: 'direct' }), + discord({ mode: 'channel' }), + ]) + await manager.start() + + // Telegram message + await telegramAdapter.simulateMessage('bot', { + type: 'channel_message', + source: { platform: 'telegram', chatId: '111', messageId: 'msg1' }, + sender: { id: 'user1' }, + payload: { text: 'from telegram' }, + }) + + // Discord message + await discordAdapter.simulateMessage('bot', { + type: 'channel_message', + source: { platform: 'discord', chatId: '222', messageId: 'msg2' }, + sender: { id: 'user2' }, + payload: { text: 'from discord' }, + }) + + expect(spawnEntity).toHaveBeenCalledTimes(2) + expect(spawnEntity).toHaveBeenCalledWith('bot', 'telegram-111', expect.any(Object)) + expect(spawnEntity).toHaveBeenCalledWith('bot', 'discord-222', expect.any(Object)) + }) + + it('forwards agent output to originating platform', async () => { + const telegramAdapter = createTestAdapter('telegram') + const manager = new ChannelManager({ + adapters: [telegramAdapter], + spawnEntity: vi.fn().mockResolvedValue({ url: '/bot/telegram-123' }), + sendToEntity: vi.fn().mockResolvedValue(undefined), + entityExists: vi.fn().mockResolvedValue(true), + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + + manager.bindEntityType('bot', [telegram({ mode: 'direct' })]) + await manager.start() + + await manager.forwardAgentOutput('telegram', '12345', 'Agent response') + + expect(telegramAdapter.sentMessages).toEqual([ + { chatId: '12345', text: 'Agent response' }, + ]) + }) + + it('explicit sendToChannel works', async () => { + const telegramAdapter = createTestAdapter('telegram') + const manager = new ChannelManager({ + adapters: [telegramAdapter], + spawnEntity: vi.fn(), + sendToEntity: vi.fn(), + entityExists: vi.fn(), + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + }) + + await manager.start() + + await manager.sendToChannel({ + platform: 'telegram', + chatId: '99999', + text: 'Proactive message', + }) + + expect(telegramAdapter.sentMessages).toEqual([ + { chatId: '99999', text: 'Proactive message' }, + ]) + }) +}) +``` + +- [ ] **Step 2: Run tests to verify they pass** + +Run: `cd packages/agents-server && npx vitest run test/channels/channel-integration.test.ts` +Expected: All 5 tests PASS + +- [ ] **Step 3: Commit** + +```bash +git add packages/agents-server/test/channels/channel-integration.test.ts +git commit -m "test(agents-server): add integration tests for channel routing and forwarding" +``` + +--- + +## Task 13: Export Cleanup and Final Verification + +**Files:** +- Modify: `packages/agents-server/src/channels/index.ts` (create) +- Verify: all packages build and tests pass + +### Steps + +- [ ] **Step 1: Create server channels index** + +```typescript +// packages/agents-server/src/channels/index.ts +export { ChannelManager } from './channel-manager.js' +export type { ChannelManagerConfig } from './channel-manager.js' +export { createDraftStreamLoop } from './draft-stream-loop.js' +export type { DraftStreamLoop, DraftStreamLoopConfig } from './draft-stream-loop.js' +export { HealthMonitor, DEFAULT_HEALTH_CONFIG } from './health-monitor.js' +export type { HealthMonitorConfig } from './health-monitor.js' +export { TelegramAdapter } from './telegram/telegram-adapter.js' +export { DiscordAdapter } from './discord/discord-adapter.js' +``` + +- [ ] **Step 2: Run full test suite for agents-runtime** + +Run: `cd packages/agents-runtime && npx vitest run` +Expected: All tests PASS + +- [ ] **Step 3: Run full test suite for agents-server** + +Run: `cd packages/agents-server && npx vitest run` +Expected: All tests PASS + +- [ ] **Step 4: Build both packages** + +Run: `cd packages/agents-runtime && pnpm build && cd ../agents-server && pnpm build` +Expected: Both packages build without errors + +- [ ] **Step 5: Commit** + +```bash +git add packages/agents-server/src/channels/index.ts +git commit -m "feat: finalize channel connector exports and verify builds" +``` diff --git a/docs/superpowers/specs/2026-04-24-entity-channel-connectors-design.md b/docs/superpowers/specs/2026-04-24-entity-channel-connectors-design.md new file mode 100644 index 0000000000..d3feb0e716 --- /dev/null +++ b/docs/superpowers/specs/2026-04-24-entity-channel-connectors-design.md @@ -0,0 +1,424 @@ +# Entity Channel Connectors + +Connect entities to external messaging platforms (Telegram, Discord, Slack, etc.) with bidirectional communication configured at the entity type definition level. + +## Overview + +Entities gain a new `channels` property in `defineEntity()` that declares which messaging platforms they connect to and how. Inbound platform messages wake entities with normalized `channel_message` events. Agent output is automatically forwarded back to the originating chat. The system is designed for multi-platform support; v1 implements Telegram (webhook) and Discord (Gateway WebSocket) to validate the abstraction. + +Design informed by [OpenClaw](https://github.com/openclaw/openclaw)'s channel adapter architecture — specifically the composable adapter pattern, normalized payload with escape hatch, streaming via draft loops, and health monitoring. + +## Configuration API + +### Entity Type Definition + +```typescript +import { telegram, discord } from '@electric-sql/agents-runtime/channels' + +defineEntity({ + description: 'Support bot', + channels: [ + telegram({ + mode: 'direct', + allowedChatIds: [], + forwardAgentOutput: true, + }), + discord({ + mode: 'channel', + allowedChatIds: ['#support'], + forwardAgentOutput: true, + }), + ], + handler: async (ctx, wake) => { + await ctx.agent.run(wake.payload.text) + }, +}) +``` + +### Channel Config Type + +```typescript +interface ChannelConfig { + platform: string // 'telegram' | 'discord' | 'slack' | ... + mode: string // platform-specific interaction mode + allowedChatIds?: string[] // optional whitelist (empty = all) + forwardAgentOutput: boolean // auto-relay agent text output to originating chat +} +``` + +Each platform has a typed config factory (`telegram()`, `discord()`) that produces a `ChannelConfig` with platform-specific defaults and validation. + +### Server-Level Credentials + +Global credentials configured via environment variables: + +```bash +# Telegram +TELEGRAM_BOT_TOKEN=your-bot-token +TELEGRAM_WEBHOOK_BASE_URL=https://your-server.com + +# Discord +DISCORD_BOT_TOKEN=your-bot-token +DISCORD_APP_ID=your-app-id +``` + +Credentials are global to the server. Entity definitions reference platforms by name; they never contain secrets. + +## Inbound: Platform Messages to Entity Wakes + +### Wake Event Structure + +```typescript +interface ChannelMessageWakeEvent { + type: 'channel_message' + source: { + platform: 'telegram' | 'discord' + chatId: string + messageId: string + threadId?: string + } + sender: { + id: string + username?: string + } + payload: { + text: string + replyTo?: string + attachments?: Attachment[] // future + channelData?: Record // platform-specific raw data (escape hatch) + } +} +``` + +The `channelData` escape hatch allows handlers to access platform-specific fields (Telegram stickers, Discord embeds, etc.) without polluting the common type. + +### Handler Usage + +```typescript +handler: async (ctx, wake) => { + if (wake.type === 'channel_message') { + // Platform message — wake.source.platform identifies origin + await ctx.agent.run(wake.payload.text) + } else if (wake.type === 'message_received') { + // Inter-entity message + } else if (wake.type === 'cron') { + // Scheduled wake + } +} +``` + +The handler does not need platform-specific knowledge. The wake event is normalized across all platforms. + +### Entity Instance Routing + +Inbound messages map to entity instances by chat ID: + +- Instance URL: `/{type}/telegram-{chatId}` or `/{type}/discord-{chatId}` +- If no instance exists for a chat, one is **auto-spawned** +- `allowedChatIds` filtering happens before routing — messages from non-whitelisted chats are dropped + +This means users can start chatting with a bot and an entity instance materializes automatically. + +## Outbound: Entity Output to Platforms + +### Implicit (Agent Output Forwarding) + +When `forwardAgentOutput: true` and the wake was triggered by a `channel_message`: + +1. Handler calls `ctx.agent.run(...)` +2. Agent produces `text` and `text_delta` events on the entity's main durable stream (existing behavior) +3. Server-side `ChannelManager` watches the stream during channel-triggered wakes +4. Text events are forwarded to the platform chat identified by `wake.source.chatId` +5. For streaming: a `DraftStreamLoop` batches `text_delta` events and live-edits a message on the platform (see Streaming section) + +No new handler API needed for the common case. + +### Explicit (`ctx.channelSend`) + +For proactive messaging (e.g., from a cron wake): + +```typescript +handler: async (ctx, wake) => { + if (wake.type === 'cron') { + ctx.channelSend('telegram', { + chatId: '12345', + text: 'Daily summary: ...', + channelData: { parse_mode: 'MarkdownV2' }, // platform-specific options + }) + } +} +``` + +`ctx.channelSend()` queues outbound messages, delivered at wake completion (same pattern as `ctx.send()` for inter-entity messages). The `channelData` escape hatch allows passing platform-specific options without polluting the common API. + +### Streaming (DraftStreamLoop) + +Both Telegram and Discord support editing sent messages, enabling live-streaming of agent output: + +```typescript +interface DraftStreamLoop { + onDelta(text: string): void // feed text_delta events + flush(): Promise // send final accumulated text +} +``` + +The loop: +1. Sends an initial message on the first delta +2. Accumulates subsequent deltas +3. Throttles `editMessage` calls (e.g., every 500ms) to avoid rate limits +4. On completion, sends the final full text + +Each outbound adapter implements the `sendText` / `editText` primitives; the `DraftStreamLoop` handles batching and throttling. Platforms that don't support editing fall back to send-on-complete. + +## Inbound-to-Outbound Integration Flow + +The full flow showing how platform messages integrate with the existing entity system: + +``` +┌──────────────────────────────────────────────────────────────────────┐ +│ Inbound Flow (platform → entity) │ +└──────────────────────────────────────────────────────────────────────┘ + + Platform event (webhook POST or WebSocket message) + │ + ▼ + PlatformAdapter normalizes to ChannelMessageWakeEvent + │ + ▼ + ChannelManager resolves entity type (which types have this platform?) + │ + ▼ + Derives instance URL: /{type}/{platform}-{chatId} + │ + ├── Instance exists? + │ YES → ElectricAgentsManager writes wake event to entity's durable stream + │ NO → ElectricAgentsManager.spawn() creates instance, then writes wake + │ + ▼ + serve_endpoint webhook fires → processWebhookWake() + │ + ▼ + Handler executes with wake.type === 'channel_message' + +┌──────────────────────────────────────────────────────────────────────┐ +│ Outbound Flow (entity → platform) │ +└──────────────────────────────────────────────────────────────────────┘ + + Handler calls ctx.agent.run(wake.payload.text) + │ + ▼ + Agent produces text/text_delta events → written to entity's main durable stream + │ + ▼ + ChannelManager watches stream (subscribed during channel-triggered wakes) + │ + ├── Streaming enabled? + │ YES → DraftStreamLoop batches deltas, calls adapter.outbound.editText() + │ NO → On completion, calls adapter.outbound.sendText() + │ + ▼ + PlatformAdapter sends message via platform API + (Telegram: sendMessage/editMessageText, Discord: POST /channels/{id}/messages) +``` + +The key insight: **no new execution model is needed**. Channel messages are just another way to wake an entity, reusing the existing `ElectricAgentsManager.spawn()` and durable stream infrastructure. The `ChannelManager` acts as a translator between platform events and entity wake events, and watches entity streams for outbound delivery. + +## Server-Side Architecture + +### Component Structure + +``` +agents-server +├── ChannelManager (new) +│ ├── PlatformAdapter (composable interface) +│ │ ├── TelegramAdapter +│ │ └── DiscordAdapter +│ ├── DraftStreamLoop (streaming helper) +│ ├── HealthMonitor (connection health + auto-restart) +│ ├── Inbound: platform message → entity wake +│ └── Outbound: entity stream events → platform message +├── ElectricAgentsManager (existing, used by ChannelManager) +└── ... +``` + +### ChannelManager + +Responsibilities: +- Manages platform adapter lifecycle (start/stop) +- On entity type registration, inspects `channels` config and sets up listeners +- Routes inbound messages to the correct entity instance (auto-spawning if needed) +- Watches entity streams during channel-triggered wakes for outbound forwarding +- Monitors adapter health and auto-restarts on failure + +### PlatformAdapter — Composable Interface + +Inspired by OpenClaw's adapter bag pattern, the `PlatformAdapter` is split into focused sub-interfaces. Adapters implement only what their platform supports: + +```typescript +interface PlatformAdapter { + platform: string + + // Required + gateway: GatewayAdapter // start/stop connections + outbound: OutboundAdapter // send messages + + // Optional — implement what the platform supports + streaming?: StreamingAdapter // live-edit messages with text deltas + threading?: ThreadingAdapter // thread/topic management + groups?: GroupAdapter // group-specific behavior (mention gating) +} + +interface GatewayAdapter { + start(ctx: GatewayContext): Promise + stop(): Promise +} + +interface GatewayContext { + configs: Map // entity type name → configs for this platform + abortSignal: AbortSignal // cancelled on stop/restart + log: Logger + onMessage: (entityType: string, event: ChannelMessageWakeEvent) => Promise +} + +interface OutboundAdapter { + sendText(chatId: string, text: string, opts?: OutboundOpts): Promise + editText?(ref: MessageRef, text: string): Promise +} + +interface OutboundOpts { + threadId?: string + replyToId?: string + channelData?: Record // platform-specific options +} + +interface MessageRef { + messageId: string + chatId: string +} + +interface StreamingAdapter { + mode: 'partial' | 'off' // 'partial' = edit-in-place + createStreamLoop(chatId: string, opts?: OutboundOpts): DraftStreamLoop +} +``` + +This design means: +- A minimal new adapter only needs `gateway` + `outbound` +- The core system checks for optional adapters before using them: `if (adapter.streaming) { ... } else { sendOnComplete() }` +- Platform-specific capabilities are opt-in, not forced + +### TelegramAdapter + +- **Gateway:** Calls Telegram `setWebhook` API on startup; registers `/_electric/channels/telegram/webhook` route via `GatewayContext` +- **Outbound:** `sendMessage` / `editMessageText` via Telegram Bot API +- **Streaming:** Supported — `editMessageText` with throttled deltas +- **Modes:** `direct` (private chats), `group` (group chats) + +### DiscordAdapter + +- **Gateway:** Connects to Discord Gateway via WebSocket (`discord.js`); listens for `messageCreate` events +- **Outbound:** `POST /channels/{id}/messages` / `PATCH /channels/{id}/messages/{msg_id}` via Discord REST API +- **Streaming:** Supported — edit message with accumulated text +- **Threading:** Supported — thread creation and reply routing +- **Modes:** `direct` (DMs), `channel` (server channels), `thread` (thread-based) + +### Health Monitoring + +Borrowed from OpenClaw's production-grade health monitor: + +- Track `lastEventAt` timestamp per adapter +- Periodic health checks (every 5 minutes) +- Auto-restart with exponential backoff: 5s initial → 5min max, factor 2, 10% jitter +- Max 10 restart attempts before giving up and logging an error +- 60-second startup grace period (no health checks during initial connection) + +```typescript +interface HealthMonitorConfig { + checkIntervalMs: number // default: 300_000 (5 min) + staleThresholdMs: number // default: 120_000 (2 min) + startupGraceMs: number // default: 60_000 (1 min) + backoff: { + initialMs: number // default: 5_000 + maxMs: number // default: 300_000 + factor: number // default: 2 + jitter: number // default: 0.1 + } + maxRestartAttempts: number // default: 10 +} +``` + +## Entity Type Registration & Storage + +### Schema Changes + +`channels` stored as JSONB on the `entity_types` table: + +```typescript +interface ElectricAgentsEntityType { + name: string + description: string + creation_schema?: Record + inbox_schemas?: Record> + state_schemas?: Record> + channels?: ChannelConfig[] // new field + serve_endpoint?: string + revision: number + created_at: string + updated_at: string +} +``` + +### Registration Flow + +1. `ElectricAgentsManager.registerEntityType()` validates channel configs +2. Notifies `ChannelManager` of new/updated channel bindings +3. `ChannelManager` starts or reconfigures the relevant platform adapter + +### Credential Validation + +On server startup, `ChannelManager` checks that required env vars exist for any platform referenced by registered entity types. Missing credentials produce a clear error log and skip that adapter rather than crashing the server. + +### Entity Type Removal + +1. `ChannelManager` tears down listeners for that type +2. Existing entity instances remain but stop receiving platform messages + +## Package Structure + +| Concern | Package | Location | +|---------|---------|----------| +| `telegram()`, `discord()` config factories | `agents-runtime` | `src/channels/` | +| `ChannelConfig` types, `ChannelMessageWakeEvent` | `agents-runtime` | `src/channels/types.ts` | +| `PlatformAdapter` sub-interfaces | `agents-runtime` | `src/channels/adapter-types.ts` | +| `ctx.channelSend()` | `agents-runtime` | added to handler context | +| `ChannelManager`, adapter registry | `agents-server` | `src/channels/` | +| `DraftStreamLoop` | `agents-server` | `src/channels/draft-stream-loop.ts` | +| `HealthMonitor` | `agents-server` | `src/channels/health-monitor.ts` | +| `TelegramAdapter` | `agents-server` | `src/channels/telegram/` | +| `DiscordAdapter` | `agents-server` | `src/channels/discord/` | +| `channels` column on `entity_types` | `agents-server` | `src/db/schema.ts` | + +## V1 Scope + +### In Scope + +- `ChannelConfig` base type + `TelegramChannelConfig` + `DiscordChannelConfig` +- `telegram()` and `discord()` config factories +- `ChannelManager` with composable `PlatformAdapter` interface +- `TelegramAdapter` with webhook transport +- `DiscordAdapter` with Gateway WebSocket transport +- `channel_message` wake event type with `channelData` escape hatch +- Auto-spawn entity instances on first inbound message +- Agent output forwarding (implicit, `forwardAgentOutput: true`) +- Streaming via `DraftStreamLoop` (edit-in-place on both platforms) +- `ctx.channelSend()` for explicit outbound with `channelData` support +- Health monitoring with exponential backoff and auto-restart +- Telegram modes: `direct`, `group` +- Discord modes: `direct`, `channel`, `thread` + +### Out of Scope + +- Slack adapter +- Attachments / media messages +- Telegram `channel` and `thread` modes +- UI support in `agents-server-ui`