12 changes: 6 additions & 6 deletions docs/operate/observability.md
@@ -49,12 +49,12 @@ The crash path is covered too. `installFatalHandlers(processName)` in `src/logge

`runPipeline` (`src/core/pipeline.ts`) emits structured timing events whose `pipeline.stage` shape is pinned by the `.strict()` Zod schema in `src/core/log-fields.ts` (parallel to the ship schema below; the co-located test rejects field drift). Four event keys:

| `event` | Meaning |
| -------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). |
| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. |
| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside `costUsd` / `numTurns` and the token counters `inputTokens` / `outputTokens` / `cacheReadInputTokens` / `cacheCreationInputTokens` (issue #192). Pinned by `PipelineCompletedLogSchema`. |
| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`. |
| `event` | Meaning |
| -------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `pipeline.started` | Request entered `runPipeline`. Carries the child-logger bindings (deliveryId, entity). |
| `pipeline.stage` | One stage finished; carries `stage` + `delta_ms`. Stages: trackingComment.create, token.resolve, github.fetch, prompt.build, repo.clone, executor.invoke, trackingComment.finalize, workspace.cleanup. |
| `pipeline.completed` | Success terminal line; carries `pipeline_wall_clock_ms` alongside `costUsd` / `numTurns` and the token counters `inputTokens` / `outputTokens` / `cacheReadInputTokens` / `cacheCreationInputTokens` (issue #192). Pinned by `PipelineCompletedLogSchema`. |
| `pipeline.failed` | Failure terminal line; carries `pipeline_wall_clock_ms` + the redacted `err`, plus `failed_stage` (the timed stage in flight when the throw happened) and `failed_stage_delta_ms` (that stage's wall-clock up to the throw) when a stage was running (issue #226). Both stage fields are omitted if no timed stage was active. Pinned by `PipelineFailedLogSchema`. |
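
As an illustration of how a log consumer might read the two optional stage fields, here is a minimal sketch. The `FailedLine` interface and the `describeFailure` helper are hypothetical and not part of the codebase; the sketch assumes the line has already been parsed from JSON and that the `event` value is the literal `pipeline.failed` shown in the table.

```typescript
// Hypothetical shape of a parsed `pipeline.failed` line, limited to the
// custom metric fields named in the table above (assumption: `err` and the
// child-logger bindings are ignored here).
interface FailedLine {
  event: "pipeline.failed";
  pipeline_wall_clock_ms: number;
  failed_stage?: string;
  failed_stage_delta_ms?: number;
}

// Hypothetical helper: turn a failed line into a one-line summary.
// The two stage fields are documented as present together or not at all,
// so both are checked before either is used.
function describeFailure(line: FailedLine): string {
  const total = `pipeline total ${line.pipeline_wall_clock_ms} ms`;
  if (line.failed_stage !== undefined && line.failed_stage_delta_ms !== undefined) {
    return `${line.failed_stage} threw after ${line.failed_stage_delta_ms} ms (${total})`;
  }
  return `failed outside any timed stage (${total})`;
}
```

A dashboard or alert rule could group on `failed_stage` in the same way, treating its absence as "no timed stage was active".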

### Token usage and the cache hit-ratio

55 changes: 52 additions & 3 deletions src/core/log-fields.ts
@@ -61,6 +61,46 @@ export const PipelineCompletedLogSchema = z

export type PipelineCompletedLog = z.infer<typeof PipelineCompletedLogSchema>;

/**
 * Shape of the terminal `pipeline.failed` line (issue #226). The two stage
 * fields are `.optional()` because a failure before any timed stage starts
 * (or after all stages cleared) carries neither. `failed_stage` is the stage
 * in flight when the throw happened; `failed_stage_delta_ms` is that stage's
 * wall-clock up to the throw. `err` (the standard pino error field) is not
 * pinned here, same as `PipelineCompletedLogSchema`: this schema fixes only the
 * custom metric fields. `.strict()` so an emitter that adds an unpinned field,
 * or mistypes one, trips the co-located test. The two stage fields are emitted
 * together or not at all (the catch spreads both or neither), so a `.refine`
 * rejects a record that carries only one.
 */
export const PipelineFailedLogSchema = z
  .object({
    event: z.literal(CORE_PIPELINE_LOG_EVENTS.failed),
    failed_stage: z.string().optional(),
    failed_stage_delta_ms: z.number().int().nonnegative().optional(),
    pipeline_wall_clock_ms: z.number().int().nonnegative(),
  })
  .strict()
  .refine((v) => (v.failed_stage === undefined) === (v.failed_stage_delta_ms === undefined), {
    message: "failed_stage and failed_stage_delta_ms must be present together",
  });

export type PipelineFailedLog = z.infer<typeof PipelineFailedLogSchema>;

/**
 * Per-pipeline cursor of the stage currently in flight (issue #226). `timeStage`
 * sets `active` before awaiting and clears it on success, so after a throw it
 * still points at the failed stage, letting the terminal `pipeline.failed` line
 * attribute which stage threw and its wall-clock to that point.
 */
export interface StageTracker {
  active: { stage: string; startedAt: number } | null;
}

export function createStageTracker(): StageTracker {
  return { active: null };
}

/**
* Emit a `pipeline.stage` event measuring `Date.now() - startedAt`. The child
* logger's bindings (deliveryId, owner, repo, entityNumber) are prepended by
@@ -77,11 +117,20 @@ export function logPipelineStage(log: Logger, stage: string, startedAt: number):
* Run an awaited stage, timing it end-to-end and emitting one `pipeline.stage`
* event on success. Errors propagate unchanged (the failure is logged by the
* pipeline's terminal `pipeline.failed` line), so a throwing stage does not
* emit a misleading "completed" timing.
* emit a misleading "completed" timing. When a `tracker` is passed, `active` is
* set before awaiting and cleared after the success log; a throw skips the clear
* so `active` still names the failed stage for the failure path to attribute.
*/
export async function timeStage<T>(log: Logger, stage: string, fn: () => Promise<T>): Promise<T> {
export async function timeStage<T>(
log: Logger,
stage: string,
fn: () => Promise<T>,
tracker?: StageTracker,
): Promise<T> {
const startedAt = Date.now();
const value = await fn();
if (tracker) tracker.active = { stage, startedAt };
const value = await fn(); // a throw here leaves tracker.active on this stage and propagates unchanged
logPipelineStage(log, stage, startedAt);
if (tracker) tracker.active = null;
return value;
}
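
The cursor behaviour described in the doc comment above can be seen in isolation with a minimal sketch. It is a simplified stand-in, not the PR's code: the logger argument and the `pipeline.stage` emission are dropped, the tracker is required rather than optional, and `demo` is a hypothetical driver added only for illustration.

```typescript
// Same shape as the StageTracker interface in the diff.
interface StageTracker {
  active: { stage: string; startedAt: number } | null;
}

// Simplified timeStage: sets the cursor before awaiting, clears it only on
// success. A throw from fn() skips the clear and propagates unchanged.
async function timeStage<T>(
  stage: string,
  fn: () => Promise<T>,
  tracker: StageTracker,
): Promise<T> {
  tracker.active = { stage, startedAt: Date.now() };
  const value = await fn();
  tracker.active = null;
  return value;
}

// Hypothetical driver: the first stage succeeds, the second throws, and the
// catch reads the cursor to learn which stage was in flight.
async function demo(): Promise<string | null> {
  const tracker: StageTracker = { active: null };
  try {
    await timeStage("token.resolve", async () => "token", tracker);
    await timeStage(
      "github.fetch",
      async () => {
        throw new Error("boom");
      },
      tracker,
    );
  } catch {
    return tracker.active?.stage ?? null;
  }
  return null;
}
```

Because the cursor is a single shared slot, any stage that runs after a failure (for example inside a `finally`) overwrites it, so a caller relying on this pattern needs to read or save `active` before starting further tracked work.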
128 changes: 85 additions & 43 deletions src/core/pipeline.ts
@@ -14,7 +14,12 @@ import { checkoutRepo } from "./checkout";
import { executeAgent } from "./executor";
import { fetchGitHubData } from "./fetcher";
import { resolveGithubToken } from "./github-token";
import { CORE_PIPELINE_LOG_EVENTS, logPipelineStage, timeStage } from "./log-fields";
import {
CORE_PIPELINE_LOG_EVENTS,
createStageTracker,
logPipelineStage,
timeStage,
} from "./log-fields";
import { buildPrompt, buildPromptParts, resolveAllowedTools } from "./prompt-builder";
import { createTrackingComment, finalizeTrackingComment } from "./tracking-comment";

@@ -286,6 +291,9 @@ export async function runPipeline(
// (issue #166); pipeline_wall_clock_ms on the terminal line is measured from
// here so an operator can see total request cost without a webhook arriving.
const pipelineStartedAt = Date.now();
// Cursor of the stage in flight; cleared on each stage's success so the outer
// catch can attribute which stage threw (issue #226).
const stageTracker = createStageTracker();

try {
ctx.log.info({ event: CORE_PIPELINE_LOG_EVENTS.started }, "Pipeline started");
@@ -299,28 +307,39 @@ export async function runPipeline(
} else if (ctx.skipTrackingComments === true) {
ctx.log.info("Skipping tracking comment (skipTrackingComments)");
} else {
trackingCommentId = await timeStage(ctx.log, "trackingComment.create", () =>
retryWithBackoff(() => createTrackingComment(ctx), {
maxAttempts: 3,
initialDelayMs: 1000,
log: ctx.log,
op: "tracking_comment.create",
}),
trackingCommentId = await timeStage(
ctx.log,
"trackingComment.create",
() =>
retryWithBackoff(() => createTrackingComment(ctx), {
maxAttempts: 3,
initialDelayMs: 1000,
log: ctx.log,
op: "tracking_comment.create",
}),
stageTracker,
);
}
const resolvedTrackingCommentId = trackingCommentId;

const installationToken = await timeStage(ctx.log, "token.resolve", () =>
resolveGithubToken(ctx.octokit),
const installationToken = await timeStage(
ctx.log,
"token.resolve",
() => resolveGithubToken(ctx.octokit),
stageTracker,
);

const data = await timeStage(ctx.log, "github.fetch", () =>
retryWithBackoff(() => fetchGitHubData(ctx), {
maxAttempts: 3,
initialDelayMs: 2000,
log: ctx.log,
op: "github.fetch",
}),
const data = await timeStage(
ctx.log,
"github.fetch",
() =>
retryWithBackoff(() => fetchGitHubData(ctx), {
maxAttempts: 3,
initialDelayMs: 2000,
log: ctx.log,
op: "github.fetch",
}),
stageTracker,
);

const enrichedCtx: EnrichedBotContext = {
@@ -365,6 +384,7 @@ export async function runPipeline(
// `promptParts` is forwarded when cacheable layout is on so the executor
// can pivot to systemPrompt.append + excludeDynamicSections (issue #134).
const promptBuildAt = Date.now();
stageTracker.active = { stage: "prompt.build", startedAt: promptBuildAt };
const prompt = buildPrompt(
enrichedCtx,
data,
@@ -376,6 +396,7 @@ export async function runPipeline(
? buildPromptParts(enrichedCtx, data, resolvedTrackingCommentId, overrides.discussionDigest)
: undefined;
logPipelineStage(ctx.log, "prompt.build", promptBuildAt);
stageTracker.active = null;

if (ctx.dryRun === true) {
ctx.log.info(
@@ -385,8 +406,11 @@ export async function runPipeline(
return { success: true, durationMs: 0, costUsd: 0, numTurns: 0, dryRun: true };
}

const { workDir, cleanup } = await timeStage(enrichedCtx.log, "repo.clone", () =>
checkoutRepo(enrichedCtx, installationToken, enrichedCtx.baseBranch),
const { workDir, cleanup } = await timeStage(
enrichedCtx.log,
"repo.clone",
() => checkoutRepo(enrichedCtx, installationToken, enrichedCtx.baseBranch),
stageTracker,
);
overrides.onWorkDirReady?.(workDir);

@@ -445,36 +469,48 @@ export async function runPipeline(
]
: withResolveTool;

const result = await timeStage(enrichedCtx.log, "executor.invoke", () =>
executeAgent({
ctx: enrichedCtx,
prompt,
mcpServers,
workDir,
artifactsDir,
allowedTools,
installationToken,
...(overrides.maxTurns !== undefined ? { maxTurns: overrides.maxTurns } : {}),
...(overrides.signal !== undefined ? { signal: overrides.signal } : {}),
...(promptParts !== undefined ? { promptParts } : {}),
}),
const result = await timeStage(
enrichedCtx.log,
"executor.invoke",
() =>
executeAgent({
ctx: enrichedCtx,
prompt,
mcpServers,
workDir,
artifactsDir,
allowedTools,
installationToken,
...(overrides.maxTurns !== undefined ? { maxTurns: overrides.maxTurns } : {}),
...(overrides.signal !== undefined ? { signal: overrides.signal } : {}),
...(promptParts !== undefined ? { promptParts } : {}),
}),
stageTracker,
);

if (resolvedTrackingCommentId !== undefined && !callerOwnsTrackingComment) {
try {
const finalOpts = buildFinalOpts(result);
await timeStage(enrichedCtx.log, "trackingComment.finalize", () =>
retryWithBackoff(
() => finalizeTrackingComment(enrichedCtx, resolvedTrackingCommentId, finalOpts),
{
maxAttempts: 3,
initialDelayMs: 1000,
log: enrichedCtx.log,
op: "tracking_comment.finalize",
},
),
await timeStage(
enrichedCtx.log,
"trackingComment.finalize",
() =>
retryWithBackoff(
() => finalizeTrackingComment(enrichedCtx, resolvedTrackingCommentId, finalOpts),
{
maxAttempts: 3,
initialDelayMs: 1000,
log: enrichedCtx.log,
op: "tracking_comment.finalize",
},
),
stageTracker,
);
} catch (finalizeError) {
// The finalize error is handled here, so no stage is in flight anymore.
// Clear the tracker so a later throw is not mis-attributed to
// trackingComment.finalize (timeStage sets active but never clears on throw).
stageTracker.active = null;
enrichedCtx.log.error(
{ err: finalizeError },
"Failed to finalize tracking comment after successful execution",
@@ -553,7 +589,7 @@ export async function runPipeline(
};
} finally {
try {
await timeStage(ctx.log, "workspace.cleanup", () => cleanup());
await timeStage(ctx.log, "workspace.cleanup", () => cleanup(), stageTracker);
} catch (cleanupError) {
ctx.log.error({ err: cleanupError }, "Failed to cleanup temp directory");
}
@@ -565,10 +601,16 @@ export async function runPipeline(
}
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
// Non-null only if a timed stage was in flight when the throw happened
// (issue #226); each stage clears it on success.
const failed = stageTracker.active;
ctx.log.error(
{
event: CORE_PIPELINE_LOG_EVENTS.failed,
err,
...(failed
? { failed_stage: failed.stage, failed_stage_delta_ms: Date.now() - failed.startedAt }
: {}),
pipeline_wall_clock_ms: Date.now() - pipelineStartedAt,
},
"Request processing failed",