Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions plugins/codex/scripts/lib/broker-lifecycle.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,28 @@ export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) {
return false;
}

export async function sendBrokerShutdown(endpoint) {
export async function sendBrokerShutdown(endpoint, timeoutMs = 5000) {
await new Promise((resolve) => {
const socket = connectToEndpoint(endpoint);
let settled = false;
const finish = () => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
socket.destroy();
resolve();
};
const timer = setTimeout(finish, timeoutMs);

socket.setEncoding("utf8");
socket.on("connect", () => {
socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params: {} })}\n`);
});
socket.on("data", () => {
socket.end();
resolve();
});
socket.on("error", resolve);
socket.on("close", resolve);
socket.on("data", finish);
socket.on("error", finish);
socket.on("close", finish);
});
}

Expand Down
44 changes: 44 additions & 0 deletions tests/broker-lifecycle.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import fs from "node:fs";
import net from "node:net";
import path from "node:path";
import test from "node:test";
import assert from "node:assert/strict";

import { createBrokerEndpoint, parseBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-endpoint.mjs";
import { sendBrokerShutdown } from "../plugins/codex/scripts/lib/broker-lifecycle.mjs";
import { makeTempDir } from "./helpers.mjs";

test("sendBrokerShutdown resolves when the broker accepts the socket but never replies", async () => {
const endpoint = createBrokerEndpoint(makeTempDir(), process.platform);
const { path: socketPath } = parseBrokerEndpoint(endpoint);
let resolveShutdownReceived;
const shutdownReceived = new Promise((resolve) => {
resolveShutdownReceived = resolve;
});

const server = net.createServer((socket) => {
socket.setEncoding("utf8");
socket.on("data", (chunk) => {
if (String(chunk).includes("broker/shutdown")) {
resolveShutdownReceived();
}
// Deliberately keep the socket open without responding.
});
});

await new Promise((resolve, reject) => {
server.once("error", reject);
server.listen(socketPath, resolve);
});

try {
const startedAt = Date.now();
await Promise.all([sendBrokerShutdown(endpoint, 500), shutdownReceived]);
assert.ok(Date.now() - startedAt < 1000);
Comment thread
dhruvac29 marked this conversation as resolved.
} finally {
await new Promise((resolve) => server.close(resolve));
if (fs.existsSync(socketPath)) {
fs.unlinkSync(socketPath);
}
}
});