From b91f1c9bf79091d3fece870a5a7af2620bcb9dab Mon Sep 17 00:00:00 2001 From: Chris Lee Date: Sun, 21 Jun 2026 08:07:18 +1000 Subject: [PATCH 1/2] feat(observability): emit failed_stage and failed_stage_delta_ms on pipeline.failed pipeline.failed carried only err + pipeline_wall_clock_ms, so an operator had to scan back for the last pipeline.stage line and know the static stage order to guess which stage threw. Add a StageTracker threaded as an optional 4th arg through the 7 timeStage calls + the prompt.build site: timeStage sets it on entry, clears on success, and leaves it set on throw, so the outer catch attributes failed_stage + failed_stage_delta_ms. The finalize inner catch clears the tracker so a later throw is not mis-attributed. New strict PipelineFailedLogSchema mirrors PipelineCompletedLogSchema. Additive only. Closes #226 Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_015v2bspPF1ZTTG9ZZrbBgA1 --- docs/operate/observability.md | 12 ++-- src/core/log-fields.ts | 50 ++++++++++++- src/core/pipeline.ts | 128 ++++++++++++++++++++++------------ test/core/log-fields.test.ts | 102 +++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 52 deletions(-) diff --git a/docs/operate/observability.md b/docs/operate/observability.md index 08bae5e..c193741 100644 --- a/docs/operate/observability.md +++ b/docs/operate/observability.md @@ -49,12 +49,12 @@ The crash path is covered too. `installFatalHandlers(processName)` in `src/logge `runPipeline` (`src/core/pipeline.ts`) emits structured timing events whose `pipeline.stage` shape is pinned by the `.strict()` Zod schema in `src/core/log-fields.ts` (parallel to the ship schema below; the co-located test rejects field drift). Four event keys: -| `event` | Meaning | -| -------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). | -| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. | -| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside `costUsd` / `numTurns` and the token counters `inputTokens` / `outputTokens` / `cacheReadInputTokens` / `cacheCreationInputTokens` (issue #192). Pinned by `PipelineCompletedLogSchema`. | -| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`. | +| `event` | Meaning | +| -------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). | +| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. | +| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside `costUsd` / `numTurns` and the token counters `inputTokens` / `outputTokens` / `cacheReadInputTokens` / `cacheCreationInputTokens` (issue #192). Pinned by `PipelineCompletedLogSchema`. | +| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`, plus `failed_stage` (the timed stage in flight when the throw happened) and `failed_stage_delta_ms` (that stage's wall-clock up to the throw) when a stage was running (issue #226). Both stage fields are omitted if no timed stage was active. Pinned by `PipelineFailedLogSchema`. | ### Token usage and the cache hit-ratio diff --git a/src/core/log-fields.ts b/src/core/log-fields.ts index 34adfa3..bc13ff8 100644 --- a/src/core/log-fields.ts +++ b/src/core/log-fields.ts @@ -61,6 +61,41 @@ export const PipelineCompletedLogSchema = z export type PipelineCompletedLog = z.infer; +/** + * Shape of the terminal `pipeline.failed` line (issue #226). The two stage + * fields are `.optional()` because a failure before any timed stage starts + * (or after all stages cleared) carries neither. `failed_stage` is the stage + * in flight when the throw happened; `failed_stage_delta_ms` is that stage's + * wall-clock up to the throw. `err` (the standard pino error field) is not + * pinned here, same as `PipelineCompletedLogSchema`: this schema fixes only the + * custom metric fields. `.strict()` so an emitter that adds an unpinned field, + * or mistypes one, trips the co-located test. + */ +export const PipelineFailedLogSchema = z + .object({ + event: z.literal(CORE_PIPELINE_LOG_EVENTS.failed), + failed_stage: z.string().optional(), + failed_stage_delta_ms: z.number().int().nonnegative().optional(), + pipeline_wall_clock_ms: z.number().int().nonnegative(), + }) + .strict(); + +export type PipelineFailedLog = z.infer; + +/** + * Per-pipeline cursor of the stage currently in flight (issue #226). `timeStage` + * sets `active` before awaiting and clears it on success, so after a throw it + * still points at the failed stage, letting the terminal `pipeline.failed` line + * attribute which stage threw and its wall-clock to that point. + */ +export interface StageTracker { + active: { stage: string; startedAt: number } | null; +} + +export function createStageTracker(): StageTracker { + return { active: null }; +} + /** * Emit a `pipeline.stage` event measuring `Date.now() - startedAt`. The child * logger's bindings (deliveryId, owner, repo, entityNumber) are prepended by @@ -77,11 +112,20 @@ export function logPipelineStage(log: Logger, stage: string, startedAt: number): * Run an awaited stage, timing it end-to-end and emitting one `pipeline.stage` * event on success. Errors propagate unchanged (the failure is logged by the * pipeline's terminal `pipeline.failed` line), so a throwing stage does not - * emit a misleading "completed" timing. + * emit a misleading "completed" timing. When a `tracker` is passed, `active` is + * set before awaiting and cleared after the success log; a throw skips the clear + * so `active` still names the failed stage for the failure path to attribute. */ -export async function timeStage(log: Logger, stage: string, fn: () => Promise): Promise { +export async function timeStage( + log: Logger, + stage: string, + fn: () => Promise, + tracker?: StageTracker, +): Promise { const startedAt = Date.now(); - const value = await fn(); + if (tracker) tracker.active = { stage, startedAt }; + const value = await fn(); // a throw here leaves tracker.active on this stage and propagates unchanged logPipelineStage(log, stage, startedAt); + if (tracker) tracker.active = null; return value; } diff --git a/src/core/pipeline.ts b/src/core/pipeline.ts index 49e2c07..926999b 100644 --- a/src/core/pipeline.ts +++ b/src/core/pipeline.ts @@ -14,7 +14,12 @@ import { checkoutRepo } from "./checkout"; import { executeAgent } from "./executor"; import { fetchGitHubData } from "./fetcher"; import { resolveGithubToken } from "./github-token"; -import { CORE_PIPELINE_LOG_EVENTS, logPipelineStage, timeStage } from "./log-fields"; +import { + CORE_PIPELINE_LOG_EVENTS, + createStageTracker, + logPipelineStage, + timeStage, +} from "./log-fields"; import { buildPrompt, buildPromptParts, resolveAllowedTools } from "./prompt-builder"; import { createTrackingComment, finalizeTrackingComment } from "./tracking-comment"; @@ -286,6 +291,9 @@ export async function runPipeline( // (issue #166); pipeline_wall_clock_ms on the terminal line is measured from // here so an operator can see total request cost without a webhook arriving. const pipelineStartedAt = Date.now(); + // Cursor of the stage in flight; cleared on each stage's success so the outer + // catch can attribute which stage threw (issue #226). + const stageTracker = createStageTracker(); try { ctx.log.info({ event: CORE_PIPELINE_LOG_EVENTS.started }, "Pipeline started"); @@ -299,28 +307,39 @@ export async function runPipeline( } else if (ctx.skipTrackingComments === true) { ctx.log.info("Skipping tracking comment (skipTrackingComments)"); } else { - trackingCommentId = await timeStage(ctx.log, "trackingComment.create", () => - retryWithBackoff(() => createTrackingComment(ctx), { - maxAttempts: 3, - initialDelayMs: 1000, - log: ctx.log, - op: "tracking_comment.create", - }), + trackingCommentId = await timeStage( + ctx.log, + "trackingComment.create", + () => + retryWithBackoff(() => createTrackingComment(ctx), { + maxAttempts: 3, + initialDelayMs: 1000, + log: ctx.log, + op: "tracking_comment.create", + }), + stageTracker, ); } const resolvedTrackingCommentId = trackingCommentId; - const installationToken = await timeStage(ctx.log, "token.resolve", () => - resolveGithubToken(ctx.octokit), + const installationToken = await timeStage( + ctx.log, + "token.resolve", + () => resolveGithubToken(ctx.octokit), + stageTracker, ); - const data = await timeStage(ctx.log, "github.fetch", () => - retryWithBackoff(() => fetchGitHubData(ctx), { - maxAttempts: 3, - initialDelayMs: 2000, - log: ctx.log, - op: "github.fetch", - }), + const data = await timeStage( + ctx.log, + "github.fetch", + () => + retryWithBackoff(() => fetchGitHubData(ctx), { + maxAttempts: 3, + initialDelayMs: 2000, + log: ctx.log, + op: "github.fetch", + }), + stageTracker, ); const enrichedCtx: EnrichedBotContext = { @@ -365,6 +384,7 @@ export async function runPipeline( // `promptParts` is forwarded when cacheable layout is on so the executor // can pivot to systemPrompt.append + excludeDynamicSections (issue #134). const promptBuildAt = Date.now(); + stageTracker.active = { stage: "prompt.build", startedAt: promptBuildAt }; const prompt = buildPrompt( enrichedCtx, data, @@ -376,6 +396,7 @@ export async function runPipeline( ? buildPromptParts(enrichedCtx, data, resolvedTrackingCommentId, overrides.discussionDigest) : undefined; logPipelineStage(ctx.log, "prompt.build", promptBuildAt); + stageTracker.active = null; if (ctx.dryRun === true) { ctx.log.info( @@ -385,8 +406,11 @@ export async function runPipeline( return { success: true, durationMs: 0, costUsd: 0, numTurns: 0, dryRun: true }; } - const { workDir, cleanup } = await timeStage(enrichedCtx.log, "repo.clone", () => - checkoutRepo(enrichedCtx, installationToken, enrichedCtx.baseBranch), + const { workDir, cleanup } = await timeStage( + enrichedCtx.log, + "repo.clone", + () => checkoutRepo(enrichedCtx, installationToken, enrichedCtx.baseBranch), + stageTracker, ); overrides.onWorkDirReady?.(workDir); @@ -445,36 +469,48 @@ export async function runPipeline( ] : withResolveTool; - const result = await timeStage(enrichedCtx.log, "executor.invoke", () => - executeAgent({ - ctx: enrichedCtx, - prompt, - mcpServers, - workDir, - artifactsDir, - allowedTools, - installationToken, - ...(overrides.maxTurns !== undefined ? { maxTurns: overrides.maxTurns } : {}), - ...(overrides.signal !== undefined ? { signal: overrides.signal } : {}), - ...(promptParts !== undefined ? { promptParts } : {}), - }), + const result = await timeStage( + enrichedCtx.log, + "executor.invoke", + () => + executeAgent({ + ctx: enrichedCtx, + prompt, + mcpServers, + workDir, + artifactsDir, + allowedTools, + installationToken, + ...(overrides.maxTurns !== undefined ? { maxTurns: overrides.maxTurns } : {}), + ...(overrides.signal !== undefined ? { signal: overrides.signal } : {}), + ...(promptParts !== undefined ? { promptParts } : {}), + }), + stageTracker, ); if (resolvedTrackingCommentId !== undefined && !callerOwnsTrackingComment) { try { const finalOpts = buildFinalOpts(result); - await timeStage(enrichedCtx.log, "trackingComment.finalize", () => - retryWithBackoff( - () => finalizeTrackingComment(enrichedCtx, resolvedTrackingCommentId, finalOpts), - { - maxAttempts: 3, - initialDelayMs: 1000, - log: enrichedCtx.log, - op: "tracking_comment.finalize", - }, - ), + await timeStage( + enrichedCtx.log, + "trackingComment.finalize", + () => + retryWithBackoff( + () => finalizeTrackingComment(enrichedCtx, resolvedTrackingCommentId, finalOpts), + { + maxAttempts: 3, + initialDelayMs: 1000, + log: enrichedCtx.log, + op: "tracking_comment.finalize", + }, + ), + stageTracker, ); } catch (finalizeError) { + // The finalize error is handled here, so no stage is in flight anymore. + // Clear the tracker so a later throw is not mis-attributed to + // trackingComment.finalize (timeStage sets active but never clears on throw). + stageTracker.active = null; enrichedCtx.log.error( { err: finalizeError }, "Failed to finalize tracking comment after successful execution", @@ -553,7 +589,7 @@ export async function runPipeline( }; } finally { try { - await timeStage(ctx.log, "workspace.cleanup", () => cleanup()); + await timeStage(ctx.log, "workspace.cleanup", () => cleanup(), stageTracker); } catch (cleanupError) { ctx.log.error({ err: cleanupError }, "Failed to cleanup temp directory"); } @@ -565,10 +601,16 @@ export async function runPipeline( } } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); + // Non-null only if a timed stage was in flight when the throw happened + // (issue #226); each stage clears it on success. + const failed = stageTracker.active; ctx.log.error( { event: CORE_PIPELINE_LOG_EVENTS.failed, err, + ...(failed + ? { failed_stage: failed.stage, failed_stage_delta_ms: Date.now() - failed.startedAt } + : {}), pipeline_wall_clock_ms: Date.now() - pipelineStartedAt, }, "Request processing failed", diff --git a/test/core/log-fields.test.ts b/test/core/log-fields.test.ts index f15e6ae..e370d23 100644 --- a/test/core/log-fields.test.ts +++ b/test/core/log-fields.test.ts @@ -2,9 +2,16 @@ import { describe, expect, it } from "bun:test"; import { CORE_PIPELINE_LOG_EVENTS, + createStageTracker, PipelineCompletedLogSchema, + PipelineFailedLogSchema, PipelineStageLogSchema, + timeStage, } from "../../src/core/log-fields"; +import { type Logger } from "../../src/logger"; + +// No-op pino-like stub: timeStage calls log.info on the success path. +const log = { info: () => {} } as unknown as Logger; describe("PipelineCompletedLogSchema (#192)", () => { it("accepts a full pipeline.completed line carrying token counters", () => { @@ -110,3 +117,98 @@ describe("PipelineStageLogSchema (#166)", () => { }); }); }); + +describe("PipelineFailedLogSchema (#226)", () => { + it("accepts a well-formed failed record", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + failed_stage: "executor.invoke", + failed_stage_delta_ms: 240200, + pipeline_wall_clock_ms: 240630, + }); + expect(result.success).toBe(true); + }); + + it("accepts a failed record without the optional stage fields", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + pipeline_wall_clock_ms: 12, + }); + expect(result.success).toBe(true); + }); + + it("rejects a camelCase misnamed field (strict pins drift)", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + failedStage: "x", // camelCase typo, the field is failed_stage + pipeline_wall_clock_ms: 12, + }); + expect(result.success).toBe(false); + }); + + it("rejects an unknown extra field (strict)", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + pipeline_wall_clock_ms: 12, + surprise: "boo", + }); + expect(result.success).toBe(false); + }); + + it("rejects a negative failed_stage_delta_ms", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + failed_stage: "x", + failed_stage_delta_ms: -1, + pipeline_wall_clock_ms: 5, + }); + expect(result.success).toBe(false); + }); + + it("rejects the wrong event literal", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.completed, + pipeline_wall_clock_ms: 12, + }); + expect(result.success).toBe(false); + }); + + it("rejects a record missing pipeline_wall_clock_ms", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + }); + expect(result.success).toBe(false); + }); +}); + +describe("timeStage tracker (#226)", () => { + it("clears the tracker after a successful stage", async () => { + const t = createStageTracker(); + const value = await timeStage(log, "github.fetch", () => Promise.resolve("ok"), t); + expect(value).toBe("ok"); + expect(t.active).toBeNull(); + }); + + it("leaves the tracker pointing at the failed stage on throw", async () => { + const t = createStageTracker(); + let caught: unknown; + try { + await timeStage(log, "executor.invoke", () => Promise.reject(new Error("boom")), t); + } catch (e) { + caught = e; + } + expect(caught).toBeInstanceOf(Error); + expect((caught as Error).message).toBe("boom"); + expect(t.active?.stage).toBe("executor.invoke"); + expect(typeof t.active?.startedAt).toBe("number"); + }); + + it("starts with a null active tracker", () => { + expect(createStageTracker().active).toBeNull(); + }); + + it("works without a tracker (backward compatible)", async () => { + const value = await timeStage(log, "x", () => Promise.resolve(1)); + expect(value).toBe(1); + }); +}); From 7760b9ce331dbb7e08f6dee7a89ee90d4501d91e Mon Sep 17 00:00:00 2001 From: Chris Lee Date: Sun, 21 Jun 2026 08:14:40 +1000 Subject: [PATCH 2/2] fix(observability): enforce failed_stage pairing in PipelineFailedLogSchema Address review: failed_stage and failed_stage_delta_ms are emitted together or not at all, so a .refine rejects a record carrying only one. Add paired- contract regression tests. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_015v2bspPF1ZTTG9ZZrbBgA1 --- src/core/log-fields.ts | 9 +++++++-- test/core/log-fields.test.ts | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/core/log-fields.ts b/src/core/log-fields.ts index bc13ff8..f5aef3b 100644 --- a/src/core/log-fields.ts +++ b/src/core/log-fields.ts @@ -69,7 +69,9 @@ export type PipelineCompletedLog = z.infer; * wall-clock up to the throw. `err` (the standard pino error field) is not * pinned here, same as `PipelineCompletedLogSchema`: this schema fixes only the * custom metric fields. `.strict()` so an emitter that adds an unpinned field, - * or mistypes one, trips the co-located test. + * or mistypes one, trips the co-located test. The two stage fields are emitted + * together or not at all (the catch spreads both or neither), so a `.refine` + * rejects a record that carries only one. */ export const PipelineFailedLogSchema = z .object({ @@ -78,7 +80,10 @@ export const PipelineFailedLogSchema = z failed_stage_delta_ms: z.number().int().nonnegative().optional(), pipeline_wall_clock_ms: z.number().int().nonnegative(), }) - .strict(); + .strict() + .refine((v) => (v.failed_stage === undefined) === (v.failed_stage_delta_ms === undefined), { + message: "failed_stage and failed_stage_delta_ms must be present together", + }); export type PipelineFailedLog = z.infer; diff --git a/test/core/log-fields.test.ts b/test/core/log-fields.test.ts index e370d23..ad931c3 100644 --- a/test/core/log-fields.test.ts +++ b/test/core/log-fields.test.ts @@ -179,6 +179,24 @@ describe("PipelineFailedLogSchema (#226)", () => { }); expect(result.success).toBe(false); }); + + it("rejects failed_stage without failed_stage_delta_ms (paired contract)", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + failed_stage: "executor.invoke", + pipeline_wall_clock_ms: 5, + }); + expect(result.success).toBe(false); + }); + + it("rejects failed_stage_delta_ms without failed_stage (paired contract)", () => { + const result = PipelineFailedLogSchema.safeParse({ + event: CORE_PIPELINE_LOG_EVENTS.failed, + failed_stage_delta_ms: 42, + pipeline_wall_clock_ms: 5, + }); + expect(result.success).toBe(false); + }); }); describe("timeStage tracker (#226)", () => {