Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions src/cli/Cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ export function condense(json: Record<string, unknown>): Record<string, unknown>
return json;
}

/**
 * emitFailureMessage — builds the end-of-run diagnostic text for failed collector emits.
 *
 * Two failure shapes are worded differently on purpose:
 *  - `last.status` is set: the collector answered and refused the request, so the message says "rejected"
 *    and carries the HTTP status plus, when present, the first 200 characters of the response body.
 *  - `last.status` is absent: the POST never landed (connection refused / timeout / DNS), so the message
 *    says "failed" and carries `last.error`, falling back to "unknown error".
 *
 * Kept as a standalone function so the wording and the count can be unit-tested.
 *
 * @param collector - the collector endpoint shown in the message.
 * @param count - total number of failed emits during this run.
 * @param last - the most recent failure; its reason is the one displayed.
 * @returns the one-line diagnostic message.
 */
export function emitFailureMessage(collector: string, count: number, last: EmitResult): string {
  // No status → the request never reached the collector; use delivery wording, never "rejected".
  if (!last.status) {
    const cause = last.error ?? "unknown error";
    return `${count} emit(s) to collector ${collector} failed: ${cause}`;
  }
  // The collector responded: append a bounded excerpt of its body so the diagnostic stays one short line.
  const detail = last.body ? ` — ${last.body.slice(0, 200)}` : "";
  return `collector ${collector} rejected ${count} emit(s): HTTP ${last.status}${detail}`;
}

/** emit policy: bare --json → JSON to stdout; --json <path> → file (stdout stays human); else human. */
function emit(trace: Trace, renderHuman: () => string, options: any): void {
// Enforce the envelope contract before it leaves the process: structural violations become error
Expand Down Expand Up @@ -120,31 +132,42 @@ export class Cli {
// dashboard updates live as it runs.
const collector = await Collector.resolve(options.emit);
let emitChain: Promise<unknown> = Promise.resolve();
const emitFailures: EmitResult[] = [];
// Only the count and the most recent failure are surfaced, so keep just those — not every failed result.
// onProgress can emit on a hot path, and retaining each failure would grow memory without bound.
let emitFailureCount = 0;
let lastEmitFailure: EmitResult | undefined;
const emitToCollector = collector
? (envelope: unknown) => { emitChain = emitChain.then(async () => { const result = await Collector.emit(collector, envelope); if (!result.ok) emitFailures.push(result); }); }
? (envelope: unknown) => { emitChain = emitChain.then(async () => { const result = await Collector.emit(collector, envelope); if (!result.ok) { emitFailureCount++; lastEmitFailure = result; } }); }
: undefined;

const { trace } = await this.#dynamic.run({
target, port, launch,
breakpoints: options.breakpoint, exprs: options.expression,
steps, curl: options.curl,
root: options.root, maxHits: options.maxHits,
recordOut: options.output,
args: { target, ...(launch ? { launch: true } : { port }), breakpoints: options.breakpoint, ...(options.root ? { root: options.root } : {}), ...(options.maxHits ? { maxHits: options.maxHits } : {}), ...(steps.length ? { steps: steps.map(redactStep) } : {}), ...(options.curl ? { curl: options.curl } : {}) },
...(emitToCollector ? { onProgress: (intermediateTrace: Trace) => emitToCollector(intermediateTrace.toJSON()) } : {}),
});
let trace: Trace;
try {
({ trace } = await this.#dynamic.run({
target, port, launch,
breakpoints: options.breakpoint, exprs: options.expression,
steps, curl: options.curl,
root: options.root, maxHits: options.maxHits,
recordOut: options.output,
args: { target, ...(launch ? { launch: true } : { port }), breakpoints: options.breakpoint, ...(options.root ? { root: options.root } : {}), ...(options.maxHits ? { maxHits: options.maxHits } : {}), ...(steps.length ? { steps: steps.map(redactStep) } : {}), ...(options.curl ? { curl: options.curl } : {}) },
...(emitToCollector ? { onProgress: (intermediateTrace: Trace) => emitToCollector(intermediateTrace.toJSON()) } : {}),
}));
} catch (error) {
// The run threw (attach failed, engine crashed, recording threw). It already emitted a TERMINAL envelope
// via onProgress that clears the dashboard's "running" session — flush the chain so that POST actually
// lands before we exit, then surface the failure (non-zero exit + the same ENGINE_FATAL code in the log).
if (emitToCollector) await emitChain;
log.error("dynamic trace aborted before completion", { code: Code.ENGINE_FATAL, err: error });
process.exit(1);
}

// Flush the final (complete) envelope and all pending emits BEFORE rendering, so a rejected emit
// (a 400 schema error, a 503 dead store) becomes a visible diagnostic in the printed/--json envelope
// instead of vanishing into an info log — the gap that sent a debugging loop chasing the wrong cause.
if (emitToCollector) {
emitToCollector(trace.toJSON());
await emitChain;
if (emitFailures.length) {
const last = emitFailures[emitFailures.length - 1];
const reason = last.status ? `HTTP ${last.status}${last.body ? ` — ${last.body.slice(0, 200)}` : ""}` : (last.error ?? "unknown error");
trace.diagnostics.push(Diagnostic.warn(Code.EMIT, `collector ${collector} rejected ${emitFailures.length} emit(s): ${reason}`));
if (lastEmitFailure && collector) {
trace.diagnostics.push(Diagnostic.warn(Code.EMIT, emitFailureMessage(collector, emitFailureCount, lastEmitFailure)));
}
}
emit(trace, () => this.#dynamic.render(trace), options);
Expand Down
54 changes: 49 additions & 5 deletions src/cli/commands/DynamicCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,33 @@ export class DynamicCommand extends TraceCommand<DynamicRequest, DynamicResult>
const trace = this.#toTrace(capture, { sessionId, args: request.args ?? {}, startedAtMs });
if (isChrome) await this.#record(capture, trace, sessionId, request.recordOut);
return { trace, capture };
} catch (error) {
// A throw here (attach failed, engine crashed, recording threw) would otherwise leave the initial
// `running` partial (emitted above) orphaned in the dashboard forever — the session never resolves.
// Emit a TERMINAL envelope (no `running` flag → meta.running absent; ok:false via the error diagnostic)
// so the dashboard flips it to failed, and surface the cause in the stderr trail with the same code.
log.error("trace run aborted before completion", { code: Code.ENGINE_FATAL, sessionId, err: error });
request.onProgress?.(this.#abortedTrace(error, context));
throw error;
} finally {
launched?.kill();
}
}

/**
 * Build the terminal envelope for a run that threw.
 *
 * The envelope has empty event data, a single ENGINE_FATAL error diagnostic, and — crucially — no `running`
 * flag is passed, the intent being that the collector resolves the session instead of leaving its initial
 * running partial hanging.
 *
 * @param error - whatever the run threw; its `message` is used when present, otherwise the value itself.
 * @param context - the run context supplying target kind, trigger, session id, args and start time.
 * @returns the envelope produced by `this.envelope(...)`.
 */
#abortedTrace(error: unknown, context: RunCtx): Trace {
  // Only the first line of the failure text goes into the diagnostic (drops any trailing stack/detail lines).
  const cause = (error as Error)?.message ?? error;
  const headline = String(cause).split("\n")[0];
  const { target: kind, trigger, sessionId, args, startedAtMs } = context;
  return this.envelope({
    command: `run.${kind}`,
    data: new TraceData({ events: [] }),
    diagnostics: [Diagnostic.error(Code.ENGINE_FATAL, headline)],
    sessionId,
    args,
    startedAtMs,
    target: new TargetReference({ kind, source: "cdp", trigger }),
  });
}

/**
* A partial, mid-run envelope: the same shape as the finished trace but flagged `running` and carrying only
* the events captured so far (lineage/recording/diagnostics are computed once at the end in {@link #toTrace}).
Expand All @@ -103,7 +125,10 @@ export class DynamicCommand extends TraceCommand<DynamicRequest, DynamicResult>
#toTrace(capture: CaptureResult, context: { sessionId: string; args: Record<string, unknown>; startedAtMs: number }): Trace {
const source = "cdp";
const diagnostics: Diagnostic[] = [];
if (capture.fatal) diagnostics.push(Diagnostic.error(Code.ENGINE_FATAL, String(capture.fatal).split("\n")[0]));
if (capture.fatal) {
log.error("engine reported a fatal error", { code: Code.ENGINE_FATAL, sessionId: context.sessionId, fatal: String(capture.fatal).split("\n")[0] });
diagnostics.push(Diagnostic.error(Code.ENGINE_FATAL, String(capture.fatal).split("\n")[0]));
}
// A failed journey step (selector not found / timed out) flips the envelope's `ok` — same gate the old
// `journey` command applied to its exit code, now expressed as an error diagnostic.
for (const step of capture.steps ?? []) if (!step.ok) diagnostics.push(Diagnostic.error(Code.STEP_FAILED, `#${step.sequence} ${step.step}${step.note ? " — " + step.note : ""}`));
Expand Down Expand Up @@ -150,13 +175,32 @@ export class DynamicCommand extends TraceCommand<DynamicRequest, DynamicResult>
try {
const videoOutputPath = outputPath ?? join(tmpdir(), `trace-${sessionId}.mp4`);
const videoPath = await Recorder.renderJourney(capture.frames ?? [], capture.traced ?? [], videoOutputPath);
if (!videoPath) { log.warn("no frames captured — nothing to record", { code: Code.RECORD_EMPTY, sessionId }); return; }
const upload = this.artifacts && this.artifacts.isConfigured() ? await this.artifacts.upload(videoPath, `recordings/${sessionId}.mp4`, "video/mp4") : null;
if (!videoPath) {
// Both channels carry the same code: the stderr trail AND an envelope diagnostic, so an agent reading
// --json learns the chrome run produced no video (instead of inferring it from a missing `recording`).
log.warn("no frames captured — nothing to record", { code: Code.RECORD_EMPTY, sessionId });
trace.diagnostics.push(Diagnostic.warn(Code.RECORD_EMPTY, "no frames captured — the debug-replay video is empty (no breakpoint hits, or the journey produced no frames)."));
return;
}
const uploadConfigured = this.artifacts?.isConfigured() ?? false;
const upload = uploadConfigured ? await this.artifacts!.upload(videoPath, `recordings/${sessionId}.mp4`, "video/mp4") : null;
trace.data.recording = upload ? new Recording({ url: upload.url, bytes: upload.bytes }) : new Recording({ path: videoPath });
if (upload) log.info("recording uploaded", { sessionId, url: upload.url, bytes: upload.bytes });
else log.info("recording saved locally — set S3_ENDPOINT to upload + get a link", { sessionId, path: videoPath });
if (upload) {
log.info("recording uploaded", { sessionId, url: upload.url, bytes: upload.bytes });
} else if (uploadConfigured) {
// S3 WAS configured but upload() returned null → it failed (the error was logged inside the store).
// The video is still saved locally, but the dashboard gets no link — surface that instead of
// reporting a clean local save, so "no video link" isn't silently indistinguishable from success.
log.warn("recording upload failed — keeping local copy", { code: Code.UPLOAD, sessionId, path: videoPath });
trace.diagnostics.push(Diagnostic.warn(Code.UPLOAD, `recording upload failed — video saved locally at ${videoPath}, no dashboard link (check S3_ENDPOINT / credentials).`));
} else {
log.info("recording saved locally — set S3_ENDPOINT to upload + get a link", { sessionId, path: videoPath });
}
} catch (error: any) {
// Render or upload threw. Surface it in the envelope too (a warn — the trace data is still valid, only
// the replay is missing) so "no video" is never silent. Previously this was a stderr log the agent never saw.
log.error("recording failed", { code: Code.RECORD, sessionId, err: error });
trace.diagnostics.push(Diagnostic.warn(Code.RECORD, `debug-replay recording failed — ${String(error?.message ?? error).split("\n")[0]}`));
}
}
}
34 changes: 33 additions & 1 deletion src/collector/Collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,35 @@ export interface EmitResult { ok: boolean; status?: number; body?: string; error
* (14747, the compose-published host port from README/compose/scenarios), then the native `trace serve` default (4000). */
const DEFAULT_CANDIDATES = ["http://localhost:14747", "http://localhost:4000"];
const PROBE_TIMEOUT_MS = 500;
/** Cap on the rejection body kept in an {@link EmitResult}: enough to carry a real error message, bounded so a
* large error page can't bloat a caller that retains the result (e.g. across many onProgress emits). */
const MAX_BODY_CHARS = 10_000;

/**
 * Read at most `maxChars` of a response body, then cancel the stream — so an oversized rejection page (an HTML
 * 500, a stack-trace dump) can't cause a large transient buffer or extra download latency, not just an oversized
 * *stored* body. Best-effort: a mid-read network error or a non-stream body falls back to keeping what we have.
 *
 * The cap counts UTF-16 code units (JavaScript string length). When the cut would land between the two halves
 * of a surrogate pair, the dangling high surrogate is dropped too, so the result is always well-formed text and
 * may be one unit shorter than `maxChars`. A cap that is not a positive number yields "" on every path.
 *
 * @param response - the fetch response whose body is consumed.
 * @param maxChars - upper bound on the length of the returned string.
 * @returns the decoded prefix of the body, never longer than `maxChars`.
 */
async function readCappedText(response: Response, maxChars: number): Promise<string> {
  // Truncate to the cap without leaving half of an astral character (e.g. an emoji) at the end.
  const cap = (raw: string): string => {
    if (!(maxChars > 0)) return ""; // also covers NaN / negative caps, where slice() would misbehave
    if (raw.length <= maxChars) return raw;
    const cut = raw.slice(0, maxChars);
    const lastUnit = cut.charCodeAt(cut.length - 1);
    const endsOnHighSurrogate = lastUnit >= 0xd800 && lastUnit <= 0xdbff;
    return endsOnHighSurrogate ? cut.slice(0, -1) : cut;
  };

  const stream = response.body;
  // No readable stream to pull from: fall back to the whole text (empty on failure) and cap it afterwards.
  if (!stream) return cap(await response.text().catch(() => ""));

  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let text = "";
  try {
    // Pull chunks only until the cap is reached; the final chunk may overshoot and is trimmed below.
    while (text.length < maxChars) {
      const { done, value } = await reader.read();
      if (done) break;
      text += decoder.decode(value, { stream: true });
    }
    text += decoder.decode(); // flush bytes buffered from a multi-byte char split across the last chunk boundary
  } catch {
    // mid-read failure (connection dropped while reading the error body) — surface whatever arrived
  } finally {
    await reader.cancel().catch(() => {}); // stop downloading the rest once we have enough
  }
  return cap(text);
}
Comment thread
burrows99 marked this conversation as resolved.

/**
* Collector — the client-side emit helper for shipping trace envelopes to a remote collector.
Expand All @@ -36,7 +65,10 @@ export class Collector {
}
// A rejection is a real failure — log it at error with the collector's reason (e.g. "invalid envelope",
// "trace store unavailable"), not at info. The caller folds this into the envelope's diagnostics.
const body = await response.text().catch(() => "");
// Cap the body at the source: read only up to MAX_BODY_CHARS off the stream and cancel, so a collector
// that answers with a giant HTML error page can't bloat process memory or stall on the download — callers
// hold onto the result. (The log line truncates further, to 500, separately.)
const body = await readCappedText(response, MAX_BODY_CHARS);
log.error("emit rejected", { code: Code.EMIT, endpoint, status: response.status, body: body.slice(0, 500) });
return { ok: false, status: response.status, body };
} catch (error: any) {
Expand Down
51 changes: 51 additions & 0 deletions test/dynamic-diagnostics.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// DynamicCommand diagnostics: a trace run must make its failures legible in the envelope (not just stderr),
// and a thrown run must emit a TERMINAL envelope so the dashboard's "running" session resolves instead of
// hanging forever. Injects a fake tracer so we exercise the envelope/diagnostic logic without a real CDP target.
import "reflect-metadata";
import { test } from "node:test";
import assert from "node:assert/strict";

import { DynamicCommand } from "../dist/cli/commands/DynamicCommand.js";
import { TargetKind } from "../dist/domain/Target.js";

// Test double for the tracer dependency: both entry points defer to the supplied `behavior` callback, so a
// test decides whether a "trace" returns a capture or throws. Each entry point is async, so a synchronous
// throw inside `behavior` surfaces as a rejected promise.
const fakeTracer = (behavior) => {
  const tracer = {
    traceNode: async () => behavior(),
    traceChrome: async () => behavior(),
  };
  return tracer;
};

// Minimal node-target capture fixture; any field passed in `over` replaces the default of the same name.
const nodeCapture = (over = {}) => {
  const defaults = { target: TargetKind.Node, trigger: "curl localhost", breakpoints: [], events: [] };
  return { ...defaults, ...over };
};

test("a thrown run emits a TERMINAL envelope (running cleared, ENGINE_FATAL) so the dashboard resolves", async () => {
  // Every envelope handed to onProgress, in emission order.
  const envelopes = [];
  const tracer = fakeTracer(() => { throw new Error("attach failed: ECONNREFUSED"); });
  const command = new DynamicCommand(tracer);
  const pendingRun = command.run({
    target: TargetKind.Node,
    port: 9229,
    onProgress: (envelope) => envelopes.push(envelope),
  });

  // The original failure must still propagate to the caller.
  await assert.rejects(pendingRun, /attach failed/);

  // The first envelope is the initial running partial; the last must be the terminal abort.
  assert.ok(envelopes.length >= 2, "expected an initial running partial AND a terminal abort envelope");
  const initial = envelopes[0];
  assert.equal(initial.meta.running, true, "the first envelope is the running partial");

  const terminal = envelopes.at(-1);
  assert.notEqual(terminal.meta.running, true, "the terminal envelope is NOT running — the session resolves");
  assert.equal(terminal.ok, false, "a terminal abort is not ok");

  const isEngineFatalError = (diagnostic) => diagnostic.code === "ENGINE_FATAL" && diagnostic.level === "error";
  assert.ok(terminal.diagnostics.some(isEngineFatalError), "the terminal envelope carries an ENGINE_FATAL error");
});

test("a captured fatal (no throw) yields ok:false + an ENGINE_FATAL diagnostic in the envelope", async () => {
  // The tracer resolves normally, but the capture it returns reports a fatal.
  const capture = () => nodeCapture({ fatal: "debugger disconnected" });
  const command = new DynamicCommand(fakeTracer(capture));

  const result = await command.run({ target: TargetKind.Node, port: 9229 });

  assert.equal(result.trace.ok, false);
  const codes = result.trace.diagnostics.map((diagnostic) => diagnostic.code);
  assert.ok(codes.includes("ENGINE_FATAL"));
});

test("a clean empty node trace stays ok:true and not running (no false alarms)", async () => {
  // Default fixture: no events, no fatal, no failed steps.
  const tracer = fakeTracer(() => nodeCapture());
  const command = new DynamicCommand(tracer);

  const result = await command.run({ target: TargetKind.Node, port: 9229 });
  const finalTrace = result.trace;

  assert.equal(finalTrace.ok, true);
  assert.equal(finalTrace.meta.running, undefined, "the final envelope is not flagged running");
});
22 changes: 21 additions & 1 deletion test/output.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "reflect-metadata";
import { test } from "node:test";
import assert from "node:assert/strict";

import { condense } from "../dist/cli/Cli.js";
import { condense, emitFailureMessage } from "../dist/cli/Cli.js";
import { Code } from "../dist/shared/codes.js";
import { Collector } from "../dist/collector/Collector.js";

Expand Down Expand Up @@ -81,3 +81,23 @@ test("Collector.emit: a failed POST resolves to a rich result (never throws, nev
assert.equal(typeof result.error, "string", "the failure reason is carried back, not dropped");
assert.ok(result.error.length > 0, "error message is non-empty");
});

test("emitFailureMessage: an HTTP status reads as a 'rejected' diagnostic with the status + reason", () => {
  // A failure that carries a status means the collector answered and refused the envelope.
  const rejection = { ok: false, status: 400, body: "invalid envelope" };
  const expected = "collector http://localhost:4000 rejected 3 emit(s): HTTP 400 — invalid envelope";
  assert.equal(emitFailureMessage("http://localhost:4000", 3, rejection), expected);
});

test("emitFailureMessage: a no-status (network) failure reads as 'failed', never 'rejected'", () => {
  // Guards against reporting a POST that never landed (connection refused/timeout/DNS) as the collector
  // having "rejected" a request it never received: a status-less failure must use the delivery wording.
  const networkFailure = { ok: false, error: "fetch failed" };
  const message = emitFailureMessage("http://localhost:4000", 2, networkFailure);

  assert.equal(message, "2 emit(s) to collector http://localhost:4000 failed: fetch failed");
  assert.doesNotMatch(message, /rejected/, "a delivery failure must not claim the collector rejected anything");
});

test("emitFailureMessage: edge cases — no body omits the reason, missing error falls back, body caps at 200", () => {
  // Status without a body: nothing follows the status code.
  const bodiless = emitFailureMessage("u", 1, { ok: false, status: 503 });
  assert.equal(bodiless, "collector u rejected 1 emit(s): HTTP 503");

  // Neither status nor error: the generic fallback reason is used.
  const reasonless = emitFailureMessage("u", 1, { ok: false });
  assert.equal(reasonless, "1 emit(s) to collector u failed: unknown error");

  // Oversized body: only the first 200 characters reach the diagnostic.
  const oversizedBody = "x".repeat(5000);
  const huge = emitFailureMessage("u", 1, { ok: false, status: 500, body: oversizedBody });
  assert.equal(huge, "collector u rejected 1 emit(s): HTTP 500 — " + "x".repeat(200), "an oversized body is truncated to 200 chars in the diagnostic");
});