diff --git a/genkit-tools/cli/src/commands/mcp.ts b/genkit-tools/cli/src/commands/mcp.ts index 02a007c128..3b6e4eb1ca 100644 --- a/genkit-tools/cli/src/commands/mcp.ts +++ b/genkit-tools/cli/src/commands/mcp.ts @@ -14,19 +14,28 @@ * limitations under the License. */ -import { findProjectRoot, forceStderr } from '@genkit-ai/tools-common/utils'; +import { + debugToFile, + findProjectRoot, + forceStderr, +} from '@genkit-ai/tools-common/utils'; import { Command } from 'commander'; import { startMcpServer } from '../mcp/server'; interface McpOptions { projectRoot?: string; + debug?: boolean; } /** Command to run MCP server. */ export const mcp = new Command('mcp') .option('--project-root [projectRoot]', 'Project root') + .option('-d, --debug', 'debug to file', false) .description('run MCP stdio server (EXPERIMENTAL, subject to change)') .action(async (options: McpOptions) => { forceStderr(); + if (options.debug) { + debugToFile(); + } await startMcpServer(options.projectRoot ?? (await findProjectRoot())); }); diff --git a/genkit-tools/cli/src/mcp/runtime.ts b/genkit-tools/cli/src/mcp/runtime.ts index daa52e622d..2faa320236 100644 --- a/genkit-tools/cli/src/mcp/runtime.ts +++ b/genkit-tools/cli/src/mcp/runtime.ts @@ -34,8 +34,12 @@ export function defineRuntimeTools( {command: 'go', args: ['run', 'main.go']} {command: 'npm', args: ['run', 'dev']}`, inputSchema: { - command: z.string(), - args: z.array(z.string()), + command: z.string().describe('The command to run'), + args: z + .array(z.string()) + .describe( + 'List of command line arguments. IMPORTANT: This must be an array of strings, not a single string.' + ), }, }, async ({ command, args }) => { diff --git a/genkit-tools/cli/src/mcp/server.ts b/genkit-tools/cli/src/mcp/server.ts index 6aecf2a218..9bbd03a44f 100644 --- a/genkit-tools/cli/src/mcp/server.ts +++ b/genkit-tools/cli/src/mcp/server.ts @@ -26,6 +26,7 @@ import { defineUsageGuideTool } from './usage'; import { McpRuntimeManager } from './util'; export async function startMcpServer(projectRoot: string) { + logger.info(`Starting MCP server in: ${projectRoot}`); const server = new McpServer({ name: 'Genkit MCP', version: '0.0.2', diff --git a/genkit-tools/cli/src/mcp/util.ts b/genkit-tools/cli/src/mcp/util.ts index f39ac569bf..897a4edf3d 100644 --- a/genkit-tools/cli/src/mcp/util.ts +++ b/genkit-tools/cli/src/mcp/util.ts @@ -41,7 +41,8 @@ export class McpRuntimeManager { const devManager = await startDevProcessManager( this.projectRoot, command, - args + args, + { nonInteractive: true, healthCheck: true } ); this.manager = devManager.manager; return this.manager; diff --git a/genkit-tools/cli/src/utils/manager-utils.ts b/genkit-tools/cli/src/utils/manager-utils.ts index 187e32a116..c61f0a77df 100644 --- a/genkit-tools/cli/src/utils/manager-utils.ts +++ b/genkit-tools/cli/src/utils/manager-utils.ts @@ -21,6 +21,7 @@ import { import type { Status } from '@genkit-ai/tools-common'; import { ProcessManager, + RuntimeEvent, RuntimeManager, type GenkitToolsError, } from '@genkit-ai/tools-common/manager'; @@ -68,6 +69,8 @@ export async function startManager( export interface DevProcessManagerOptions { disableRealtimeTelemetry?: boolean; + nonInteractive?: boolean; + healthCheck?: boolean; } export async function startDevProcessManager( @@ -93,10 +96,68 @@ export async function startDevProcessManager( processManager, disableRealtimeTelemetry, }); - const processPromise = processManager.start(); + const processPromise = processManager.start({ ...options, cwd: projectRoot }); + + if (options?.healthCheck) { + await waitForRuntime(manager, processPromise); + } + return { manager, processPromise }; } +/** + * Waits for a new runtime to register itself. + * Rejects if the process exits or if the timeout is reached. + */ +export async function waitForRuntime( + manager: RuntimeManager, + processPromise: Promise +): Promise { + const TIMEOUT_MS = 30000; + let unsubscribe: (() => void) | undefined; + let timeoutId: NodeJS.Timeout | undefined; + + if (manager.listRuntimes().length > 0) { + return; + } + + try { + const runtimeAddedPromise = new Promise((resolve) => { + unsubscribe = manager.onRuntimeEvent((event) => { + // Just listen for a new runtime, not for a specific ID. + if (event === RuntimeEvent.ADD) { + resolve(); + } + }); + if (manager.listRuntimes().length > 0) { + resolve(); + } + }); + + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new Error('Timeout waiting for runtime to be ready')), + TIMEOUT_MS + ); + }); + + const processExitedPromise = processPromise.then( + () => + Promise.reject(new Error('Process exited before runtime was ready')), + (err) => Promise.reject(err) + ); + + await Promise.race([ + runtimeAddedPromise, + timeoutPromise, + processExitedPromise, + ]); + } finally { + if (unsubscribe) unsubscribe(); + if (timeoutId) clearTimeout(timeoutId); + } +} + /** * Runs the given function with a runtime manager. */ diff --git a/genkit-tools/cli/tests/commands/start_test.ts b/genkit-tools/cli/tests/commands/start_test.ts new file mode 100644 index 0000000000..76c90e0833 --- /dev/null +++ b/genkit-tools/cli/tests/commands/start_test.ts @@ -0,0 +1,136 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { startServer } from '@genkit-ai/tools-common/server'; +import { + afterEach, + beforeEach, + describe, + expect, + it, + jest, +} from '@jest/globals'; +import { start } from '../../src/commands/start'; +import * as managerUtils from '../../src/utils/manager-utils'; + +jest.mock('@genkit-ai/tools-common/server'); +jest.mock('@genkit-ai/tools-common/utils', () => ({ + findProjectRoot: jest.fn(() => Promise.resolve('/mock/root')), + logger: { + warn: jest.fn(), + error: jest.fn(), + }, +})); +jest.mock('get-port', () => ({ + __esModule: true, + default: jest.fn(() => Promise.resolve(4000)), + makeRange: jest.fn(), +})); +jest.mock('open'); + +describe('start command', () => { + let startDevProcessManagerSpy: any; + let startManagerSpy: any; + let startServerSpy: any; + + beforeEach(() => { + startDevProcessManagerSpy = jest + .spyOn(managerUtils, 'startDevProcessManager') + .mockResolvedValue({ + manager: {} as any, + processPromise: Promise.resolve(), + }); + startManagerSpy = jest + .spyOn(managerUtils, 'startManager') + .mockResolvedValue({} as any); + startServerSpy = startServer as unknown as jest.Mock; + + // Reset args + start.args = []; + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should start dev process manager when args are provided', async () => { + await start.parseAsync(['node', 'genkit', 'run', 'app']); + + expect(startDevProcessManagerSpy).toHaveBeenCalledWith( + '/mock/root', + 'run', + ['app'], + expect.objectContaining({ disableRealtimeTelemetry: undefined }) + ); + expect(startServerSpy).toHaveBeenCalled(); + }); + + it('should start manager only when no args are provided', async () => { + let resolveServerStarted: () => void; + const serverStartedPromise = new Promise((resolve) => { + resolveServerStarted = resolve; + }); + + startServerSpy.mockImplementation(() => { + resolveServerStarted(); + }); + + start.parseAsync(['node', 'genkit']); + + await serverStartedPromise; + + expect(startManagerSpy).toHaveBeenCalledWith('/mock/root', true); + expect(startDevProcessManagerSpy).not.toHaveBeenCalled(); + expect(startServerSpy).toHaveBeenCalled(); + }); + + it('should not start server if --noui is provided', async () => { + let resolveManagerStarted: () => void; + const managerStartedPromise = new Promise((resolve) => { + resolveManagerStarted = resolve; + }); + + startManagerSpy.mockImplementation(() => { + resolveManagerStarted(); + return Promise.resolve({} as any); + }); + + start.parseAsync(['node', 'genkit', '--noui']); + + await managerStartedPromise; + // Wait for the synchronous continuation after startManager resolves + await new Promise((resolve) => setImmediate(resolve)); + + expect(startServerSpy).not.toHaveBeenCalled(); + }); + + it('should pass disableRealtimeTelemetry option', async () => { + await start.parseAsync([ + 'node', + 'genkit', + 'run', + 'app', + '--disable-realtime-telemetry', + ]); + + expect(startDevProcessManagerSpy).toHaveBeenCalledWith( + expect.anything(), + expect.anything(), + expect.anything(), + expect.objectContaining({ disableRealtimeTelemetry: true }) + ); + }); +}); diff --git a/genkit-tools/cli/tests/utils/manager-utils_test.ts b/genkit-tools/cli/tests/utils/manager-utils_test.ts new file mode 100644 index 0000000000..bd7bad5f1b --- /dev/null +++ b/genkit-tools/cli/tests/utils/manager-utils_test.ts @@ -0,0 +1,88 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { RuntimeEvent } from '@genkit-ai/tools-common/manager'; +import { beforeEach, describe, expect, it, jest } from '@jest/globals'; +import { waitForRuntime } from '../../src/utils/manager-utils'; + +describe('waitForRuntime', () => { + let mockManager: any; + let mockProcessPromise: Promise; + let processReject: (reason?: any) => void; + + beforeEach(() => { + mockManager = { + listRuntimes: jest.fn(), + onRuntimeEvent: jest.fn(), + }; + mockProcessPromise = new Promise((_, reject) => { + processReject = reject; + }); + }); + + it('should resolve immediately if runtime is already present', async () => { + mockManager.listRuntimes.mockReturnValue([{}]); + await expect( + waitForRuntime(mockManager, mockProcessPromise) + ).resolves.toBeUndefined(); + }); + + it('should wait for runtime event and resolve', async () => { + mockManager.listRuntimes.mockReturnValue([]); + let eventCallback: (event: RuntimeEvent, runtime: any) => void; + + mockManager.onRuntimeEvent.mockImplementation((cb: any) => { + eventCallback = cb; + return jest.fn(); // unsubscribe + }); + + const waitPromise = waitForRuntime(mockManager, mockProcessPromise); + + // Simulate event + setTimeout(() => { + eventCallback(RuntimeEvent.ADD, { id: 'any-id' }); + }, 10); + + await expect(waitPromise).resolves.toBeUndefined(); + }); + + it('should reject if process exits early', async () => { + mockManager.listRuntimes.mockReturnValue([]); + mockManager.onRuntimeEvent.mockReturnValue(jest.fn()); + + const waitPromise = waitForRuntime(mockManager, mockProcessPromise); + + // Simulate process exit + processReject(new Error('Process exited')); + + await expect(waitPromise).rejects.toThrow('Process exited'); + }); + + it('should timeout if runtime never appears', async () => { + jest.useFakeTimers(); + mockManager.listRuntimes.mockReturnValue([]); + mockManager.onRuntimeEvent.mockReturnValue(jest.fn()); + + const waitPromise = waitForRuntime(mockManager, mockProcessPromise); + + jest.advanceTimersByTime(30000); + + await expect(waitPromise).rejects.toThrow( + 'Timeout waiting for runtime to be ready' + ); + jest.useRealTimers(); + }); +}); diff --git a/genkit-tools/common/src/manager/manager.ts b/genkit-tools/common/src/manager/manager.ts index 7b20698644..1f28d303ad 100644 --- a/genkit-tools/common/src/manager/manager.ts +++ b/genkit-tools/common/src/manager/manager.ts @@ -167,13 +167,23 @@ export class RuntimeManager { * `runtime` to which it applies. * * @param listener the callback function. + * @returns an unsubscriber function. */ onRuntimeEvent( listener: (eventType: RuntimeEvent, runtime: RuntimeInfo) => void ) { - Object.values(RuntimeEvent).forEach((event) => - this.eventEmitter.on(event, (rt) => listener(event, rt)) - ); + const listeners: Array<{ event: string; fn: (rt: RuntimeInfo) => void }> = + []; + Object.values(RuntimeEvent).forEach((event) => { + const fn = (rt: RuntimeInfo) => listener(event, rt); + this.eventEmitter.on(event, fn); + listeners.push({ event, fn }); + }); + return () => { + listeners.forEach(({ event, fn }) => { + this.eventEmitter.off(event, fn); + }); + }; } /** @@ -567,6 +577,7 @@ export class RuntimeManager { try { const runtimesDir = await findRuntimesDir(this.projectRoot); await fs.mkdir(runtimesDir, { recursive: true }); + logger.debug(`Watching runtimes in ${runtimesDir}`); const watcher = chokidar.watch(runtimesDir, { persistent: true, ignoreInitial: false, diff --git a/genkit-tools/common/src/manager/process-manager.ts b/genkit-tools/common/src/manager/process-manager.ts index 3c4f905fb7..7b23e1b414 100644 --- a/genkit-tools/common/src/manager/process-manager.ts +++ b/genkit-tools/common/src/manager/process-manager.ts @@ -25,6 +25,7 @@ export interface AppProcessStatus { } export interface ProcessManagerStartOptions { + cwd?: string; nonInteractive?: boolean; } @@ -47,9 +48,11 @@ export class ProcessManager { * Starts the process. */ start(options?: ProcessManagerStartOptions): Promise { + logger.debug(`Starting process: ${this.command} ${this.args.join(' ')}`); return new Promise((resolve, reject) => { this._status = 'running'; this.appProcess = spawn(this.command, this.args, { + cwd: options?.cwd, env: { ...process.env, ...this.env, @@ -62,6 +65,13 @@ export class ProcessManager { this.appProcess.stderr?.pipe(process.stderr); this.appProcess.stdout?.pipe(process.stdout); process.stdin?.pipe(this.appProcess.stdin!); + } else { + this.appProcess.stderr?.on('data', (data) => { + logger.debug(`[ProcessManager Stderr] ${data.toString()}`); + }); + this.appProcess.stdout?.on('data', (data) => { + logger.debug(`[ProcessManager Stdout] ${data.toString()}`); + }); } this.appProcess.on('error', (error): void => { @@ -133,8 +143,8 @@ export class ProcessManager { this.originalStdIn = undefined; } if (this.appProcess) { - this.appProcess.stdout?.unpipe(process.stdout); - this.appProcess.stderr?.unpipe(process.stderr); + this.appProcess.stdout?.removeAllListeners(); + this.appProcess.stderr?.removeAllListeners(); } this.appProcess = undefined; this._status = 'stopped'; diff --git a/genkit-tools/common/src/utils/logger.ts b/genkit-tools/common/src/utils/logger.ts index 0e2a62e502..0d6a211223 100644 --- a/genkit-tools/common/src/utils/logger.ts +++ b/genkit-tools/common/src/utils/logger.ts @@ -29,6 +29,19 @@ export function forceStderr() { ); } +export function debugToFile() { + logger.add( + new winston.transports.File({ + filename: 'genkit-debug.log', + format: winston.format.combine( + winston.format.timestamp(), + winston.format.json() + ), + }) + ); + logger.level = 'debug'; +} + export const logger = winston.createLogger({ level: process.env.DEBUG ? 'debug' : 'info', format: winston.format.printf((log) => { diff --git a/genkit-tools/common/tests/manager_test.ts b/genkit-tools/common/tests/manager_test.ts new file mode 100644 index 0000000000..c70c38d0a1 --- /dev/null +++ b/genkit-tools/common/tests/manager_test.ts @@ -0,0 +1,47 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { describe, expect, it, jest } from '@jest/globals'; +import { RuntimeManager } from '../src/manager/manager'; +import { RuntimeEvent } from '../src/manager/types'; + +jest.mock('chokidar', () => ({ + watch: jest.fn().mockReturnValue({ + on: jest.fn(), + close: jest.fn(), + }), +})); + +describe('RuntimeManager', () => { + it('should allow unsubscribing from runtime events', async () => { + const manager = await RuntimeManager.create({ projectRoot: '.' }); + const listener = jest.fn(); + + // Subscribe + const unsubscribe = manager.onRuntimeEvent(listener); + + // Simulate event + (manager as any).eventEmitter.emit(RuntimeEvent.ADD, { id: '1' }); + expect(listener).toHaveBeenCalledTimes(1); + + // Unsubscribe + unsubscribe(); + + // Simulate event again + (manager as any).eventEmitter.emit(RuntimeEvent.ADD, { id: '2' }); + expect(listener).toHaveBeenCalledTimes(1); // Should not have increased + }); +}); diff --git a/js/core/src/reflection.ts b/js/core/src/reflection.ts index 9b9389ad9d..7572ec37fa 100644 --- a/js/core/src/reflection.ts +++ b/js/core/src/reflection.ts @@ -388,7 +388,13 @@ export class ReflectionServer { `Reflection server (${process.pid}) running on http://localhost:${this.port}` ); ReflectionServer.RUNNING_SERVERS.push(this); - await this.writeRuntimeFile(); + + try { + await this.registry.listActions(); + await this.writeRuntimeFile(); + } catch (e) { + logger.error(`Error initializing plugins: ${e}`); + } }); }