diff --git a/plugins/codex/scripts/lib/broker-lifecycle.mjs b/plugins/codex/scripts/lib/broker-lifecycle.mjs index ef763819..c2782713 100644 --- a/plugins/codex/scripts/lib/broker-lifecycle.mjs +++ b/plugins/codex/scripts/lib/broker-lifecycle.mjs @@ -40,19 +40,28 @@ export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) { return false; } -export async function sendBrokerShutdown(endpoint) { +export async function sendBrokerShutdown(endpoint, timeoutMs = 5000) { await new Promise((resolve) => { const socket = connectToEndpoint(endpoint); + let settled = false; + const finish = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + socket.destroy(); + resolve(); + }; + const timer = setTimeout(finish, timeoutMs); + socket.setEncoding("utf8"); socket.on("connect", () => { socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params: {} })}\n`); }); - socket.on("data", () => { - socket.end(); - resolve(); - }); - socket.on("error", resolve); - socket.on("close", resolve); + socket.on("data", finish); + socket.on("error", finish); + socket.on("close", finish); }); } diff --git a/tests/broker-lifecycle.test.mjs b/tests/broker-lifecycle.test.mjs new file mode 100644 index 00000000..67882ec1 --- /dev/null +++ b/tests/broker-lifecycle.test.mjs @@ -0,0 +1,44 @@ +import fs from "node:fs"; +import net from "node:net"; +import path from "node:path"; +import test from "node:test"; +import assert from "node:assert/strict"; + +import { createBrokerEndpoint, parseBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-endpoint.mjs"; +import { sendBrokerShutdown } from "../plugins/codex/scripts/lib/broker-lifecycle.mjs"; +import { makeTempDir } from "./helpers.mjs"; + +test("sendBrokerShutdown resolves when the broker accepts the socket but never replies", async () => { + const endpoint = createBrokerEndpoint(makeTempDir(), process.platform); + const { path: socketPath } = parseBrokerEndpoint(endpoint); + let resolveShutdownReceived; + const shutdownReceived = new Promise((resolve) => { + resolveShutdownReceived = resolve; + }); + + const server = net.createServer((socket) => { + socket.setEncoding("utf8"); + socket.on("data", (chunk) => { + if (String(chunk).includes("broker/shutdown")) { + resolveShutdownReceived(); + } + // Deliberately keep the socket open without responding. + }); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(socketPath, resolve); + }); + + try { + const startedAt = Date.now(); + await Promise.all([sendBrokerShutdown(endpoint, 500), shutdownReceived]); + assert.ok(Date.now() - startedAt < 1000); + } finally { + await new Promise((resolve) => server.close(resolve)); + if (fs.existsSync(socketPath)) { + fs.unlinkSync(socketPath); + } + } +});