diff --git a/composer.json b/composer.json index 83a08f39..4afcfac6 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "psr/clock": "^1.0", "psr/container": "^1.0 || ^2.0", "psr/event-dispatcher": "^1.0", + "psr/http-client": "^1.0", "psr/http-factory": "^1.1", "psr/http-message": "^1.1 || ^2.0", "psr/log": "^1.0 || ^2.0 || ^3.0", @@ -42,6 +43,7 @@ "psr/simple-cache": "^2.0 || ^3.0", "symfony/cache": "^5.4 || ^6.4 || ^7.3 || ^8.0", "symfony/console": "^5.4 || ^6.4 || ^7.3 || ^8.0", + "symfony/http-client": "^5.4 || ^6.4 || ^7.3 || ^8.0", "symfony/process": "^5.4 || ^6.4 || ^7.3 || ^8.0" }, "autoload": { @@ -73,4 +75,4 @@ }, "sort-packages": true } -} +} \ No newline at end of file diff --git a/examples/client/README.md b/examples/client/README.md new file mode 100644 index 00000000..3e3bc092 --- /dev/null +++ b/examples/client/README.md @@ -0,0 +1,27 @@ +# Client Examples + +These examples demonstrate how to use the MCP PHP Client SDK. + +## STDIO Client + +Connects to an MCP server running as a child process: + +```bash +php examples/client/stdio_discovery_calculator.php +``` + +## HTTP Client + +Connects to an MCP server over HTTP: + +```bash +# First, start an HTTP server +php -S localhost:8000 examples/server/discovery-calculator/server.php + +# Then run the client +php examples/client/http_discovery_calculator.php +``` + +## Requirements + +All examples require the server examples to be available. The STDIO examples spawn the server process, while the HTTP examples connect to a running HTTP server. diff --git a/examples/client/http_client_communication.php b/examples/client/http_client_communication.php new file mode 100644 index 00000000..0cc2b251 --- /dev/null +++ b/examples/client/http_client_communication.php @@ -0,0 +1,130 @@ +level->value}] {$n->data}\n"; +}); + +$samplingRequestHandler = new SamplingRequestHandler(function (CreateSamplingMessageRequest $request): CreateSamplingMessageResult { + echo "[SAMPLING] Server requested LLM sampling (max {$request->maxTokens} tokens)\n"; + + $mockResponse = 'Based on the incident analysis, I recommend: 1) Activate the on-call team, '. + '2) Isolate affected systems, 3) Begin root cause analysis, 4) Prepare stakeholder communication.'; + + return new CreateSamplingMessageResult( + role: Role::Assistant, + content: new TextContent($mockResponse), + model: 'mock-gpt-4', + stopReason: 'end_turn', + ); +}); + +$client = Client::builder() + ->setClientInfo('HTTP Client Communication Test', '1.0.0') + ->setInitTimeout(30) + ->setRequestTimeout(120) + ->setCapabilities(new ClientCapabilities(sampling: true)) + ->addNotificationHandler($loggingNotificationHandler) + ->addRequestHandler($samplingRequestHandler) + ->build(); + +$transport = new HttpTransport(endpoint: $endpoint); + +try { + echo "Connecting to MCP server at {$endpoint}...\n"; + $client->connect($transport); + + $serverInfo = $client->getServerInfo(); + echo 'Connected to: '.($serverInfo->name ?? 'unknown')."\n\n"; + + echo "Available tools:\n"; + $toolsResult = $client->listTools(); + foreach ($toolsResult->tools as $tool) { + echo " - {$tool->name}\n"; + } + echo "\n"; + + echo "Calling 'run_dataset_quality_checks'...\n\n"; + $result = $client->callTool( + name: 'run_dataset_quality_checks', + arguments: ['dataset' => 'sales_transactions_q4'], + onProgress: function (float $progress, ?float $total, ?string $message) { + $percent = $total > 0 ? round(($progress / $total) * 100) : '?'; + echo "[PROGRESS {$percent}%] {$message}\n"; + } + ); + + echo "\nResult:\n"; + foreach ($result->content as $content) { + if ($content instanceof TextContent) { + echo $content->text."\n"; + } + } + + echo "\nCalling 'coordinate_incident_response'...\n\n"; + $result = $client->callTool( + name: 'coordinate_incident_response', + arguments: ['incidentTitle' => 'Database connection pool exhausted'], + onProgress: function (float $progress, ?float $total, ?string $message) { + $percent = $total > 0 ? round(($progress / $total) * 100) : '?'; + echo "[PROGRESS {$percent}%] {$message}\n"; + } + ); + + echo "\nResult:\n"; + foreach ($result->content as $content) { + if ($content instanceof TextContent) { + echo $content->text."\n"; + } + } +} catch (Throwable $e) { + echo "Error: {$e->getMessage()}\n"; + echo $e->getTraceAsString()."\n"; +} finally { + $client->disconnect(); +} diff --git a/examples/client/http_discovery_calculator.php b/examples/client/http_discovery_calculator.php new file mode 100644 index 00000000..ae8babdd --- /dev/null +++ b/examples/client/http_discovery_calculator.php @@ -0,0 +1,77 @@ +setClientInfo('HTTP Example Client', '1.0.0') + ->setInitTimeout(30) + ->setRequestTimeout(60) + ->build(); + +$transport = new HttpTransport($endpoint); + +try { + echo "Connecting to MCP server at {$endpoint}...\n"; + $client->connect($transport); + + echo "Connected! Server info:\n"; + $serverInfo = $client->getServerInfo(); + echo ' Name: '.($serverInfo->name ?? 'unknown')."\n"; + echo ' Version: '.($serverInfo->version ?? 'unknown')."\n\n"; + + echo "Available tools:\n"; + $toolsResult = $client->listTools(); + foreach ($toolsResult->tools as $tool) { + echo " - {$tool->name}: {$tool->description}\n"; + } + echo "\n"; + + echo "Available resources:\n"; + $resourcesResult = $client->listResources(); + foreach ($resourcesResult->resources as $resource) { + echo " - {$resource->uri}: {$resource->name}\n"; + } + echo "\n"; + + echo "Available prompts:\n"; + $promptsResult = $client->listPrompts(); + foreach ($promptsResult->prompts as $prompt) { + echo " - {$prompt->name}: {$prompt->description}\n"; + } + echo "\n"; +} catch (Throwable $e) { + echo "Error: {$e->getMessage()}\n"; + echo $e->getTraceAsString()."\n"; +} finally { + echo "Disconnecting...\n"; + $client->disconnect(); + echo "Done.\n"; +} diff --git a/examples/client/stdio_client_communication.php b/examples/client/stdio_client_communication.php new file mode 100644 index 00000000..fdf8de82 --- /dev/null +++ b/examples/client/stdio_client_communication.php @@ -0,0 +1,122 @@ +level->value}] {$n->data}\n"; +}); + +$samplingRequestHandler = new SamplingRequestHandler(function (CreateSamplingMessageRequest $request): CreateSamplingMessageResult { + echo "[SAMPLING] Server requested LLM sampling (max {$request->maxTokens} tokens)\n"; + + $mockResponse = 'Based on the incident analysis, I recommend: 1) Activate the on-call team, '. + '2) Isolate affected systems, 3) Begin root cause analysis, 4) Prepare stakeholder communication.'; + + return new CreateSamplingMessageResult( + role: Role::Assistant, + content: new TextContent($mockResponse), + model: 'mock-gpt-4', + stopReason: 'end_turn', + ); +}); + +$client = Client::builder() + ->setClientInfo('STDIO Client Communication Test', '1.0.0') + ->setInitTimeout(30) + ->setRequestTimeout(120) + ->setCapabilities(new ClientCapabilities(sampling: true)) + ->addNotificationHandler($loggingNotificationHandler) + ->addRequestHandler($samplingRequestHandler) + ->build(); + +$transport = new StdioTransport( + command: 'php', + args: [__DIR__.'/../server/client-communication/server.php'], +); + +try { + echo "Connecting to MCP server...\n"; + $client->connect($transport); + + $serverInfo = $client->getServerInfo(); + echo 'Connected to: '.($serverInfo->name ?? 'unknown')."\n\n"; + + echo "Available tools:\n"; + $toolsResult = $client->listTools(); + foreach ($toolsResult->tools as $tool) { + echo " - {$tool->name}\n"; + } + echo "\n"; + + echo "Calling 'run_dataset_quality_checks'...\n\n"; + $result = $client->callTool( + name: 'run_dataset_quality_checks', + arguments: ['dataset' => 'customer_orders_2024'], + onProgress: function (float $progress, ?float $total, ?string $message) { + $percent = $total > 0 ? round(($progress / $total) * 100) : '?'; + echo "[PROGRESS {$percent}%] {$message}\n"; + } + ); + + echo "\nResult:\n"; + foreach ($result->content as $content) { + if ($content instanceof TextContent) { + echo $content->text."\n"; + } + } + + echo "\nCalling 'coordinate_incident_response'...\n\n"; + $result = $client->callTool( + name: 'coordinate_incident_response', + arguments: ['incidentTitle' => 'Database connection pool exhausted'], + onProgress: function (float $progress, ?float $total, ?string $message) { + $percent = $total > 0 ? round(($progress / $total) * 100) : '?'; + echo "[PROGRESS {$percent}%] {$message}\n"; + } + ); + + echo "\nResult:\n"; + foreach ($result->content as $content) { + if ($content instanceof TextContent) { + echo $content->text."\n"; + } + } +} catch (Throwable $e) { + echo "Error: {$e->getMessage()}\n"; + echo $e->getTraceAsString()."\n"; +} finally { + $client->disconnect(); +} diff --git a/examples/client/stdio_discovery_calculator.php b/examples/client/stdio_discovery_calculator.php new file mode 100644 index 00000000..7d25a807 --- /dev/null +++ b/examples/client/stdio_discovery_calculator.php @@ -0,0 +1,89 @@ +setClientInfo('STDIO Example Client', '1.0.0') + ->setInitTimeout(30) + ->setRequestTimeout(60) + ->build(); + +$transport = new StdioTransport( + command: 'php', + args: [__DIR__.'/../server/discovery-calculator/server.php'], +); + +try { + echo "Connecting to MCP server...\n"; + $client->connect($transport); + + echo "Connected! Server info:\n"; + $serverInfo = $client->getServerInfo(); + echo ' Name: '.($serverInfo->name ?? 'unknown')."\n"; + echo ' Version: '.($serverInfo->version ?? 'unknown')."\n\n"; + + echo "Available tools:\n"; + $toolsResult = $client->listTools(); + foreach ($toolsResult->tools as $tool) { + echo " - {$tool->name}: {$tool->description}\n"; + } + echo "\n"; + + echo "Calling 'calculate' tool with a=5, b=3, operation='add'...\n"; + $result = $client->callTool('calculate', ['a' => 5, 'b' => 3, 'operation' => 'add']); + echo 'Result: '; + foreach ($result->content as $content) { + if ($content instanceof TextContent) { + echo $content->text; + } + } + echo "\n\n"; + + echo "Available resources:\n"; + $resourcesResult = $client->listResources(); + foreach ($resourcesResult->resources as $resource) { + echo " - {$resource->uri}: {$resource->name}\n"; + } + echo "\n"; + + echo "Reading resource 'config://calculator/settings'...\n"; + $resourceContent = $client->readResource('config://calculator/settings'); + foreach ($resourceContent->contents as $content) { + if ($content instanceof TextResourceContents) { + echo ' Content: '.$content->text."\n"; + echo ' Mimetype: '.$content->mimeType."\n"; + } + } +} catch (Throwable $e) { + echo "Error: {$e->getMessage()}\n"; + echo $e->getTraceAsString()."\n"; +} finally { + echo "Disconnecting...\n"; + $client->disconnect(); + echo "Done.\n"; +} diff --git a/examples/server/README.md b/examples/server/README.md index 27874d71..a9326395 100644 --- a/examples/server/README.md +++ b/examples/server/README.md @@ -8,10 +8,10 @@ The bootstrapping of the example will choose the used transport based on the SAP For running an example, you execute the `server.php` like this: ```bash # For using the STDIO transport: -php examples/discovery-calculator/server.php +php examples/server/discovery-calculator/server.php # For using the Streamable HTTP transport: -php -S localhost:8000 examples/discovery-userprofile/server.php +php -S localhost:8000 examples/server/discovery-userprofile/server.php ``` You will see debug outputs to help you understand what is happening. @@ -19,7 +19,7 @@ You will see debug outputs to help you understand what is happening. Run with Inspector: ```bash -npx @modelcontextprotocol/inspector php examples/discovery-calculator/server.php +npx @modelcontextprotocol/inspector php examples/server/discovery-calculator/server.php ``` ## Debugging @@ -30,5 +30,5 @@ directory. With the Inspector you can set the environment variables like this: ```bash -npx @modelcontextprotocol/inspector -e DEBUG=1 -e FILE_LOG=1 php examples/discovery-calculator/server.php +npx @modelcontextprotocol/inspector -e DEBUG=1 -e FILE_LOG=1 php examples/server/discovery-calculator/server.php ``` diff --git a/examples/server/bootstrap.php b/examples/server/bootstrap.php index a9110317..28186335 100644 --- a/examples/server/bootstrap.php +++ b/examples/server/bootstrap.php @@ -55,7 +55,7 @@ function shutdown(ResponseInterface|int $result): never function logger(): LoggerInterface { return new class extends AbstractLogger { - public function log($level, Stringable|string $message, array $context = []): void + public function log($level, string|Stringable $message, array $context = []): void { $debug = $_SERVER['DEBUG'] ?? false; diff --git a/examples/server/client-communication/server.php b/examples/server/client-communication/server.php index 42e5058b..f5e76fc2 100644 --- a/examples/server/client-communication/server.php +++ b/examples/server/client-communication/server.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php + */ +class Client +{ + private ?TransportInterface $transport = null; + + public function __construct( + private readonly Protocol $protocol, + private readonly Configuration $config, + private readonly LoggerInterface $logger = new NullLogger(), + ) { + } + + /** + * Create a new client builder for fluent configuration. + */ + public static function builder(): Builder + { + return new Builder(); + } + + /** + * Connect to an MCP server using the provided transport. + * + * @throws ConnectionException If connection or initialization fails + */ + public function connect(TransportInterface $transport): void + { + $this->transport = $transport; + $this->protocol->connect($transport, $this->config); + + $transport->connectAndInitialize(); + + $this->logger->info('Client connected and initialized'); + } + + /** + * Check if connected and initialized. + */ + public function isConnected(): bool + { + return null !== $this->transport && $this->protocol->getState()->isInitialized(); + } + + /** + * Get server information from initialization. + */ + public function getServerInfo(): ?Implementation + { + return $this->protocol->getState()->getServerInfo(); + } + + /** + * Get server instructions. + */ + public function getInstructions(): ?string + { + return $this->protocol->getState()->getInstructions(); + } + + /** + * Send a ping request to the server. + */ + public function ping(): void + { + $request = new PingRequest(); + + $this->sendRequest($request); + } + + /** + * List available tools from the server. + */ + public function listTools(?string $cursor = null): ListToolsResult + { + $request = new ListToolsRequest($cursor); + + $response = $this->sendRequest($request); + + return ListToolsResult::fromArray($response->result); + } + + /** + * Call a tool on the server. + * + * @param string $name Tool name + * @param array $arguments Tool arguments + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + * Optional callback for progress updates + */ + public function callTool(string $name, array $arguments = [], ?callable $onProgress = null): CallToolResult + { + $request = new CallToolRequest($name, $arguments); + + $response = $this->sendRequest($request, $onProgress); + + return CallToolResult::fromArray($response->result); + } + + /** + * List available resources from the server. + */ + public function listResources(?string $cursor = null): ListResourcesResult + { + $request = new ListResourcesRequest($cursor); + + $response = $this->sendRequest($request); + + return ListResourcesResult::fromArray($response->result); + } + + /** + * List available resource templates from the server. + */ + public function listResourceTemplates(?string $cursor = null): ListResourceTemplatesResult + { + $request = new ListResourceTemplatesRequest($cursor); + + $response = $this->sendRequest($request); + + return ListResourceTemplatesResult::fromArray($response->result); + } + + /** + * Read a resource by URI. + * + * @param string $uri The resource URI + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + * Optional callback for progress updates + */ + public function readResource(string $uri, ?callable $onProgress = null): ReadResourceResult + { + $request = new ReadResourceRequest($uri); + + $response = $this->sendRequest($request, $onProgress); + + return ReadResourceResult::fromArray($response->result); + } + + /** + * List available prompts from the server. + */ + public function listPrompts(?string $cursor = null): ListPromptsResult + { + $request = new ListPromptsRequest($cursor); + + $response = $this->sendRequest($request); + + return ListPromptsResult::fromArray($response->result); + } + + /** + * Get a prompt from the server. + * + * @param string $name Prompt name + * @param array $arguments Prompt arguments + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + * Optional callback for progress updates + */ + public function getPrompt(string $name, array $arguments = [], ?callable $onProgress = null): GetPromptResult + { + $request = new GetPromptRequest($name, $arguments); + + $response = $this->sendRequest($request, $onProgress); + + return GetPromptResult::fromArray($response->result); + } + + /** + * Request completion suggestions for a prompt or resource argument. + * + * @param PromptReference|ResourceReference $ref The prompt or resource reference + * @param array{name: string, value: string} $argument The argument to complete + */ + public function complete(PromptReference|ResourceReference $ref, array $argument): CompletionCompleteResult + { + $request = new CompletionCompleteRequest($ref, $argument); + + $response = $this->sendRequest($request); + + return CompletionCompleteResult::fromArray($response->result); + } + + /** + * Set the minimum logging level for server log messages. + */ + public function setLoggingLevel(LoggingLevel $level): void + { + $request = new SetLogLevelRequest($level); + + $this->sendRequest($request); + } + + /** + * Send a request to the server and wait for response. + * + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + * + * @return Response + * + * @throws RequestException|ConnectionException + */ + private function sendRequest(Request $request, ?callable $onProgress = null): Response + { + if (!$this->isConnected()) { + throw new ConnectionException('Client is not connected. Call connect() first.'); + } + + $withProgress = null !== $onProgress; + $fiber = new \Fiber(fn () => $this->protocol->request($request, $this->config->requestTimeout, $withProgress)); + $response = $this->transport->runRequest($fiber, $onProgress); + + if ($response instanceof Error) { + throw RequestException::fromError($response); + } + + return $response; + } + + /** + * Disconnect from the server. + */ + public function disconnect(): void + { + if (null !== $this->transport) { + $this->transport->close(); + $this->transport = null; + $this->logger->info('Client disconnected'); + } + } +} diff --git a/src/Client/Builder.php b/src/Client/Builder.php new file mode 100644 index 00000000..120ae19c --- /dev/null +++ b/src/Client/Builder.php @@ -0,0 +1,170 @@ + + */ +class Builder +{ + private string $name = 'mcp-php-client'; + private string $version = '1.0.0'; + private ?string $description = null; + private ?ProtocolVersion $protocolVersion = null; + private ?ClientCapabilities $capabilities = null; + private int $initTimeout = 30; + private int $requestTimeout = 120; + private int $maxRetries = 3; + private ?LoggerInterface $logger = null; + + /** @var NotificationHandlerInterface[] */ + private array $notificationHandlers = []; + + /** @var RequestHandlerInterface[] */ + private array $requestHandlers = []; + + /** + * Set the client name and version. + */ + public function setClientInfo(string $name, string $version, ?string $description = null): self + { + $this->name = $name; + $this->version = $version; + $this->description = $description; + + return $this; + } + + /** + * Set the protocol version to use. + */ + public function setProtocolVersion(ProtocolVersion $protocolVersion): self + { + $this->protocolVersion = $protocolVersion; + + return $this; + } + + /** + * Set client capabilities. + */ + public function setCapabilities(ClientCapabilities $capabilities): self + { + $this->capabilities = $capabilities; + + return $this; + } + + /** + * Set initialization timeout in seconds. + */ + public function setInitTimeout(int $seconds): self + { + $this->initTimeout = $seconds; + + return $this; + } + + /** + * Set request timeout in seconds. + */ + public function setRequestTimeout(int $seconds): self + { + $this->requestTimeout = $seconds; + + return $this; + } + + /** + * Set maximum retry attempts for failed connections. + */ + public function setMaxRetries(int $retries): self + { + $this->maxRetries = $retries; + + return $this; + } + + /** + * Set the logger. + */ + public function setLogger(LoggerInterface $logger): self + { + $this->logger = $logger; + + return $this; + } + + /** + * Add a notification handler for server notifications. + */ + public function addNotificationHandler(NotificationHandlerInterface $handler): self + { + $this->notificationHandlers[] = $handler; + + return $this; + } + + /** + * Add a request handler for server requests (e.g., sampling). + * + * @param RequestHandlerInterface $handler + */ + public function addRequestHandler(RequestHandlerInterface $handler): self + { + $this->requestHandlers[] = $handler; + + return $this; + } + + /** + * Build the client instance. + */ + public function build(): Client + { + $logger = $this->logger ?? new NullLogger(); + + $clientInfo = new Implementation( + $this->name, + $this->version, + $this->description, + ); + + $config = new Configuration( + clientInfo: $clientInfo, + capabilities: $this->capabilities ?? new ClientCapabilities(), + protocolVersion: $this->protocolVersion ?? ProtocolVersion::V2025_06_18, + initTimeout: $this->initTimeout, + requestTimeout: $this->requestTimeout, + maxRetries: $this->maxRetries, + ); + + $protocol = new Protocol( + requestHandlers: $this->requestHandlers, + notificationHandlers: $this->notificationHandlers, + logger: $logger, + ); + + return new Client($protocol, $config, $logger); + } +} diff --git a/src/Client/Configuration.php b/src/Client/Configuration.php new file mode 100644 index 00000000..4dce6349 --- /dev/null +++ b/src/Client/Configuration.php @@ -0,0 +1,34 @@ + + */ +class Configuration +{ + public function __construct( + public readonly Implementation $clientInfo, + public readonly ClientCapabilities $capabilities, + public readonly ProtocolVersion $protocolVersion = ProtocolVersion::V2025_06_18, + public readonly int $initTimeout = 30, + public readonly int $requestTimeout = 120, + public readonly int $maxRetries = 3, + ) { + } +} diff --git a/src/Client/Handler/Notification/LoggingNotificationHandler.php b/src/Client/Handler/Notification/LoggingNotificationHandler.php new file mode 100644 index 00000000..c160ccd0 --- /dev/null +++ b/src/Client/Handler/Notification/LoggingNotificationHandler.php @@ -0,0 +1,43 @@ + + */ +class LoggingNotificationHandler implements NotificationHandlerInterface +{ + /** + * @param callable(LoggingMessageNotification): void $callback + */ + public function __construct( + private readonly mixed $callback, + ) { + } + + public function supports(Notification $notification): bool + { + return $notification instanceof LoggingMessageNotification; + } + + public function handle(Notification $notification): void + { + \assert($notification instanceof LoggingMessageNotification); + + ($this->callback)($notification); + } +} diff --git a/src/Client/Handler/Notification/NotificationHandlerInterface.php b/src/Client/Handler/Notification/NotificationHandlerInterface.php new file mode 100644 index 00000000..82092aa2 --- /dev/null +++ b/src/Client/Handler/Notification/NotificationHandlerInterface.php @@ -0,0 +1,32 @@ + + */ +interface NotificationHandlerInterface +{ + /** + * Check if this handler supports the given notification. + */ + public function supports(Notification $notification): bool; + + /** + * Handle the notification. + */ + public function handle(Notification $notification): void; +} diff --git a/src/Client/Handler/Notification/ProgressNotificationHandler.php b/src/Client/Handler/Notification/ProgressNotificationHandler.php new file mode 100644 index 00000000..3c489bf0 --- /dev/null +++ b/src/Client/Handler/Notification/ProgressNotificationHandler.php @@ -0,0 +1,52 @@ + + * + * @internal + */ +class ProgressNotificationHandler implements NotificationHandlerInterface +{ + public function __construct( + private readonly ClientStateInterface $state, + ) { + } + + public function supports(Notification $notification): bool + { + return $notification instanceof ProgressNotification; + } + + public function handle(Notification $notification): void + { + if (!$notification instanceof ProgressNotification) { + return; + } + + $this->state->storeProgress( + (string) $notification->progressToken, + $notification->progress, + $notification->total, + $notification->message, + ); + } +} diff --git a/src/Client/Handler/Request/RequestHandlerInterface.php b/src/Client/Handler/Request/RequestHandlerInterface.php new file mode 100644 index 00000000..1c050181 --- /dev/null +++ b/src/Client/Handler/Request/RequestHandlerInterface.php @@ -0,0 +1,38 @@ + + */ +interface RequestHandlerInterface +{ + /** + * Check if this handler supports the given request. + */ + public function supports(Request $request): bool; + + /** + * Handle the request and return a response or error. + * + * @return Response|Error + */ + public function handle(Request $request): Response|Error; +} diff --git a/src/Client/Handler/Request/SamplingRequestHandler.php b/src/Client/Handler/Request/SamplingRequestHandler.php new file mode 100644 index 00000000..750f7f10 --- /dev/null +++ b/src/Client/Handler/Request/SamplingRequestHandler.php @@ -0,0 +1,73 @@ + + * + * @author Kyrian Obikwelu + */ +class SamplingRequestHandler implements RequestHandlerInterface +{ + private readonly LoggerInterface $logger; + + /** + * @param callable(CreateSamplingMessageRequest): CreateSamplingMessageResult $callback + */ + public function __construct( + private readonly mixed $callback, + ?LoggerInterface $logger = null, + ) { + $this->logger = $logger ?? new NullLogger(); + } + + public function supports(Request $request): bool + { + return $request instanceof CreateSamplingMessageRequest; + } + + /** + * @return Response|Error + */ + public function handle(Request $request): Response|Error + { + \assert($request instanceof CreateSamplingMessageRequest); + + try { + $result = ($this->callback)($request); + + return new Response($request->getId(), $result); + } catch (SamplingException $e) { + $this->logger->error('Sampling failed: '.$e->getMessage()); + + return Error::forInternalError($e->getMessage(), $request->getId()); + } catch (\Throwable $e) { + $this->logger->error('Unexpected error during sampling', ['exception' => $e]); + + return Error::forInternalError('Error while sampling LLM', $request->getId()); + } + } +} diff --git a/src/Client/Protocol.php b/src/Client/Protocol.php new file mode 100644 index 00000000..75648456 --- /dev/null +++ b/src/Client/Protocol.php @@ -0,0 +1,318 @@ + + */ +class Protocol +{ + private ?TransportInterface $transport = null; + private ClientStateInterface $state; + private MessageFactory $messageFactory; + private LoggerInterface $logger; + + /** @var NotificationHandlerInterface[] */ + private array $notificationHandlers; + + /** + * @param RequestHandlerInterface[] $requestHandlers + * @param NotificationHandlerInterface[] $notificationHandlers + */ + public function __construct( + private readonly array $requestHandlers = [], + array $notificationHandlers = [], + ?MessageFactory $messageFactory = null, + ?LoggerInterface $logger = null, + ) { + $this->state = new ClientState(); + $this->messageFactory = $messageFactory ?? MessageFactory::make(); + $this->logger = $logger ?? new NullLogger(); + + $this->notificationHandlers = [ + new ProgressNotificationHandler($this->state), + ...$notificationHandlers, + ]; + } + + /** + * Connect this protocol to a transport. + * + * Sets up message handling callbacks. + * + * @param TransportInterface $transport The transport to connect + * @param Configuration $config The client configuration for initialization + */ + public function connect(TransportInterface $transport, Configuration $config): void + { + $this->transport = $transport; + $transport->setState($this->state); + $transport->onInitialize(fn () => $this->initialize($config)); + $transport->onMessage($this->processMessage(...)); + $transport->onError(fn (\Throwable $e) => $this->logger->error('Transport error', ['exception' => $e])); + + $this->logger->info('Protocol connected to transport', ['transport' => $transport::class]); + } + + /** + * Perform the MCP initialization handshake. + * + * Sends InitializeRequest and waits for response, then sends InitializedNotification. + * + * @param Configuration $config The client configuration + * + * @return Response>|Error + */ + public function initialize(Configuration $config): Response|Error + { + $request = new InitializeRequest( + $config->protocolVersion->value, + $config->capabilities, + $config->clientInfo, + ); + + $response = $this->request($request, $config->initTimeout); + + if ($response instanceof Response) { + $initResult = InitializeResult::fromArray($response->result); + $this->state->setServerInfo($initResult->serverInfo); + $this->state->setInstructions($initResult->instructions); + $this->state->setInitialized(true); + + $this->sendNotification(new InitializedNotification()); + + $this->logger->info('Initialization complete', [ + 'server' => $initResult->serverInfo->name, + ]); + } + + return $response; + } + + /** + * Send a request to the server and wait for response. + * + * If a response is immediately available (sync HTTP), returns it. + * Otherwise, suspends the Fiber and waits for the transport to resume it. + * + * @param Request $request The request to send + * @param int $timeout The timeout in seconds + * @param bool $withProgress Whether to attach a progress token to the request + * + * @return Response>|Error + */ + public function request(Request $request, int $timeout, bool $withProgress = false): Response|Error + { + $requestId = $this->state->nextRequestId(); + $request = $request->withId($requestId); + + if ($withProgress) { + $progressToken = "prog-{$requestId}"; + $request = $request->withMeta(['progressToken' => $progressToken]); + } + + $this->state->addPendingRequest($requestId, $timeout); + $this->sendRequest($request); + + $immediate = $this->state->consumeResponse($requestId); + if (null !== $immediate) { + $this->logger->debug('Received immediate response', ['id' => $requestId]); + + return $immediate; + } + + $this->logger->debug('Suspending fiber for response', ['id' => $requestId]); + + return \Fiber::suspend([ + 'type' => 'await_response', + 'request_id' => $requestId, + 'timeout' => $timeout, + ]); + } + + /** + * Send a request to the server. + */ + private function sendRequest(Request $request): void + { + $this->logger->debug('Sending request', [ + 'id' => $request->getId(), + 'method' => $request::getMethod(), + ]); + + $encoded = json_encode($request, \JSON_THROW_ON_ERROR); + $this->transport?->send($encoded); + } + + /** + * Send a notification to the server (fire and forget). + */ + public function sendNotification(Notification $notification): void + { + $this->logger->debug('Sending notification', ['method' => $notification::getMethod()]); + + $encoded = json_encode($notification, \JSON_THROW_ON_ERROR); + $this->transport?->send($encoded); + } + + /** + * Send a response back to the server (for server-initiated requests). + * + * @param Response|Error $response + */ + private function sendResponse(Response|Error $response): void + { + $this->logger->debug('Sending response', ['id' => $response->getId()]); + + $encoded = json_encode($response, \JSON_THROW_ON_ERROR); + $this->transport?->send($encoded); + } + + /** + * Process an incoming message from the server. + * + * Routes to appropriate handler based on message type. + */ + public function processMessage(string $input): void + { + $this->logger->debug('Received message', ['input' => $input]); + + try { + $messages = $this->messageFactory->create($input); + } catch (\JsonException $e) { + $this->logger->warning('Failed to parse message', ['exception' => $e]); + + return; + } + + foreach ($messages as $message) { + if ($message instanceof Response || $message instanceof Error) { + $this->handleResponse($message); + } elseif ($message instanceof Request) { + $this->handleRequest($message); + } elseif ($message instanceof Notification) { + $this->handleNotification($message); + } + } + } + + /** + * Handle a response from the server. + * + * This stores it in session. The transport will pick it up and resume the Fiber. + * + * @param Response|Error $response + */ + private function handleResponse(Response|Error $response): void + { + $requestId = $response->getId(); + + $this->logger->debug('Handling response', ['id' => $requestId]); + + $this->state->storeResponse($requestId, $response->jsonSerialize()); + } + + /** + * Handle a request from the server (e.g., sampling request). + */ + private function handleRequest(Request $request): void + { + $method = $request::getMethod(); + + $this->logger->debug('Received server request', [ + 'method' => $method, + 'id' => $request->getId(), + ]); + + foreach ($this->requestHandlers as $handler) { + if ($handler->supports($request)) { + try { + $response = $handler->handle($request); + } catch (\Throwable $e) { + $this->logger->error('Unexpected error while handling request', [ + 'method' => $method, + 'exception' => $e, + ]); + + $response = Error::forInternalError( + \sprintf('Unexpected error while handling "%s" request', $method), + $request->getId() + ); + } + + $this->sendResponse($response); + + return; + } + } + + $error = Error::forMethodNotFound( + \sprintf('Client does not handle "%s" requests.', $method), + $request->getId() + ); + + $this->sendResponse($error); + } + + /** + * Handle a notification from the server. + */ + private function handleNotification(Notification $notification): void + { + $method = $notification::getMethod(); + + $this->logger->debug('Received server notification', [ + 'method' => $method, + ]); + + foreach ($this->notificationHandlers as $handler) { + if ($handler->supports($notification)) { + try { + $handler->handle($notification); + } catch (\Throwable $e) { + $this->logger->warning('Notification handler failed', ['exception' => $e]); + } + + return; + } + } + } + + public function getState(): ClientStateInterface + { + return $this->state; + } +} diff --git a/src/Client/State/ClientState.php b/src/Client/State/ClientState.php new file mode 100644 index 00000000..241a4450 --- /dev/null +++ b/src/Client/State/ClientState.php @@ -0,0 +1,136 @@ + + */ +class ClientState implements ClientStateInterface +{ + private int $requestIdCounter = 1; + private bool $initialized = false; + private ?Implementation $serverInfo = null; + private ?string $instructions = null; + + /** @var array */ + private array $pendingRequests = []; + + /** @var array> */ + private array $responses = []; + + /** @var array */ + private array $progressUpdates = []; + + public function nextRequestId(): int + { + return $this->requestIdCounter++; + } + + public function addPendingRequest(int $requestId, int $timeout): void + { + $this->pendingRequests[$requestId] = [ + 'request_id' => $requestId, + 'timestamp' => time(), + 'timeout' => $timeout, + ]; + } + + public function removePendingRequest(int $requestId): void + { + unset($this->pendingRequests[$requestId]); + } + + public function getPendingRequests(): array + { + return $this->pendingRequests; + } + + public function storeResponse(int $requestId, array $responseData): void + { + $this->responses[$requestId] = $responseData; + } + + public function consumeResponse(int $requestId): Response|Error|null + { + if (!isset($this->responses[$requestId])) { + return null; + } + + $data = $this->responses[$requestId]; + unset($this->responses[$requestId]); + $this->removePendingRequest($requestId); + + if (isset($data['error'])) { + return Error::fromArray($data); + } + + return Response::fromArray($data); + } + + public function setInitialized(bool $initialized): void + { + $this->initialized = $initialized; + } + + public function isInitialized(): bool + { + return $this->initialized; + } + + public function setServerInfo(Implementation $serverInfo): void + { + $this->serverInfo = $serverInfo; + } + + public function getServerInfo(): ?Implementation + { + return $this->serverInfo; + } + + public function setInstructions(?string $instructions): void + { + $this->instructions = $instructions; + } + + public function getInstructions(): ?string + { + return $this->instructions; + } + + public function storeProgress(string $token, float $progress, ?float $total, ?string $message): void + { + $this->progressUpdates[] = [ + 'token' => $token, + 'progress' => $progress, + 'total' => $total, + 'message' => $message, + ]; + } + + public function consumeProgressUpdates(): array + { + $updates = $this->progressUpdates; + $this->progressUpdates = []; + + return $updates; + } +} diff --git a/src/Client/State/ClientStateInterface.php b/src/Client/State/ClientStateInterface.php new file mode 100644 index 00000000..af697d4e --- /dev/null +++ b/src/Client/State/ClientStateInterface.php @@ -0,0 +1,115 @@ + + */ +interface ClientStateInterface +{ + /** + * Get the next request ID for outgoing requests. + */ + public function nextRequestId(): int; + + /** + * Add a pending request to track. + * + * @param int $requestId The request ID + * @param int $timeout Timeout in seconds + */ + public function addPendingRequest(int $requestId, int $timeout): void; + + /** + * Remove a pending request. + */ + public function removePendingRequest(int $requestId): void; + + /** + * Get all pending requests. + * + * @return array + */ + public function getPendingRequests(): array; + + /** + * Store a received response. + * + * @param int $requestId The request ID + * @param array $responseData The raw response data + */ + public function storeResponse(int $requestId, array $responseData): void; + + /** + * Check and consume a response for a request ID. + * + * @return Response>|Error|null + */ + public function consumeResponse(int $requestId): Response|Error|null; + + /** + * Set initialization state. + */ + public function setInitialized(bool $initialized): void; + + /** + * Check if connection is initialized. + */ + public function isInitialized(): bool; + + /** + * Store the server info from initialization. + */ + public function setServerInfo(Implementation $serverInfo): void; + + /** + * Get the server info from initialization. + */ + public function getServerInfo(): ?Implementation; + + /** + * Store the server instructions from initialization. + */ + public function setInstructions(?string $instructions): void; + + /** + * Get the server instructions from initialization. + */ + public function getInstructions(): ?string; + + /** + * Store progress data received from a notification. + * + * @param string $token The progress token + * @param float $progress Current progress value + * @param float|null $total Total progress value (if known) + * @param string|null $message Progress message + */ + public function storeProgress(string $token, float $progress, ?float $total, ?string $message): void; + + /** + * Consume all pending progress updates. + * + * @return array + */ + public function consumeProgressUpdates(): array; +} diff --git a/src/Client/Transport/BaseTransport.php b/src/Client/Transport/BaseTransport.php new file mode 100644 index 00000000..fd70803b --- /dev/null +++ b/src/Client/Transport/BaseTransport.php @@ -0,0 +1,125 @@ + + */ +abstract class BaseTransport implements TransportInterface +{ + /** @var callable(): mixed|null */ + protected $initializeCallback; + + /** @var callable(string): void|null */ + protected $messageCallback; + + /** @var callable(\Throwable): void|null */ + protected $errorCallback; + + /** @var callable(string): void|null */ + protected $closeCallback; + + protected ?ClientStateInterface $state = null; + protected LoggerInterface $logger; + + public function __construct(?LoggerInterface $logger = null) + { + $this->logger = $logger ?? new NullLogger(); + } + + public function onInitialize(callable $listener): void + { + $this->initializeCallback = $listener; + } + + public function onMessage(callable $listener): void + { + $this->messageCallback = $listener; + } + + public function onError(callable $listener): void + { + $this->errorCallback = $listener; + } + + public function onClose(callable $listener): void + { + $this->closeCallback = $listener; + } + + public function setState(ClientStateInterface $state): void + { + $this->state = $state; + } + + /** + * Perform initialization via the registered callback. + * + * @return mixed The result from the initialization callback + * + * @throws \RuntimeException If no initialize listener is registered + */ + protected function handleInitialize(): mixed + { + if (!\is_callable($this->initializeCallback)) { + throw new \RuntimeException('No initialize listener registered'); + } + + return ($this->initializeCallback)(); + } + + /** + * Handle an incoming message from the server. + */ + protected function handleMessage(string $message): void + { + if (\is_callable($this->messageCallback)) { + try { + ($this->messageCallback)($message); + } catch (\Throwable $e) { + $this->handleError($e); + } + } + } + + /** + * Handle a transport error. + */ + protected function handleError(\Throwable $error): void + { + $this->logger->error('Transport error', ['exception' => $error]); + + if (\is_callable($this->errorCallback)) { + ($this->errorCallback)($error); + } + } + + /** + * Handle connection close. + */ + protected function handleClose(string $reason): void + { + $this->logger->info('Transport closed', ['reason' => $reason]); + + if (\is_callable($this->closeCallback)) { + ($this->closeCallback)($reason); + } + } +} diff --git a/src/Client/Transport/HttpTransport.php b/src/Client/Transport/HttpTransport.php new file mode 100644 index 00000000..51083af4 --- /dev/null +++ b/src/Client/Transport/HttpTransport.php @@ -0,0 +1,298 @@ + + */ +class HttpTransport extends BaseTransport +{ + private ClientInterface $httpClient; + private RequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + + private ?string $sessionId = null; + + /** @var McpFiber|null */ + private ?\Fiber $activeFiber = null; + + /** @var (callable(float, ?float, ?string): void)|null */ + private $activeProgressCallback; + + /** @var StreamInterface|null Active SSE stream being read */ + private ?StreamInterface $activeStream = null; + + /** @var string Buffer for incomplete SSE data */ + private string $sseBuffer = ''; + + /** + * @param string $endpoint The MCP server endpoint URL + * @param array $headers Additional headers to send + * @param ClientInterface|null $httpClient PSR-18 HTTP client (auto-discovered if null) + * @param RequestFactoryInterface|null $requestFactory PSR-17 request factory (auto-discovered if null) + * @param StreamFactoryInterface|null $streamFactory PSR-17 stream factory (auto-discovered if null) + */ + public function __construct( + private readonly string $endpoint, + private readonly array $headers = [], + ?ClientInterface $httpClient = null, + ?RequestFactoryInterface $requestFactory = null, + ?StreamFactoryInterface $streamFactory = null, + ?LoggerInterface $logger = null, + ) { + parent::__construct($logger); + + $this->httpClient = $httpClient ?? Psr18ClientDiscovery::find(); + $this->requestFactory = $requestFactory ?? Psr17FactoryDiscovery::findRequestFactory(); + $this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory(); + } + + public function connectAndInitialize(): void + { + $this->activeFiber = new \Fiber(fn () => $this->handleInitialize()); + + $this->activeFiber->start(); + + while (!$this->activeFiber->isTerminated()) { + $this->tick(); + } + + $result = $this->activeFiber->getReturn(); + $this->activeFiber = null; + + if ($result instanceof Error) { + throw new ConnectionException('Initialization failed: '.$result->message); + } + + $this->logger->info('HTTP client connected and initialized', ['endpoint' => $this->endpoint]); + } + + public function send(string $data): void + { + $request = $this->requestFactory->createRequest('POST', $this->endpoint) + ->withHeader('Content-Type', 'application/json') + ->withHeader('Accept', 'application/json, text/event-stream') + ->withBody($this->streamFactory->createStream($data)); + + if (null !== $this->sessionId) { + $request = $request->withHeader('Mcp-Session-Id', $this->sessionId); + } + + foreach ($this->headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + + $this->logger->debug('Sending HTTP request', ['data' => $data]); + + try { + $response = $this->httpClient->sendRequest($request); + } catch (\Throwable $e) { + $this->handleError($e); + throw new ConnectionException('HTTP request failed: '.$e->getMessage(), 0, $e); + } + + if ($response->hasHeader('Mcp-Session-Id')) { + $this->sessionId = $response->getHeaderLine('Mcp-Session-Id'); + $this->logger->debug('Received session ID', ['session_id' => $this->sessionId]); + } + + $contentType = $response->getHeaderLine('Content-Type'); + + if (str_contains($contentType, 'text/event-stream')) { + $this->activeStream = $response->getBody(); + $this->sseBuffer = ''; + } elseif (str_contains($contentType, 'application/json')) { + $body = $response->getBody()->getContents(); + if (!empty($body)) { + $this->handleMessage($body); + } + } + } + + /** + * @param McpFiber $fiber + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + */ + public function runRequest(\Fiber $fiber, ?callable $onProgress = null): Response|Error + { + $this->activeFiber = $fiber; + $this->activeProgressCallback = $onProgress; + $fiber->start(); + + while (!$fiber->isTerminated()) { + $this->tick(); + } + + $this->activeFiber = null; + $this->activeProgressCallback = null; + $this->activeStream = null; + + return $fiber->getReturn(); + } + + public function close(): void + { + if (null !== $this->sessionId) { + try { + $request = $this->requestFactory->createRequest('DELETE', $this->endpoint) + ->withHeader('Mcp-Session-Id', $this->sessionId); + + foreach ($this->headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + + $this->httpClient->sendRequest($request); + $this->logger->info('Session closed', ['session_id' => $this->sessionId]); + } catch (\Throwable $e) { + $this->logger->warning('Failed to close session', ['error' => $e->getMessage()]); + } + } + + $this->sessionId = null; + $this->activeStream = null; + $this->handleClose('Transport closed'); + } + + private function tick(): void + { + $this->processSSEStream(); + $this->processProgress(); + $this->processFiber(); + + usleep(1000); // 1ms + } + + /** + * Read SSE data incrementally from active stream. + */ + private function processSSEStream(): void + { + if (null === $this->activeStream) { + return; + } + + if (!$this->activeStream->eof()) { + $chunk = $this->activeStream->read(4096); + if ('' !== $chunk) { + $this->sseBuffer .= $chunk; + } + } + + while (false !== ($pos = strpos($this->sseBuffer, "\n\n"))) { + $event = substr($this->sseBuffer, 0, $pos); + $this->sseBuffer = substr($this->sseBuffer, $pos + 2); + + if (!empty(trim($event))) { + $this->processSSEEvent($event); + } + } + + if ($this->activeStream->eof() && empty($this->sseBuffer)) { + $this->activeStream = null; + } + } + + /** + * Parse a single SSE event and handle the message. + */ + private function processSSEEvent(string $event): void + { + $data = ''; + + foreach (explode("\n", $event) as $line) { + if (str_starts_with($line, 'data:')) { + $data .= trim(substr($line, 5)); + } + } + + if (!empty($data)) { + $this->handleMessage($data); + } + } + + /** + * Process pending progress updates from session and execute callback. + */ + private function processProgress(): void + { + if (null === $this->activeProgressCallback || null === $this->state) { + return; + } + + $updates = $this->state->consumeProgressUpdates(); + + foreach ($updates as $update) { + try { + ($this->activeProgressCallback)( + $update['progress'], + $update['total'], + $update['message'], + ); + } catch (\Throwable $e) { + $this->logger->warning('Progress callback failed', ['exception' => $e]); + } + } + } + + private function processFiber(): void + { + if (null === $this->activeFiber || !$this->activeFiber->isSuspended()) { + return; + } + + if (null === $this->state) { + return; + } + + $pendingRequests = $this->state->getPendingRequests(); + + foreach ($pendingRequests as $pending) { + $requestId = $pending['request_id']; + $timestamp = $pending['timestamp']; + $timeout = $pending['timeout']; + + $response = $this->state->consumeResponse($requestId); + + if (null !== $response) { + $this->logger->debug('Resuming fiber with response', ['request_id' => $requestId]); + $this->activeFiber->resume($response); + + return; + } + + if (time() - $timestamp >= $timeout) { + $this->logger->warning('Request timed out', ['request_id' => $requestId]); + $error = Error::forInternalError('Request timed out', $requestId); + $this->activeFiber->resume($error); + + return; + } + } + } +} diff --git a/src/Client/Transport/StdioTransport.php b/src/Client/Transport/StdioTransport.php new file mode 100644 index 00000000..b580b047 --- /dev/null +++ b/src/Client/Transport/StdioTransport.php @@ -0,0 +1,289 @@ + + */ +class StdioTransport extends BaseTransport +{ + /** @var resource|null */ + private $process; + + /** @var resource|null */ + private $stdin; + + /** @var resource|null */ + private $stdout; + + /** @var resource|null */ + private $stderr; + + private string $inputBuffer = ''; + + /** @var McpFiber|null */ + private ?\Fiber $activeFiber = null; + + /** @var (callable(float, ?float, ?string): void)|null */ + private $activeProgressCallback; + + /** + * @param string $command The command to run + * @param array $args Command arguments + * @param string|null $cwd Working directory + * @param array|null $env Environment variables + */ + public function __construct( + private readonly string $command, + private readonly array $args = [], + private readonly ?string $cwd = null, + private readonly ?array $env = null, + ?LoggerInterface $logger = null, + ) { + parent::__construct($logger); + } + + public function connectAndInitialize(): void + { + $this->spawnProcess(); + + $this->activeFiber = new \Fiber(fn () => $this->handleInitialize()); + + $this->activeFiber->start(); + + while (!$this->activeFiber->isTerminated()) { + $this->tick(); + } + + $result = $this->activeFiber->getReturn(); + $this->activeFiber = null; + + if ($result instanceof Error) { + $this->close(); + throw new ConnectionException('Initialization failed: '.$result->message); + } + + $this->logger->info('Client connected and initialized'); + } + + public function send(string $data): void + { + if (null === $this->stdin || !\is_resource($this->stdin)) { + throw new ConnectionException('Process stdin not available'); + } + + fwrite($this->stdin, $data."\n"); + fflush($this->stdin); + + $this->logger->debug('Sent message to server', ['data' => $data]); + } + + /** + * @param McpFiber $fiber + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + */ + public function runRequest(\Fiber $fiber, ?callable $onProgress = null): Response|Error + { + $this->activeFiber = $fiber; + $this->activeProgressCallback = $onProgress; + $fiber->start(); + + while (!$fiber->isTerminated()) { + $this->tick(); + } + + $this->activeFiber = null; + $this->activeProgressCallback = null; + + return $fiber->getReturn(); + } + + public function close(): void + { + if (\is_resource($this->stdin)) { + fclose($this->stdin); + $this->stdin = null; + } + if (\is_resource($this->stdout)) { + fclose($this->stdout); + $this->stdout = null; + } + if (\is_resource($this->stderr)) { + fclose($this->stderr); + $this->stderr = null; + } + if (\is_resource($this->process)) { + proc_terminate($this->process, 15); // SIGTERM + proc_close($this->process); + $this->process = null; + } + + $this->handleClose('Transport closed'); + } + + private function spawnProcess(): void + { + $descriptors = [ + 0 => ['pipe', 'r'], // stdin + 1 => ['pipe', 'w'], // stdout + 2 => ['pipe', 'w'], // stderr + ]; + + $cmd = escapeshellcmd($this->command); + foreach ($this->args as $arg) { + $cmd .= ' '.escapeshellarg($arg); + } + + $this->process = proc_open( + $cmd, + $descriptors, + $pipes, + $this->cwd, + $this->env + ); + + if (!\is_resource($this->process)) { + throw new ConnectionException('Failed to start process: '.$cmd); + } + + $this->stdin = $pipes[0]; + $this->stdout = $pipes[1]; + $this->stderr = $pipes[2]; + + // Set non-blocking mode for reading + stream_set_blocking($this->stdout, false); + stream_set_blocking($this->stderr, false); + + $this->logger->info('Started MCP server process', ['command' => $cmd]); + } + + private function tick(): void + { + $this->processInput(); + $this->processProgress(); + $this->processFiber(); + $this->processStderr(); + + usleep(1000); // 1ms + } + + /** + * Process pending progress updates from session and execute callback. + */ + private function processProgress(): void + { + if (null === $this->activeProgressCallback || null === $this->state) { + return; + } + + $updates = $this->state->consumeProgressUpdates(); + + foreach ($updates as $update) { + try { + ($this->activeProgressCallback)( + $update['progress'], + $update['total'], + $update['message'], + ); + } catch (\Throwable $e) { + $this->logger->warning('Progress callback failed', ['exception' => $e]); + } + } + } + + private function processInput(): void + { + if (null === $this->stdout || !\is_resource($this->stdout)) { + return; + } + + $data = fread($this->stdout, 8192); + if (false !== $data && '' !== $data) { + $this->inputBuffer .= $data; + } + + while (false !== ($pos = strpos($this->inputBuffer, "\n"))) { + $line = substr($this->inputBuffer, 0, $pos); + $this->inputBuffer = substr($this->inputBuffer, $pos + 1); + + $trimmed = trim($line); + if (!empty($trimmed)) { + $this->handleMessage($trimmed); + } + } + } + + private function processFiber(): void + { + if (null === $this->activeFiber || !$this->activeFiber->isSuspended()) { + return; + } + + if (null === $this->state) { + return; + } + + $pendingRequests = $this->state->getPendingRequests(); + + foreach ($pendingRequests as $pending) { + $requestId = $pending['request_id']; + $timestamp = $pending['timestamp']; + $timeout = $pending['timeout']; + + // Check if response arrived + $response = $this->state->consumeResponse($requestId); + + if (null !== $response) { + $this->logger->debug('Resuming fiber with response', ['request_id' => $requestId]); + $this->activeFiber->resume($response); + + return; + } + + // Check timeout + if (time() - $timestamp >= $timeout) { + $this->logger->warning('Request timed out', ['request_id' => $requestId]); + $error = Error::forInternalError('Request timed out', $requestId); + $this->activeFiber->resume($error); + + return; + } + } + } + + private function processStderr(): void + { + if (null === $this->stderr || !\is_resource($this->stderr)) { + return; + } + + $stderr = fread($this->stderr, 8192); + if (false !== $stderr && '' !== $stderr) { + $this->logger->debug('Server stderr', ['output' => trim($stderr)]); + } + } +} diff --git a/src/Client/Transport/TransportInterface.php b/src/Client/Transport/TransportInterface.php new file mode 100644 index 00000000..84e15d4f --- /dev/null +++ b/src/Client/Transport/TransportInterface.php @@ -0,0 +1,107 @@ +|Error) + * @phpstan-type FiberResume (Response|Error) + * @phpstan-type FiberSuspend array{type: 'await_response', request_id: int, timeout: int} + * @phpstan-type McpFiber \Fiber + * + * @author Kyrian Obikwelu + */ +interface TransportInterface +{ + /** + * Connect to the MCP server and perform initialization handshake. + * + * This method blocks until: + * - Initialization completes successfully + * - Connection fails (throws ConnectionException) + * + * @throws \Mcp\Exception\ConnectionException + */ + public function connectAndInitialize(): void; + + /** + * Send a message to the server immediately. + * + * @param string $data JSON-encoded message + */ + public function send(string $data): void; + + /** + * Run a request fiber to completion. + * + * The transport starts the fiber, runs its internal loop, and resumes + * the fiber when a response arrives or timeout occurs. + * + * During the loop, the transport checks session for progress data and + * executes the callback if provided. + * + * @param McpFiber $fiber The fiber to execute + * @param (callable(float $progress, ?float $total, ?string $message): void)|null $onProgress + * Optional callback for progress updates + * + * @return Response>|Error The response or error + */ + public function runRequest(\Fiber $fiber, ?callable $onProgress = null): Response|Error; + + /** + * Close the transport and clean up resources. + */ + public function close(): void; + + /** + * Register callback for initialization handshake. + * + * The callback should return a Fiber that performs the initialization. + * + * @param callable(): mixed $callback + */ + public function onInitialize(callable $callback): void; + + /** + * Register callback for incoming messages from server. + * + * @param callable(string $message): void $callback + */ + public function onMessage(callable $callback): void; + + /** + * Register callback for transport errors. + * + * @param callable(\Throwable $error): void $callback + */ + public function onError(callable $callback): void; + + /** + * Register callback for when connection closes. + * + * @param callable(string $reason): void $callback + */ + public function onClose(callable $callback): void; + + /** + * Set the client state for runtime state management. + */ + public function setState(ClientStateInterface $state): void; +} diff --git a/src/Exception/ConnectionException.php b/src/Exception/ConnectionException.php new file mode 100644 index 00000000..4e4527f5 --- /dev/null +++ b/src/Exception/ConnectionException.php @@ -0,0 +1,21 @@ + + */ +class ConnectionException extends Exception +{ +} diff --git a/src/Exception/RequestException.php b/src/Exception/RequestException.php new file mode 100644 index 00000000..44b8a91f --- /dev/null +++ b/src/Exception/RequestException.php @@ -0,0 +1,40 @@ + + */ +class RequestException extends Exception +{ + private ?Error $error; + + public function __construct(string $message = '', int $code = 0, ?\Throwable $previous = null, ?Error $error = null) + { + parent::__construct($message, $code, $previous); + $this->error = $error; + } + + public static function fromError(Error $error): self + { + return new self($error->message, $error->code, null, $error); + } + + public function getError(): ?Error + { + return $this->error; + } +} diff --git a/src/Exception/SamplingException.php b/src/Exception/SamplingException.php new file mode 100644 index 00000000..17abcebc --- /dev/null +++ b/src/Exception/SamplingException.php @@ -0,0 +1,24 @@ + + */ +final class SamplingException extends \RuntimeException implements ExceptionInterface +{ +} diff --git a/src/Exception/TimeoutException.php b/src/Exception/TimeoutException.php new file mode 100644 index 00000000..8fa5c74a --- /dev/null +++ b/src/Exception/TimeoutException.php @@ -0,0 +1,21 @@ + + */ +class TimeoutException extends Exception +{ +} diff --git a/src/Schema/ClientCapabilities.php b/src/Schema/ClientCapabilities.php index 0428c2eb..44fe9592 100644 --- a/src/Schema/ClientCapabilities.php +++ b/src/Schema/ClientCapabilities.php @@ -34,8 +34,8 @@ public function __construct( * @param array{ * roots?: array{ * listChanged?: bool, - * }, - * sampling?: bool, + * }|object, + * sampling?: object|bool, * experimental?: array, * } $data */ diff --git a/src/Schema/Request/CreateSamplingMessageRequest.php b/src/Schema/Request/CreateSamplingMessageRequest.php index 99aae118..3014405e 100644 --- a/src/Schema/Request/CreateSamplingMessageRequest.php +++ b/src/Schema/Request/CreateSamplingMessageRequest.php @@ -74,17 +74,33 @@ protected static function fromParams(?array $params): static throw new InvalidArgumentException('Missing or invalid "maxTokens" parameter for sampling/createMessage.'); } + $messages = []; + foreach ($params['messages'] as $messageData) { + if ($messageData instanceof SamplingMessage) { + $messages[] = $messageData; + } elseif (\is_array($messageData)) { + $messages[] = SamplingMessage::fromArray($messageData); + } else { + throw new InvalidArgumentException('Invalid message format in sampling/createMessage.'); + } + } + $preferences = null; if (isset($params['preferences'])) { $preferences = ModelPreferences::fromArray($params['preferences']); } + $includeContext = null; + if (isset($params['includeContext']) && \is_string($params['includeContext'])) { + $includeContext = SamplingContext::tryFrom($params['includeContext']); + } + return new self( - $params['messages'], + $messages, $params['maxTokens'], $preferences, $params['systemPrompt'] ?? null, - $params['includeContext'] ?? null, + $includeContext, $params['temperature'] ?? null, $params['stopSequences'] ?? null, $params['metadata'] ?? null, diff --git a/src/Schema/Result/CompletionCompleteResult.php b/src/Schema/Result/CompletionCompleteResult.php index 813e8abd..d7fa4c4b 100644 --- a/src/Schema/Result/CompletionCompleteResult.php +++ b/src/Schema/Result/CompletionCompleteResult.php @@ -60,4 +60,18 @@ public function jsonSerialize(): array return ['completion' => $completion]; } + + /** + * @param array $data + */ + public static function fromArray(array $data): self + { + $completion = $data['completion'] ?? []; + + return new self( + $completion['values'] ?? [], + $completion['total'] ?? null, + $completion['hasMore'] ?? null, + ); + } }