Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/operate/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ The load-bearing event is `retry.succeeded_after_retry`: it is the only signal i
| `retry.exhausted` | error | All `max_attempts` attempts failed. Carries the full retry-window `elapsed_ms`; rethrows the last error. Neither `delay_ms` nor `status` are emitted on this event. |
| `retry.succeeded_after_retry` | info | The call succeeded on `attempt > 1`. Weak-flake leading indicator: gated on `attempt > 1` so first-try successes stay silent. Alert on `count(...) by op` over 5-minute windows. Neither `delay_ms` nor `status` are emitted on this event. |

## Idempotency log fields

`claimDelivery` (`src/webhook/idempotency.ts`) is the webhook dedup chokepoint: a Valkey `SET key 1 NX EX` claim that returns `true` exactly once per `deliveryId` **only on the healthy Valkey path** within GitHub's 3-day redelivery window. When Valkey is unavailable or errors it fails **open**, returning `true` for every delivery (including redeliveries), so the exactly-once guarantee degrades to at-least-once. Its three-event family is pinned by a union of strict objects in `src/webhook/idempotency-log-fields.ts` so an emitter that mistypes an event name, drops `reason` from a fail-open line, or attaches `err` to the `unavailable` path trips the co-located test. Every event carries `deliveryId` (camelCase, the established child-logger delivery identifier binding). Behaviour is fail-open: `idempotency.claimed` and `idempotency.failed_open` both proceed with processing; only `idempotency.duplicate_skipped` skips.

| `event` | Level | Fields |
| ------------------------------- | ----- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `idempotency.claimed` | debug | `deliveryId`. The SET-NX won the claim (first time this delivery is seen); the caller proceeds. At `debug` because it fires once per non-duplicate delivery, too loud at `info` for a busy installation. |
| `idempotency.duplicate_skipped` | info | `deliveryId`. The SET-NX found an existing key (a redelivery); the caller skips. |
| `idempotency.failed_open` | warn | `deliveryId`, `reason` (`unavailable` when Valkey is unconfigured/disconnected, `error` when the SET threw), and `err` (the error message, on the `error` branch only). The caller proceeds (at-least-once degradation). |

## Output secret-guard log events

`safePostToGitHub` (`src/utils/github-output-guard.ts`) is the output-side chokepoint for every byte sent to GitHub. It emits structured `warn`/`error` events when the regex pass or the optional LLM scanner acts on a body. Per the logging contract, none of these carry the matched bytes, surrounding context, or a hash, only `kinds`, counts, lengths, `callsite`, and `deliveryId`.
Expand Down
58 changes: 58 additions & 0 deletions src/webhook/idempotency-log-fields.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Canonical pino log-field schema for `claimDelivery` observability (issue #232).
*
* Mirrors `src/utils/retry-log-fields.ts`: a strict Zod shape pins the
* structured `idempotency.*` event family so the emit sites in `claimDelivery`
* cannot drift on a field name (e.g. `reason` value or `failed_open` vs
* `failed-open`) without the co-located test catching it. Emitters log plain
* objects via `log.info` / `log.warn`; the schema is the drift-prevention
* contract, not a runtime validator on the hot path.
*
* The schema is a union of strict per-outcome objects so per-event field
* presence is pinned exactly: `claimed` / `duplicate_skipped` carry only
* `deliveryId`; `failed_open` splits into a `reason: "unavailable"` branch
* (no `err`) and a `reason: "error"` branch (`err` required). The fail-open
* split is two branches rather than a `reason` enum + optional `err` so the
* contract "`err` only on the `error` path" is enforced by the schema itself,
* not just by emit-site discipline: a future emit attaching `err` to an
* `unavailable` line is rejected here and trips the co-located test.
*/
import { z } from "zod";

export const IDEMPOTENCY_LOG_EVENTS = {
claimed: "idempotency.claimed",
duplicateSkipped: "idempotency.duplicate_skipped",
failedOpen: "idempotency.failed_open",
} as const;

// `deliveryId` stays camelCase because it is the established repo-wide
// child-logger delivery identifier binding; new metric-style fields use snake_case.
const deliveryId = z.string().min(1);

export const IdempotencyLogFieldsSchema = z.union([
/** Info: the SET-NX won the claim (first delivery seen); the caller proceeds. */
z.strictObject({
event: z.literal(IDEMPOTENCY_LOG_EVENTS.claimed),
deliveryId,
}),
/** Info: the SET-NX found an existing key (a redelivery); the caller skips. */
z.strictObject({
event: z.literal(IDEMPOTENCY_LOG_EVENTS.duplicateSkipped),
deliveryId,
}),
/** Warn: Valkey unconfigured/disconnected, no SET issued. The caller proceeds. */
z.strictObject({
event: z.literal(IDEMPOTENCY_LOG_EVENTS.failedOpen),
deliveryId,
reason: z.literal("unavailable"),
}),
/** Warn: the SET threw; `err` carries the message. The caller proceeds. */
z.strictObject({
event: z.literal(IDEMPOTENCY_LOG_EVENTS.failedOpen),
deliveryId,
reason: z.literal("error"),
err: z.string(),
}),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
]);
Comment thread
chrisleekr marked this conversation as resolved.

export type IdempotencyLogFields = z.infer<typeof IdempotencyLogFieldsSchema>;
26 changes: 22 additions & 4 deletions src/webhook/idempotency.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Logger } from "pino";

import { getValkeyClient, isValkeyHealthy } from "../orchestrator/valkey";
import { IDEMPOTENCY_LOG_EVENTS } from "./idempotency-log-fields";
Comment thread
chrisleekr marked this conversation as resolved.

/**
* Webhook delivery idempotency (issue #202).
Comment thread
chrisleekr marked this conversation as resolved.
Expand Down Expand Up @@ -44,7 +45,10 @@ export async function claimDelivery(deliveryId: string, log: Logger): Promise<bo
// takes the immediate fail-open path, leaving the durable backstops
// (`idx_workflow_runs_inflight` + tracking-comment marker scan) to dedup.
if (client === null || !isValkeyHealthy()) {
log.warn({ deliveryId }, "claimDelivery: Valkey unavailable, proceeding (fail-open)");
log.warn(
{ deliveryId, event: IDEMPOTENCY_LOG_EVENTS.failedOpen, reason: "unavailable" },
"claimDelivery: Valkey unavailable, proceeding (fail-open)",
);
return true;
}
try {
Expand All @@ -58,15 +62,29 @@ export async function claimDelivery(deliveryId: string, log: Logger): Promise<bo
"EX",
String(TTL_SECONDS),
])) as string | null;
if (res === "OK") return true;
if (res === "OK") {
// debug, not info: this fires once per non-duplicate delivery, too loud at
// info for a busy installation (issue #232 volume policy, mirroring
// github.api.request). duplicate_skipped/failed_open carry the operator signal.
log.debug(
{ deliveryId, event: IDEMPOTENCY_LOG_EVENTS.claimed },
"claimDelivery: claimed new webhook delivery",
);
Comment thread
chrisleekr marked this conversation as resolved.
return true;
}
log.info(
{ deliveryId, event: "dedup-skip" },
{ deliveryId, event: IDEMPOTENCY_LOG_EVENTS.duplicateSkipped },
"claimDelivery: duplicate webhook delivery, skipping",
);
return false;
} catch (err) {
log.warn(
{ deliveryId, err: err instanceof Error ? err.message : String(err) },
{
deliveryId,
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
reason: "error",
err: err instanceof Error ? err.message : String(err),
},
"claimDelivery: Valkey error, proceeding (fail-open)",
);
return true;
Expand Down
114 changes: 114 additions & 0 deletions test/webhook/idempotency-log-fields.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { describe, expect, it } from "bun:test";

import {
IDEMPOTENCY_LOG_EVENTS,
IdempotencyLogFieldsSchema,
} from "../../src/webhook/idempotency-log-fields";

describe("IDEMPOTENCY_LOG_EVENTS", () => {
it("pins the three canonical event strings", () => {
expect(IDEMPOTENCY_LOG_EVENTS.claimed).toBe("idempotency.claimed");
expect(IDEMPOTENCY_LOG_EVENTS.duplicateSkipped).toBe("idempotency.duplicate_skipped");
expect(IDEMPOTENCY_LOG_EVENTS.failedOpen).toBe("idempotency.failed_open");
});
});

describe("IdempotencyLogFieldsSchema: accepts well-formed events", () => {
it("accepts a valid idempotency.claimed record", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.claimed,
deliveryId: "d1",
});
expect(result.success).toBe(true);
});

it("accepts a valid idempotency.duplicate_skipped record", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.duplicateSkipped,
deliveryId: "d1",
});
expect(result.success).toBe(true);
});

it("accepts a valid idempotency.failed_open record with reason unavailable", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
reason: "unavailable",
});
expect(result.success).toBe(true);
});

it("accepts a valid idempotency.failed_open record with reason error and err", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
reason: "error",
err: "ECONNREFUSED",
});
expect(result.success).toBe(true);
});
});

describe("IdempotencyLogFieldsSchema: rejects drift and bad input", () => {
it("rejects an unknown extra field (strict)", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.claimed,
deliveryId: "d1",
surprise: "boo",
});
expect(result.success).toBe(false);
});

it("rejects an unknown event literal", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: "idempotency.bogus",
deliveryId: "d1",
});
expect(result.success).toBe(false);
});

it("rejects an invalid reason on failed_open", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
reason: "weird",
});
expect(result.success).toBe(false);
});

it("rejects a failed_open record missing reason", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
});
expect(result.success).toBe(false);
});

it("rejects err on the unavailable branch (err only valid with reason error)", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
reason: "unavailable",
err: "boom",
});
expect(result.success).toBe(false);
});

it("rejects a failed_open record with reason error but no err", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.failedOpen,
deliveryId: "d1",
reason: "error",
});
expect(result.success).toBe(false);
});

it("rejects an empty deliveryId", () => {
const result = IdempotencyLogFieldsSchema.safeParse({
event: IDEMPOTENCY_LOG_EVENTS.claimed,
deliveryId: "",
});
expect(result.success).toBe(false);
});
});
35 changes: 33 additions & 2 deletions test/webhook/idempotency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import { beforeEach, describe, expect, it, mock } from "bun:test";
import type { Logger } from "pino";

import { IdempotencyLogFieldsSchema } from "../../src/webhook/idempotency-log-fields";

// Configurable Valkey stub: the mock is wired once at module load (below);
// each test swaps `clientImpl` (and `healthy`) before invoking claimDelivery.
// getValkeyClient returns whatever clientImpl holds at call time.
Expand All @@ -26,11 +28,33 @@ void mock.module("../../src/orchestrator/valkey", () => ({

const { claimDelivery } = await import("../../src/webhook/idempotency");

// Recording logger: captures the structured field object of each emit and
// parses it through the canonical schema AT CAPTURE TIME, so every emitted line
// (not only those a test later asserts on) is held to the strict contract. A
// stray/misnamed field, a `reason` on the wrong event, or `err` on the
// `unavailable` path throws here, surfacing the offending test directly.
type Level = "debug" | "info" | "warn";
let logged: { level: Level; fields: Record<string, unknown> }[] = [];
function record(level: Level) {
return (fields: Record<string, unknown>) => {
IdempotencyLogFieldsSchema.parse(fields);
logged.push({ level, fields });
};
}
const log = {
warn: () => {},
info: () => {},
debug: record("debug"),
info: record("info"),
warn: record("warn"),
} as unknown as Logger;
Comment thread
chrisleekr marked this conversation as resolved.

// Assert an emit with `event` was captured (its schema validity is already
// guaranteed by the capture-time parse above) and return its fields.
function expectEmittedEvent(event: string): Record<string, unknown> {
const rec = logged.find((r) => r.fields.event === event);
expect(rec).toBeDefined();
return rec?.fields ?? {};
}

// SET-NX-EX semantics: first SET of a key returns "OK", a second SET of the
// same key returns null (the key already exists). Mirrors real Valkey NX.
function nxClient(): { send: SendFn; store: Set<string> } {
Expand All @@ -52,6 +76,7 @@ describe("claimDelivery (issue #202)", () => {
beforeEach(() => {
clientImpl = null;
healthy = true;
logged = [];
});

it("claims a new delivery once, then rejects the redelivery", async () => {
Expand All @@ -60,6 +85,8 @@ describe("claimDelivery (issue #202)", () => {
const second = await claimDelivery("delivery-abc", log);
expect(first).toBe(true);
expect(second).toBe(false);
expectEmittedEvent("idempotency.claimed");
expectEmittedEvent("idempotency.duplicate_skipped");
});

it("treats distinct deliveryIds independently", async () => {
Expand All @@ -83,13 +110,17 @@ describe("claimDelivery (issue #202)", () => {
it("fails OPEN (true) when Valkey is unconfigured (null client)", async () => {
clientImpl = null;
expect(await claimDelivery("delivery-no-valkey", log)).toBe(true);
expect(expectEmittedEvent("idempotency.failed_open").reason).toBe("unavailable");
});

it("fails OPEN (true) when the Valkey SET throws", async () => {
clientImpl = {
send: () => Promise.reject(new Error("ECONNREFUSED")),
};
expect(await claimDelivery("delivery-error", log)).toBe(true);
const failed = expectEmittedEvent("idempotency.failed_open");
expect(failed.reason).toBe("error");
expect(failed.err).toBe("ECONNREFUSED");
});

it("fails OPEN (true) without issuing SET when configured-but-disconnected", async () => {
Expand Down
Loading