diff --git a/CHANGELOG.md b/CHANGELOG.md
index affd152..f92560d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,60 @@
All notable changes to pi-taskflow are documented here. This project follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) format.
+## [0.0.28] — 2026-06-27
+
+> Granular-reuse release: **incremental recompute goes from whole-flow to
+> per-phase and per-item.** v0.0.27 *proved* the recompute cost win; this
+> release makes that win far larger and easier to opt into. Editing one phase
+> now invalidates only that phase and its transitive dependents (an independent
+> sibling keeps its cache hit), a `map` phase re-executes only the items that
+> actually changed,
+> and a single `incremental` flag flips a whole flow into cross-run reuse without
+> annotating every phase.
+
+### Added
+- **Per-phase structural sub-fingerprint (`v3:phasefp`).** The cache key now
+ folds a per-phase fingerprint — the phase plus its transitive `dependsOn ∪ from`
+ closure — instead of the whole-flow `v2:flowdef` hash. Editing phase B
+ invalidates only B and its dependents; an independent sibling A keeps its hit.
+ `cacheKeys` emits a 4-tier read ladder (`v3:phasefp` read/write → `v2:flowdef`
+ → bare flowdef → legacy, the last three read-only) so the upgrade is additive,
+ with no miss-storm for unchanged flows. Fail-open: any per-phase error degrades that
+ phase to the whole-flow hash. Soundness fallback to whole-flow when per-phase
+ invalidation can't be statically guaranteed (flow-wide `contextSharing`, any
+ `shareContext` phase in the closure, `join: "any"`, or sub-flow inner phases).
+ (`extensions/flowir/phasefp.ts`, `test/cache-phasefp.test.ts` — 11 tests.)
+- **Per-item cross-run caching for `map` phases.** When one of N items changes
+ between runs, only that item re-executes (N−1 cache hits) while the whole-map
+ fast path and every soundness fallback stay intact. Per-item keys omit the
+ structural fingerprint (which hashes the whole `over` source), so changing one
+ item no longer moves every key at once. Instead they fold `[phase.id, it.agent,
+ model, it.task]` plus the world-state tail, so task/agent/upstream/world
+ changes still invalidate the right items. Disabled (whole-map only) under run-only/off scope,
+ `shareContext`/flow-wide `contextSharing`, or inside a runtime-generated
+ sub-flow. (`test/cache-peritem.test.ts` — 11 tests.)
+- **`incremental` flag** — flow-level (`TaskflowSchema.incremental`) and
+ invocation-level (`run` tool arg). Defaults every phase to `scope:"cross-run"`
+ so re-running a flow reuses unchanged phases across runs/sessions, without
+ annotating each phase. The invocation arg wins over the flow field; per-phase
+ cache settings and the cross-run-blocked types (gate/approval/loop/tournament)
+ still take precedence; default remains the safe `run-only` (fresh each run).
+ (`resolveCacheScope` in `extensions/index.ts`, `test/incremental-flag.test.ts`.)
+- **Reuse reporting.** The end-of-run cache report now shows reused-vs-executed
+ counts, and `/tf recompute` now adds a per-phase "Why" trace (the
+ explainable-reactivity view: `▲ rerun / ✂ cutoff / ✓ reused / ✗ failed`, with
+ `← causedBy`).
+ Dollar figures are reported only for within-run reuse, where the prior usage is
+ preserved; cross-run hits are counted but never attributed an invented saving.
+ (`summarizeReuse` / `RecomputeDecision` in `extensions/runtime.ts`,
+ `test/reuse-summary.test.ts`.)
+- Tests: 804 → 846 (+42).
+
+### Changed
+- **`phaseFingerprint` strips more policy fields** (`cache`, `retry`,
+ `concurrency`, `final`): none changes a phase's subagent *output*, so a no-op
+ config tweak no longer causes false cache invalidation.
+- **README** test count and feature line refreshed (804 → 846 across 46 files);
+ `per-item map caching` added to the headline capabilities.
+
## [0.0.27] — 2026-06-25
> Evidence release: **the incremental-recompute cost win is now proven, not
diff --git a/README.md b/README.md
index 34e6d21..86dcc34 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
-
+
@@ -728,12 +728,12 @@ Copy one into `.pi/taskflows/.json` (or `~/.pi/agent/taskflows/`) and it r
-**0 runtime dependencies** · **804 tests** · **9 phase types** · **shared context tree** · **cross-session resume** · **cross-run memoization** · **incremental recompute** · **FlowIR compile seam** · **detached execution** · **`compile` Mermaid renderer** · **~9k LOC runtime**
+**0 runtime dependencies** · **846 tests** · **9 phase types** · **shared context tree** · **cross-session resume** · **cross-run memoization** · **per-item map caching** · **incremental recompute** · **FlowIR compile seam** · **detached execution** · **`compile` Mermaid renderer** · **~9k LOC runtime**
- **Zero runtime dependencies.** No `dependencies` field — the runtime is built entirely on Node built-ins (`fs` / `path` / `os` / `child_process` / `crypto`). The file lock is `fs.openSync("wx")`, not a third-party library.
-- **804 tests across 42 test files** covering concurrency, atomic file locking (8-process race regressions), path-traversal hardening, cross-session resume, cross-run cache freshness (flow/thinking/tools key isolation, fingerprint invalidation, TTL/LRU eviction), backward-compatible cache-key migration (3-tier legacy fallback), the FlowIR compile seam (determinism, declared-plane synthesis), incremental recompute (early-cutoff propagation, partial cascade strictly < full, observed ∪ declared union frontier), gate verdicts, budget caps, retry/backoff, approval flows, loop termination, tournament judging, sub-flow composition, the shared context tree (blackboard reuse, supervision spawn, subflow validation/nesting), workspace isolation (temp/dedicated/worktree lifecycle, fail-open degrade, dynamic-flow rejection), dynamic sub-flow security hardening, detached execution (PID persistence, stale detection, crash→failed, resume after failure), live run-history refresh, callback isolation, the idle watchdog, model-role init config, parseModelFromLabel with parenthesized-model-name regression, and multi-fence `safeParse` recovery, plus the `compile` Mermaid renderer (id-collision disambiguation, markdown-injection hardening, and full verify-overlay category coverage).
+- **846 tests across 46 test files** covering concurrency, atomic file locking (8-process race regressions), path-traversal hardening, cross-session resume, cross-run cache freshness (flow/thinking/tools key isolation, fingerprint invalidation, TTL/LRU eviction), backward-compatible cache-key migration (4-tier legacy fallback), per-phase structural sub-fingerprint (v3:phasefp — editing one phase invalidates only it and its dependents), per-item map caching (one changed item re-executes, N−1 cache hits), the `incremental` flag (run-wide cross-run default), reuse reporting, the FlowIR compile seam (determinism, declared-plane synthesis), incremental recompute (early-cutoff propagation, partial cascade strictly < full, observed ∪ declared union frontier), gate verdicts, budget caps, retry/backoff, approval flows, loop termination, tournament judging, sub-flow composition, the shared context tree (blackboard reuse, supervision spawn, subflow validation/nesting), workspace isolation (temp/dedicated/worktree lifecycle, fail-open degrade, dynamic-flow rejection), dynamic sub-flow security hardening, detached execution (PID persistence, stale detection, crash→failed, resume after failure), live run-history refresh, callback isolation, the idle watchdog, model-role init config, parseModelFromLabel with parenthesized-model-name regression, and multi-fence `safeParse` recovery, plus the `compile` Mermaid renderer (id-collision disambiguation, markdown-injection hardening, and full verify-overlay category coverage).
- **Hardened by design.** Path-traversal defense (lexical + `realpath` containment check), runId validation, HTML/error sanitization, atomic writes, stale-lock stealing via `rename`, and an idle watchdog that kills wedged subagents (SIGTERM → SIGKILL after 5 minutes of silence). Dynamic sub-flows additionally get breadth caps, `cwd` containment, budget clamping, nesting depth caps, and prototype-pollution defense.
- **Dogfooded.** Every new feature has to survive the project's own `self-improve` taskflow before it ships.
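The per-item map caching described above can be modeled in isolation. This is a minimal, hypothetical TypeScript sketch, not the runtime's code: `itemKey` and `rerunCount` are illustrative names, SHA-256 over a JSON tuple stands in for the real hashing, and `worldTail` is an opaque string standing in for the world-state tail. It shows why leaving a whole-`over` hash out of the per-item key confines invalidation to the edited item.

```typescript
import { createHash } from "node:crypto";

// Hypothetical per-item key: hashes only this item's identity tuple plus an
// opaque world-state tail. No hash of the whole `over` array is folded in, so
// editing one item cannot move the other items' keys.
function itemKey(phaseId: string, agent: string, model: string, task: string, worldTail: string): string {
  const tuple = JSON.stringify([phaseId, agent, model, task, worldTail]);
  return createHash("sha256").update(tuple).digest("hex");
}

interface Item {
  agent: string;
  task: string;
}

// An item re-executes iff its key was not recorded by the previous run.
function rerunCount(prevKeys: Set<string>, phaseId: string, model: string, items: Item[], worldTail: string): number {
  return items.filter((it) => !prevKeys.has(itemKey(phaseId, it.agent, model, it.task, worldTail))).length;
}

const before: Item[] = [
  { agent: "reviewer", task: "review a.ts" },
  { agent: "reviewer", task: "review b.ts" },
  { agent: "reviewer", task: "review c.ts" },
];
const prevKeys = new Set(before.map((it) => itemKey("review", it.agent, "m1", it.task, "w0")));

// Edit only the second item: 1 re-execution, N-1 = 2 cache hits.
const after: Item[] = before.map((it, i) => (i === 1 ? { ...it, task: "review b.ts (edited)" } : it));
console.log(rerunCount(prevKeys, "review", "m1", after, "w0")); // 1

// A world-state change moves every key, so all 3 items re-execute.
console.log(rerunCount(prevKeys, "review", "m1", before, "w1")); // 3
```

The trade-off the sketch makes visible: per-item keys give up the whole-`over` fingerprint, so anything that should invalidate an item has to be folded into its own tuple or the shared tail.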
diff --git a/docs/internal/cache-migration.md b/docs/internal/cache-migration.md
index 2b6bf84..ee203fd 100644
--- a/docs/internal/cache-migration.md
+++ b/docs/internal/cache-migration.md
@@ -12,25 +12,55 @@ Before H1, the cache key folded the flow **definition** fingerprint under a bare
H1 versions the key with a `v2:` prefix and routes the fingerprint through the
FlowIR compile seam (`compileTaskflowToIR` → `flowDefHash`).
-To avoid a one-time miss-storm on upgrade, the runtime consults **three** keys
-on every cross-run lookup, read-only for the legacy tiers.
+M6 replaces the whole-flow `v2:flowdef:` tier with a **per-phase structural
+sub-fingerprint** (`v3:phasefp:`): the hash of a single phase plus its
+transitive dependency closure. Editing phase B now invalidates only B and its
+transitive dependents — independent sibling phase A keeps its cache hit.
-## Key shapes (H1)
+To avoid a one-time miss-storm on upgrade, the runtime consults **four** keys
+on every cross-run lookup, read-only for the fallback tiers.
-`cacheKeys()` (`extensions/runtime.ts`) returns three keys for a phase:
+## Key shapes (M6)
+
+`cacheKeys()` (`extensions/runtime.ts`) returns four keys for a phase:
| Tier | Shape | Written by | Status |
|------|-------|-----------|--------|
-| `key` (current) | `flow:` + `v2:flowdef:` + `` + `think/tools/ctx` + fingerprint | H1+ | **read + write** |
+| `key` (current) | `flow:` + `v3:phasefp:` + `` + `think/tools/ctx` + fingerprint | M6+ | **read + write** |
+| `v2Key` | `flow:` + `v2:flowdef:` + … | H1..M5 | **read-only** |
| `bareKey` | `flow:` + `flowdef:` (bare, unversioned) + … | pre-H1 | **read-only** (removed in v0.1.0) |
| `legacyKey` | `flow:` + … (flowdef line omitted) | pre-flowDefHash era | **read-only** (removed in v0.1.0) |
+### The per-phase sub-fingerprint (`v3:phasefp`)
+
+`phaseFingerprint(def, phaseId)` (`extensions/flowir/phasefp.ts`) hashes the
+phase itself plus its transitive `dependsOn ∪ from` closure, reusing the vendored
+`canonicalJson` + `hashCanonical` (byte-identical to overstory's contract). The
+policy fields `cache`, `retry`, `concurrency`, and `final` are stripped first:
+none of them changes a phase's subagent output, and `cache`'s sub-fields reach
+the key via other paths. Every other `Phase` field is hashed.
+
+**Soundness fallback.** Per-phase invalidation is only sound when a phase's real
+dependencies are fully captured by the static closure. `phaseFingerprint` returns
+`undefined` (→ the caller folds the whole-flow `flowDefHash` instead, preserving
+pre-M6 behavior) when:
+
+- the flow has `contextSharing: true`, OR
+- any phase in the closure (self included) has `shareContext: true`, OR
+- any phase in the closure (self included) has `type: "flow"`, OR
+- the phase itself has `join: "any"`.
+
+These are the cases where a phase can read sibling state outside its declared
+deps (Shared Context Tree), where sub-structure is resolved at runtime
+(`flow`), or where validation lets a phase interpolate `{steps.X}` refs to
+phases outside its `dependsOn` (`join: "any"`). Sub-flow inner phases always use
+this fallback (their `phaseFp` is absent → `flowDefHash`), so editing one phase
+inside a sub-flow invalidates all sub-flow phases: a known, safe conservatism.
+
### Lookup order (`cachedPhase`)
1. within-run resume (`cc.prior.inputHash === keys.key`) — fastest, always allowed.
-2. `store.get(keys.key)` — current v2 entry.
-3. `store.get(keys.bareKey)` — pre-H1 bare entry.
-4. `store.get(keys.legacyKey)` — pre-flowDefHash entry.
+2. `store.get(keys.key)` — current v3 entry.
+3. `store.get(keys.v2Key)` — pre-M6 v2 entry.
+4. `store.get(keys.bareKey)` — pre-H1 bare entry.
+5. `store.get(keys.legacyKey)` — pre-flowDefHash entry.
A hit on **any** tier is restored as a `cacheHit: "cross-run"` result with zero
usage. The restored `PhaseState.inputHash` is always `keys.key` (the current
@@ -38,37 +68,48 @@ shape), so downstream phases and recompute see a consistent identity.
### Write policy (`recordCache`)
-Only `keys.key` (the current v2 shape) is ever written. Legacy/bare hits are
+Only `keys.key` (the current v3 shape) is ever written. v2/bare/legacy hits are
**not** write-through: re-storing under the new key would double the cache size
-for no benefit. Legacy/bare entries age out naturally via the 90-day hard cap
+for no benefit. Legacy/bare/v2 entries age out naturally via the 90-day hard cap
(`DEFAULT_MAX_AGE_MS`) and the LRU cap (`DEFAULT_MAX_ENTRIES`).
-## Why three tiers?
-
-- **`v2:flowdef:` (current):** the versioned prefix lets a future genuine
- overstory compiler advance to `v3:flowIR:` with its own fallback tier,
- without disturbing v2 entries.
-- **bare `flowdef:` (pre-H1):** pre-H1 code wrote this shape. Without the 3rd
- tier, every existing cross-run entry would silently miss on upgrade — a
- one-time miss-storm for opt-in cross-run users.
+## Why four tiers?
+
+- **`v3:phasefp:` (current):** the per-phase structural sub-fingerprint enables
+ precise invalidation — editing one phase no longer evicts independent
+ siblings. The versioned prefix lets a future genuine overstory compiler
+ advance to `v4:flowIR:` with its own fallback tier, without disturbing v3.
+- **`v2:flowdef:` (pre-M6):** M5-and-earlier code wrote this whole-flow shape.
+ Without this tier, every existing cross-run entry would silently miss on the
+ M6 upgrade — a one-time miss-storm for opt-in cross-run users.
+- **bare `flowdef:` (pre-H1):** pre-H1 code wrote this shape. Retained for
+ completeness.
- **no-flowdef (pre-flowDefHash):** the very earliest cross-run entries, before
the flow definition was folded into the key at all. Retained for completeness;
these are rare.
+### Upgrade note (one-time cost)
+
+On the first post-M6 run, if a sibling phase was edited between the last
+pre-M6 run and the upgrade, an *unchanged* independent phase may re-execute
+once: its v2 entry was keyed on the old `flowDefHash`, which no longer matches.
+This is bounded (per-flow, one-time, only when a sibling edit happened) and
+amortized over subsequent runs as v3 entries take over. For unchanged flows the
+v2 tier hits and no re-execution occurs.
+
## Retirement
-- **v0.1.0:** remove the `bareKey` and `legacyKey` tiers and the `CacheKeys`
- return to a single `key`. By then all pre-H1 entries will have aged out (90-day
- hard cap). The `v2:` prefix is retained as the version anchor for the *next*
- migration.
-- A pre-release verification step: inspect a real `.pi/taskflow/cache/` directory
- for bare-`flowdef:` entries. If cross-run is confirmed unused in production
- (opt-in, young), the bare tier can be dropped earlier.
+- **v0.1.0:** remove the `bareKey` and `legacyKey` tiers. By then all pre-H1
+ entries will have aged out (90-day hard cap).
+- **Later:** remove the `v2Key` tier once all pre-M6 entries have aged out.
+- The `v3:` prefix is retained as the version anchor for the *next* migration.
## See also
- `extensions/flowir/hash.ts` — the vendored overstory hash algorithm.
+- `extensions/flowir/phasefp.ts` — the per-phase structural sub-fingerprint.
- `extensions/flowir/index.ts` — `compileTaskflowToIR` (the seam that produces
- `hash` and `meta.declaredDeps`).
+ `hash` and `meta.declaredDeps`) and `phaseFingerprint`.
- `docs/internal/overstory-convergence-roadmap.md` §3 (M1).
- `test/cache-migration.test.ts` — the migration contract tests.
+- `test/cache-phasefp.test.ts` — the per-phase sub-fingerprint contract tests.
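The four-tier lookup ladder and the "write only the current key" policy in `cache-migration.md` can be modeled with a plain `Map`. A minimal sketch under stated assumptions: the key strings are placeholders, `lookup` and `record` are hypothetical names standing in for `cachedPhase` and `recordCache`, and the within-run resume check (step 1 of the lookup order) is omitted.

```typescript
// Hypothetical shapes; the real CacheKeys and CacheStore live in the runtime.
interface CacheKeys {
  key: string; // current v3 shape (read + write)
  v2Key: string; // pre-M6 (read-only)
  bareKey: string; // pre-H1 (read-only)
  legacyKey: string; // pre-flowDefHash (read-only)
}
interface Entry {
  output: string;
}
type Tier = "v3" | "v2" | "bare" | "legacy";

// Consult the tiers in order; the first hit wins.
function lookup(store: Map<string, Entry>, keys: CacheKeys): { tier: Tier; entry: Entry } | undefined {
  const ladder: Array<[Tier, string]> = [
    ["v3", keys.key],
    ["v2", keys.v2Key],
    ["bare", keys.bareKey],
    ["legacy", keys.legacyKey],
  ];
  for (const [tier, k] of ladder) {
    const entry = store.get(k);
    if (entry !== undefined) return { tier, entry };
  }
  return undefined;
}

// Only the current key is ever written; fallback-tier hits are not written through.
function record(store: Map<string, Entry>, keys: CacheKeys, entry: Entry): void {
  store.set(keys.key, entry);
}

const keys: CacheKeys = { key: "k-v3", v2Key: "k-v2", bareKey: "k-bare", legacyKey: "k-legacy" };
const store = new Map<string, Entry>([["k-v2", { output: "from a pre-M6 run" }]]);

console.log(lookup(store, keys)?.tier); // v2 (an unchanged flow still hits after the upgrade)
console.log(store.size); // 1 (the v2 hit was not copied under the v3 key)

// A fresh execution writes the current shape only.
record(store, keys, { output: "fresh" });
console.log(lookup(store, keys)?.tier); // v3
```

Not writing through on fallback hits keeps the store from doubling in size; the old entries simply age out under the TTL and LRU caps.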
diff --git a/extensions/flowir/index.ts b/extensions/flowir/index.ts
index f5f8962..e061559 100644
--- a/extensions/flowir/index.ts
+++ b/extensions/flowir/index.ts
@@ -71,3 +71,5 @@ export type {
TaskflowIR,
TaskflowIRMeta,
} from "./meta.ts";
+
+export { phaseFingerprint } from "./phasefp.ts";
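To see the invalidation behavior `phaseFingerprint` is meant to provide, here is a simplified model. Assumptions: phases carry only `id`, `task`, and `dependsOn` (the real closure also follows `from`), `JSON.stringify` stands in for `canonicalJson`, the soundness gate is left out, and `miniPhaseFp` is a hypothetical name. Editing phase `b` moves the fingerprints of `b` and its dependent `c`, while the independent sibling `a` keeps its value.

```typescript
import { createHash } from "node:crypto";

// Simplified phase shape (assumption): the real Phase has many more fields.
interface MiniPhase {
  id: string;
  task: string;
  dependsOn?: string[];
}

// Transitive dependencies of `id` (self excluded), sorted for determinism.
function closure(byId: Map<string, MiniPhase>, id: string): string[] {
  const seen = new Set<string>();
  const stack = [...(byId.get(id)?.dependsOn ?? [])];
  while (stack.length > 0) {
    const dep = stack.pop()!;
    if (dep === id || seen.has(dep)) continue;
    seen.add(dep);
    stack.push(...(byId.get(dep)?.dependsOn ?? []));
  }
  return [...seen].sort();
}

// Hash of the phase plus its closure. JSON.stringify is only stable here
// because the objects below keep a fixed key order.
function miniPhaseFp(phases: MiniPhase[], id: string): string | undefined {
  const byId = new Map<string, MiniPhase>(phases.map((p): [string, MiniPhase] => [p.id, p]));
  const self = byId.get(id);
  if (self === undefined) return undefined;
  const deps = closure(byId, id)
    .map((d) => byId.get(d))
    .filter((p): p is MiniPhase => p !== undefined);
  return createHash("sha256").update(JSON.stringify({ self, deps })).digest("hex");
}

const v1: MiniPhase[] = [
  { id: "a", task: "summarize the docs" },
  { id: "b", task: "list TODOs" },
  { id: "c", task: "plan fixes", dependsOn: ["b"] },
];
// Edit only phase b.
const v2: MiniPhase[] = v1.map((p) => (p.id === "b" ? { ...p, task: "list TODOs and FIXMEs" } : p));

for (const id of ["a", "b", "c"]) {
  const same = miniPhaseFp(v1, id) === miniPhaseFp(v2, id);
  console.log(`${id}: ${same ? "cache hit" : "invalidated"}`);
}
// a: cache hit, b: invalidated, c: invalidated
```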
diff --git a/extensions/flowir/phasefp.ts b/extensions/flowir/phasefp.ts
new file mode 100644
index 0000000..02eda69
--- /dev/null
+++ b/extensions/flowir/phasefp.ts
@@ -0,0 +1,121 @@
+/**
+ * Per-phase structural sub-fingerprint (M6).
+ *
+ * `phaseFingerprint` produces a content-addressed hash of ONLY the subset of
+ * the flow definition that can affect a single phase's subagent output: the
+ * phase itself plus its transitive dependency closure. Folding this into the
+ * cross-run cache key (instead of the whole-flow `flowDefHash`) means editing
+ * phase B invalidates only B and its transitive dependents — independent
+ * sibling phase A keeps its cache hit.
+ *
+ * ## Soundness (the fallback gate)
+ *
+ * Per-phase invalidation is only sound when a phase's *real* dependencies are
+ * fully captured by the static `dependsOn ∪ from` closure. Three cases break
+ * that guarantee, so `phaseFingerprint` returns `undefined` for them and the
+ * caller falls back to the whole-flow `flowDefHash` (safe, = pre-M6 behavior):
+ *
+ * 1. **Shared Context Tree** (`def.contextSharing === true` or any closure
+ * member has `shareContext === true`): a sharing phase can read sibling
+ * blackboard writes OUTSIDE its declared deps, so the static closure
+ * under-approximates real reads.
+ * 2. **`flow` phase in the closure** (`type === "flow"`): a `flow` phase's
+ * sub-structure is resolved at runtime (inline `def`) or from a saved
+ * flow (`use`) and is not statically visible here. Editing the saved
+ * sub-flow would not move this phase's sub-fingerprint.
+ * 3. **`join: "any"` phase** (`phase.join === "any"`): validation exempts it
+ * from the `{steps.X}`-must-be-in-`dependsOn` check, so it may read
+ * phases outside its static closure. The closure under-approximates its
+ * real reads, so fall back to whole-flow invalidation.
+ *
+ * `cache`, `retry`, `concurrency`, and `final` are stripped from each phase
+ * before hashing: none of them changes the subagent's OUTPUT (they are policy,
+ * execution mechanics, or result selection). `cache`'s sub-fields
+ * (`scope`/`ttl`/`fingerprint`) reach the cache key through other paths
+ * (`cc.scope` gates the lookup, `cc.ttlMs` governs expiry, `cc.fingerprint` is
+ * in the key tail). Every other `Phase` field is hashed. `PhaseSchema` uses
+ * `additionalProperties: false`, so no surprise field can be missed.
+ *
+ * Pure + async (Web Crypto via `hashCanonical`). Reuses the vendored
+ * `canonicalJson`/`hashCanonical` (byte-identical to overstory's contract) so
+ * the sub-fingerprint shares one hashing contract with `flowDefHash`. Never
+ * throws — callers wrap in try/catch and degrade to `flowDefHash`.
+ *
+ * @see docs/internal/cache-migration.md (v3:phasefp tier)
+ */
+
+import { transitiveDependencies, type Phase, type Taskflow } from "../schema.ts";
+import { canonicalJson, hashCanonical } from "./hash.ts";
+
+/** Fields stripped before hashing because they do NOT affect a phase's
+ * subagent OUTPUT, only execution mechanics or result selection — folding
+ * them in would cause false cache invalidation on a no-op config change:
+ * - `cache`: policy object; its sub-fields reach the key via
+ * `cc.scope`/`cc.ttlMs`/`cc.fingerprint`.
+ * - `retry`: retry/backoff is execution mechanics; a successful phase
+ * produces the same output regardless of how many attempts it took.
+ * - `concurrency`: fan-out parallelism; does not change any item's output.
+ * - `final`: marks which phase's output is the flow result; does not change
+ * the phase's own output. */
+const PHASE_FP_STRIP = ["cache", "retry", "concurrency", "final"] as const;
+
+/** Clone a phase into a plain record with policy fields removed. */
+function stripPolicy(phase: Phase): Record<string, unknown> {
+ const rec = phase as unknown as Record<string, unknown>;
+ const out: Record<string, unknown> = {};
+ for (const k of Object.keys(rec)) {
+ if ((PHASE_FP_STRIP as readonly string[]).includes(k)) continue;
+ out[k] = rec[k];
+ }
+ return out;
+}
+
+/**
+ * Per-phase structural sub-fingerprint.
+ *
+ * @returns the hex hash, or `undefined` when per-phase soundness cannot be
+ * guaranteed (caller falls back to the whole-flow `flowDefHash`). Never
+ * throws.
+ */
+export async function phaseFingerprint(def: Taskflow, phaseId: string): Promise<string | undefined> {
+ const phases = def.phases as Phase[];
+ const byId = new Map(phases.map((p) => [p.id, p]));
+ const phase = byId.get(phaseId);
+ if (!phase) return undefined;
+
+ // --- Soundness gate: fall back to whole-flow when static closure is unsafe. ---
+ // Flow-wide context sharing enables cross-sibling reads outside declared deps.
+ if (def.contextSharing === true) return undefined;
+ // A `join: "any"` phase may interpolate `{steps.X.*}` refs to phases OUTSIDE
+ // its declared dependsOn (validation deliberately exempts it — schema.ts), so
+ // the static closure under-approximates its real reads. Fall back to
+ // whole-flow invalidation rather than rely on the key tail alone (which would
+ // be an undocumented coupling). Safe, = pre-M6 behavior.
+ if (phase.join === "any") return undefined;
+
+ const closureIds = transitiveDependencies(phases, phaseId);
+ const closurePhases: Phase[] = [];
+ for (const id of closureIds) {
+ const p = byId.get(id);
+ if (!p) continue; // unknown dep — validation reports elsewhere
+ // Per-phase sharing: this closure member can read sibling blackboard
+ // writes outside its own declared deps.
+ if (p.shareContext === true) return undefined;
+ // A flow phase's sub-structure is runtime/saved-flow-resolved and not
+ // statically visible — editing it would not move the sub-fingerprint.
+ if ((p.type ?? "agent") === "flow") return undefined;
+ closurePhases.push(p);
+ }
+ // The self phase's own sharing/type is part of the closure too.
+ if (phase.shareContext === true) return undefined;
+ if ((phase.type ?? "agent") === "flow") return undefined;
+
+ // --- Build the canonical payload. ---
+ // `deps` is the SORTED transitive closure (self excluded). canonicalJson
+ // sorts OBJECT keys but preserves ARRAY order, so we sort the array
+ // explicitly for determinism independent of dependency walk order.
+ const depsPayload = closurePhases
+   .map((p) => ({ id: p.id, def: stripPolicy(p) }))
+   .sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
+ const payload = { self: stripPolicy(phase), deps: depsPayload };
+
+ return hashCanonical(canonicalJson(payload));
+}
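The comment in `phaseFingerprint` about array order is easy to check with a toy canonicalizer. `canon` below is an assumption that mirrors only the stated property (object keys sorted recursively, array order preserved); it is not the vendored `canonicalJson`. It shows that the same closure visited in two walk orders hashes differently unless the `deps` array is sorted before hashing.

```typescript
import { createHash } from "node:crypto";

// Toy canonicalizer (assumption): sorts object keys recursively but keeps
// array order. It is not the vendored canonicalJson.
function canon(v: unknown): string {
  if (Array.isArray(v)) return `[${v.map((x) => canon(x)).join(",")}]`;
  if (v !== null && typeof v === "object") {
    const obj = v as Record<string, unknown>;
    const body = Object.keys(obj)
      .sort()
      .map((k) => `${JSON.stringify(k)}:${canon(obj[k])}`)
      .join(",");
    return `{${body}}`;
  }
  return JSON.stringify(v);
}

const sha256 = (s: string): string => createHash("sha256").update(s).digest("hex");

interface DepEntry {
  id: string;
  def: { task: string };
}
const byIdAsc = (a: DepEntry, b: DepEntry): number => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0);

// The same two-phase closure, reached in two different walk orders.
const walkA: DepEntry[] = [
  { id: "x", def: { task: "fetch" } },
  { id: "y", def: { task: "parse" } },
];
const walkB: DepEntry[] = [walkA[1], walkA[0]];

// Unsorted: the walk order leaks into the hash.
const unsortedEqual = sha256(canon({ deps: walkA })) === sha256(canon({ deps: walkB }));
// Sorted by id first: the hash depends only on the closure's content.
const sortedEqual =
  sha256(canon({ deps: [...walkA].sort(byIdAsc) })) === sha256(canon({ deps: [...walkB].sort(byIdAsc) }));

console.log(unsortedEqual, sortedEqual); // false true
```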
diff --git a/extensions/index.ts b/extensions/index.ts
index 984a63f..d87c39f 100644
--- a/extensions/index.ts
+++ b/extensions/index.ts
@@ -28,7 +28,7 @@ import { type AgentScope, discoverAgents, readSubagentSettings, shouldSyncBuilti
import { renderRunResult, summarizeRun } from "./render.ts";
import { RunHistoryComponent, type RunHistoryResult } from "./runs-view.ts";
import { ApprovalViewComponent, type ApprovalChoice } from "./approval-view.ts";
-import { executeTaskflow, recomputeTaskflow, type ApprovalDecision, type ApprovalRequest, type RecomputeReport, type RuntimeDeps, type RuntimeResult } from "./runtime.ts";
+import { executeTaskflow, recomputeTaskflow, summarizeReuse, type ApprovalDecision, type ApprovalRequest, type RecomputeReport, type RuntimeDeps, type RuntimeResult } from "./runtime.ts";
import { type UsageStats } from "./usage.ts";
import { finalPhase, resolveArgs, type Taskflow, validateTaskflow, desugar, isShorthand } from "./schema.ts";
import {
@@ -150,6 +150,12 @@ const TaskflowParams = Type.Object({
description: "Run in background (detached child process); return runId immediately. Status polled via store.",
}),
),
+ incremental: Type.Optional(
+ Type.Boolean({
+ description:
+ "For action=run: default every phase to cross-run caching so re-running the flow reuses unchanged phases across runs/sessions (incremental recompute). Overrides the flow's own `incremental` field. Per-phase cache settings and cross-run-blocked types (gate/approval/loop/tournament) still take precedence. Omit to use the flow's setting (default: run-only — fresh each run).",
+ }),
+ ),
});
function formatFlowIR(ir: TaskflowIR): string {
@@ -225,6 +231,17 @@ function formatRecompute(r: RecomputeReport): string {
if (r.cutoff.length > 0) lines.push(` → saved ${r.cutoff.length} re-execution(s).`);
}
lines.push(`✓ reused (outside frontier): ${r.reused.join(", ") || "—"}`);
+ // Per-phase "why" — the explainable-reactivity trace (like React DevTools
+ // telling you why each component re-rendered). Only shown when present.
+ if (r.decisions && r.decisions.length > 0) {
+ const glyph: Record<string, string> = { rerun: "▲", cutoff: "✂", reused: "✓", failed: "✗" };
+ lines.push("");
+ lines.push("Why:");
+ for (const d of r.decisions) {
+ const cause = d.causedBy && d.causedBy.length ? ` ← ${d.causedBy.join(", ")}` : "";
+ lines.push(` ${glyph[d.outcome] ?? "•"} ${d.phaseId}: ${d.reason}${cause}`);
+ }
+ }
return lines.join("\n");
}
@@ -242,6 +259,18 @@ function makeRunState(def: Taskflow, args: Record, cwd: string)
};
}
+/** Resolve the run-wide default cache scope from the incremental flags. The
+ * invocation-level override (the `incremental` tool arg) wins; otherwise the
+ * flow's own `incremental` field; otherwise the safe `run-only` default
+ * (each run starts fresh — cross-run reuse is opt-in). Exported for testing. */
+export function resolveCacheScope(
+ incrementalOverride: boolean | undefined,
+ flowIncremental: boolean | undefined,
+): "cross-run" | "run-only" {
+ const on = typeof incrementalOverride === "boolean" ? incrementalOverride : flowIncremental;
+ return on === true ? "cross-run" : "run-only";
+}
+
async function runFlow(
def: Taskflow,
args: Record,
@@ -249,6 +278,9 @@ async function runFlow(
signal: AbortSignal | undefined,
onUpdate: ((p: AgentToolResult) => void) | undefined,
existing?: RunState,
+ // Invocation-level incremental override: when set, wins over def.incremental.
+ // undefined → fall back to the flow's own `incremental` field (default off).
+ incrementalOverride?: boolean,
): Promise {
const state = existing ?? makeRunState(def, args, ctx.cwd);
@@ -374,11 +406,15 @@ async function runFlow(
persist: persistThrottled,
requestApproval,
loadFlow: (name: string) => getFlow(ctx.cwd, name)?.def,
- // Cross-run cache is opt-in per phase (cache:{scope:"cross-run"}).
- // Defaulting every real run to cross-run was reviewed out: it silently
- // persists phase outputs and can serve stale results for phases whose
- // agents read files at runtime (those files are not in the cache key).
- cacheScopeDefault: "run-only",
+ // Cross-run cache is opt-in. By default a real run is `run-only` (fresh
+ // each run): defaulting every phase to cross-run silently persists
+ // outputs and can serve stale results for phases whose agents read files
+ // at runtime (those files are not in the cache key). A user opts in
+ // explicitly — the invocation `incremental` arg wins, else the flow's
+ // own `incremental` field, else the safe run-only default. All the
+ // soundness fallbacks (blocked types, per-phase fingerprint, shareContext)
+ // still apply per phase inside executePhase.
+ cacheScopeDefault: resolveCacheScope(incrementalOverride, def.incremental),
});
// Auto-report cache savings at the end of a real run so the user sees the
// M1-M5 effect without running a separate /tf command.
@@ -958,7 +994,7 @@ export default function (pi: ExtensionAPI) {
};
}
- const result = await runFlow(def, args, ctx, signal, onUpdate as any);
+ const result = await runFlow(def, args, ctx, signal, onUpdate as any, undefined, params.incremental as boolean | undefined);
// Surface the validation warnings in the tool result so the model
// can acknowledge or fix them, and the user sees them in the chat.
if (v.warnings.length) {
@@ -1399,15 +1435,18 @@ function errorResult(action: string, message: string): ToolResult {
};
}
-function formatCacheReport(state: RunState, totalUsage: UsageStats): string {
- const cached = Object.values(state.phases).filter((p) => p.cacheHit === "cross-run");
- if (cached.length === 0) return "";
- // Honest reporting: we know these phases spent 0 tokens *this run* because
- // they were served from cache. We do NOT estimate dollars/tokens "saved" —
- // that requires guessing what a re-execution would have cost, and the mix of
- // cheap vs expensive phases (tournament/loop) makes such a guess misleading.
- const cachedTokens = cached.reduce((sum, p) => sum + ((p.usage?.input ?? 0) + (p.usage?.output ?? 0)), 0);
- return `💾 ${cached.length} phase(s) reused from cross-run cache (${cachedTokens.toLocaleString()} tokens spent on them this run)`;
+function formatCacheReport(state: RunState, _totalUsage: UsageStats): string {
+ const r = summarizeReuse(state);
+ const reused = r.reusedRunOnly + r.reusedCrossRun;
+ if (reused === 0) return ""; // nothing reused — no incremental story to tell
+ // Honest framing: report reused-vs-executed counts, and a dollar figure only
+ // for within-run reuse (where the prior usage is preserved). Cross-run hits
+ // zero their usage, so their original cost is genuinely unknown — we say
+ // "reused" without inventing a savings number for them.
+ const parts: string[] = [`♻️ ${reused}/${r.done} phase(s) reused (${r.executed} executed this run)`];
+ if (r.savedUSD > 0) parts.push(`~$${r.savedUSD.toFixed(4)} of re-execution avoided`);
+ if (r.reusedCrossRun > 0) parts.push(`${r.reusedCrossRun} from cross-run cache`);
+ return parts.join(" · ");
}
function finalResult(action: string, result: RuntimeResult): ToolResult {
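A minimal sketch of how `summarizeReuse` and `formatCacheReport` partition a run's terminal phases, using a hypothetical trimmed-down `MiniPhaseState` in place of the real `PhaseState` and omitting the `♻️` prefix. Only `done` phases count; within-run hits contribute their preserved cost to the savings figure, while cross-run hits are counted but add nothing, since their original cost is not recoverable.

```typescript
// Trimmed-down stand-in for PhaseState (assumption: only the fields read here).
type CacheHit = "run-only" | "cross-run";
interface MiniPhaseState {
  status: "done" | "failed" | "pending";
  cacheHit?: CacheHit;
  usage?: { cost?: number };
}

interface Reuse {
  executed: number;
  reusedRunOnly: number;
  reusedCrossRun: number;
  done: number;
  savedUSD: number;
}

function classify(phases: MiniPhaseState[]): Reuse {
  let executed = 0;
  let reusedRunOnly = 0;
  let reusedCrossRun = 0;
  let savedUSD = 0;
  for (const ps of phases) {
    if (ps.status !== "done") continue; // only completed phases count
    if (ps.cacheHit === "run-only") {
      reusedRunOnly++;
      savedUSD += ps.usage?.cost ?? 0; // within-run resume keeps the prior usage
    } else if (ps.cacheHit === "cross-run") {
      reusedCrossRun++; // usage is zeroed on restore, so no savings figure
    } else {
      executed++;
    }
  }
  return { executed, reusedRunOnly, reusedCrossRun, done: executed + reusedRunOnly + reusedCrossRun, savedUSD };
}

function report(phases: MiniPhaseState[]): string {
  const r = classify(phases);
  const reused = r.reusedRunOnly + r.reusedCrossRun;
  if (reused === 0) return ""; // nothing reused: no report line
  const parts = [`${reused}/${r.done} phase(s) reused (${r.executed} executed this run)`];
  if (r.savedUSD > 0) parts.push(`~$${r.savedUSD.toFixed(4)} of re-execution avoided`);
  if (r.reusedCrossRun > 0) parts.push(`${r.reusedCrossRun} from cross-run cache`);
  return parts.join(" · ");
}

const sample: MiniPhaseState[] = [
  { status: "done", usage: { cost: 0.02 } },
  { status: "done", cacheHit: "run-only", usage: { cost: 0.0125 } },
  { status: "done", cacheHit: "cross-run", usage: { cost: 0 } },
  { status: "failed" },
];
console.log(report(sample));
// 2/3 phase(s) reused (1 executed this run) · ~$0.0125 of re-execution avoided · 1 from cross-run cache
```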
diff --git a/extensions/runtime.ts b/extensions/runtime.ts
index 351b346..d8d8749 100644
--- a/extensions/runtime.ts
+++ b/extensions/runtime.ts
@@ -20,7 +20,7 @@ import { type Budget, type CacheScope, dependenciesOf, finalPhase, LOOP_DEFAULT_
import { verifyTaskflow } from "./verify.ts";
import { hashInput, newRunId, type PhaseState, type RunState, runsDir } from "./store.ts";
import { CacheStore, resolveFingerprint } from "./cache.ts";
-import { compileTaskflowToIR } from "./flowir/index.ts";
+import { compileTaskflowToIR, phaseFingerprint } from "./flowir/index.ts";
import { computeStaleFrontier, declaredReadMapOfDef, readMapOf } from "./stale.ts";
import { ctxDirFor, drainPendingSpawns, initCtxDir, registerNode, setNodeStatus, type SpawnAssignment } from "./context-store.ts";
import { allocateWorkspace, isWorkspaceKeyword, type Workspace } from "./workspace.ts";
@@ -72,6 +72,55 @@ export interface RuntimeResult {
finalOutput: string;
ok: boolean;
totalUsage: UsageStats;
+ /** Incremental-reuse summary: how many phases were reused from cache vs.
+ * freshly executed this run, and the cost the reused work would otherwise
+ * have incurred (known only for within-run resume; cross-run hits zero
+ * their usage so their original cost is not recoverable). Optional &
+ * additive — callers that ignore it are unaffected. */
+ reuse?: ReuseSummary;
+}
+
+/** A run's incremental-reuse accounting (see RuntimeResult.reuse). */
+export interface ReuseSummary {
+ /** Phases that completed by executing a subagent this run. */
+ executed: number;
+ /** Phases served from the within-run resume cache (no new tokens). */
+ reusedRunOnly: number;
+ /** Phases restored from the cross-run store (no new tokens). */
+ reusedCrossRun: number;
+ /** Total phases that reached `done` (executed + reused). */
+ done: number;
+ /** USD the within-run-reused phases would have cost if re-executed (their
+ * preserved prior usage). Cross-run hits are excluded (cost not recoverable). */
+ savedUSD: number;
+}
+
+/** Compute the incremental-reuse summary from a run's terminal phase states.
+ * Pure, total, never throws. A phase is "reused" iff it carries a `cacheHit`
+ * marker (set by `cachedPhase` for both within-run resume and cross-run hits). */
+export function summarizeReuse(state: RunState): ReuseSummary {
+ let executed = 0;
+ let reusedRunOnly = 0;
+ let reusedCrossRun = 0;
+ let savedUSD = 0;
+ for (const ps of Object.values(state.phases)) {
+ if (ps.status !== "done") continue;
+ if (ps.cacheHit === "run-only") {
+ reusedRunOnly++;
+ savedUSD += ps.usage?.cost ?? 0; // within-run resume preserves prior usage
+ } else if (ps.cacheHit === "cross-run") {
+ reusedCrossRun++; // cross-run hits zero their usage — cost not recoverable
+ } else {
+ executed++;
+ }
+ }
+ return {
+ executed,
+ reusedRunOnly,
+ reusedCrossRun,
+ done: executed + reusedRunOnly + reusedCrossRun,
+ savedUSD,
+ };
}
function buildInterpolationContext(
@@ -120,6 +169,31 @@ function resultToPhaseState(id: string, r: RunResult, inputHash: string, parseJs
};
}
+/**
+ * Synthesize a 0-token `RunResult` from a cached per-item `PhaseState` so a
+ * cross-run per-item cache hit flows through `mergePhaseState` as a normal
+ * successful fan-out item. `stopReason: "cache-hit"` is NOT in `isFailed`'s
+ * failure set (only "error"/"aborted"/non-zero exit), so the item counts as
+ * success. Usage is `emptyUsage()` — a cached item spent no new tokens this
+ * run, so `mergePhaseState`'s `aggregateUsage` charges nothing for it.
+ *
+ * Used only by the `map` per-item cache path (see `runFanout`). Fail-open by
+ * construction: this is only reached AFTER a successful `cachedPhase` lookup,
+ * so `ps.output` is always present.
+ */
+function phaseStateToRunResult(ps: PhaseState, it: { agent: string; task: string }): RunResult {
+ return {
+ agent: it.agent,
+ task: it.task,
+ exitCode: 0,
+ output: ps.output ?? "",
+ stderr: "",
+ usage: emptyUsage(),
+ model: ps.model,
+ stopReason: "cache-hit",
+ };
+}
+
/** Convert observed read refs (e.g. "steps.scout.output") into a structured
* readSet keyed by upstream phase id, tagging each with the version
* (= inputHash) that was current when read. Only `steps.*` refs are upstream
@@ -277,12 +351,20 @@ function mergePhaseState(
const model = ran.find((r) => r.model !== undefined)?.model;
// Combine outputs as a labelled list; also expose a JSON array of outputs.
// For failed items, use the error message instead of the useless placeholder.
- const combinedText = ran
+ // Labels are positionally aligned to the ORIGINAL `over` array: we iterate
+ // over ALL results (including budget-skipped, which are filtered to null) and
+ // use `results.length` as N, so item k's label reads `[k/N]` matching its
+ // position in `over` — not its rank among non-skipped items. Per-item cache
+ // hits (`stopReason: "cache-hit"`) are not budget-skipped, so they keep their
+ // original positional label.
+ const combinedText = results
.map((r, i) => {
- const label = `### [${i + 1}/${ran.length}] ${r.agent}${isFailed(r) ? " (failed)" : ""}`;
+ if (r.stopReason === "budget-skipped") return null;
+ const label = `### [${i + 1}/${results.length}] ${r.agent}${isFailed(r) ? " (failed)" : ""}`;
const content = isFailed(r) ? (r.errorMessage || r.stderr || r.output) : r.output;
return `${label}\n\n${content}`;
})
+ .filter((x): x is string => x !== null)
.join("\n\n---\n\n");
// Only successful runs feed the parsed JSON array (no error/skip strings).
const jsonArray = parseJson ? ran.filter((r) => !isFailed(r)).map((r) => safeParse(r.output) ?? r.output) : undefined;
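The positional-label rule described in the comment above can be shown with a small standalone sketch. `ItemResult` and `positionalLabels` are hypothetical, simplified stand-ins for `RunResult` and the `combinedText` mapping. The point is that a budget-skipped item still consumes an index. As a result, the third item keeps the label `[3/3]`. If labels were numbered over the filtered `ran` list, it would become `[2/2]`.

```typescript
// Hypothetical simplified shape: only the fields the label logic reads.
interface ItemResult {
  agent: string;
  output: string;
  stopReason?: string;
}

// Build "[k/N] agent" labels aligned to each item's ORIGINAL position in `over`.
// Budget-skipped items are dropped from the text but still count toward N and
// still consume an index, so later labels do not shift.
function positionalLabels(results: ItemResult[]): string[] {
  return results
    .map((r, i) => (r.stopReason === "budget-skipped" ? null : `[${i + 1}/${results.length}] ${r.agent}`))
    .filter((x): x is string => x !== null);
}

const labels = positionalLabels([
  { agent: "auditor", output: "a" },
  { agent: "auditor", output: "", stopReason: "budget-skipped" },
  { agent: "auditor", output: "c", stopReason: "cache-hit" }, // cache hits keep their slot
]);
console.log(labels); // [ '[1/3] auditor', '[3/3] auditor' ]
```

A cache hit (`stopReason: "cache-hit"`) is not filtered out, so it keeps its original label, just as the diff comment states.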
@@ -721,6 +803,7 @@ async function executePhaseInner(
flowName: state.flowName,
runId: state.runId,
flowDefHash: state.flowDefHash === "failed" ? undefined : state.flowDefHash,
+ phaseFp: state.phaseFingerprints?.[phase.id],
forceRerun: opts?.forceRerun,
thinking: phase.thinking,
tools: phase.tools,
@@ -820,7 +903,14 @@ async function executePhaseInner(
const parseJson = phase.output === "json";
// Runs a list of sub-tasks with live fan-out progress + aggregate live usage/activity.
- const runFanout = async (items: Array<{ agent: string; task: string }>): Promise<RunResult[]> => {
+ // `perItem` (map only) enables per-item cross-run caching: each item is looked
+ // up in the cache before spawning a subagent, and a successful fresh item is
+ // recorded so a later run with that item unchanged hits per-item. When
+ // `perItem` is undefined (parallel, or non-cacheable maps) the path is inert.
+ const runFanout = async (
+ items: Array<{ agent: string; task: string }>,
+ perItem?: { keyOf: (idx: number) => CacheKeys | null; cc: PhaseCacheCtx },
+ ): Promise<RunResult[]> => {
let done = 0;
let running = 0;
let failed = 0;
@@ -854,6 +944,28 @@ async function executePhaseInner(
stopReason: "budget-skipped",
} satisfies RunResult;
}
+ // Per-item cross-run cache lookup (map only). A hit synthesizes a 0-token
+ // RunResult and returns immediately — the item never spawns a subagent and
+ // never reaches the ctx_spawn drain below (a cached item can't have queued
+ // new spawns). Fail-open: any error in the lookup path degrades to executing.
+ if (perItem) {
+ try {
+ const ckItem = perItem.keyOf(idx);
+ if (ckItem) {
+ const hit = cachedPhase(perItem.cc, ckItem);
+ if (hit) {
+ done++;
+ const synth = phaseStateToRunResult(hit, it);
+ liveUsages[idx] = emptyUsage();
+ if (hit.model) latestModel = hit.model;
+ refresh();
+ return synth;
+ }
+ }
+ } catch {
+ /* fail-open: a cache read error must never sink the item */
+ }
+ }
running++;
refresh();
if (ctxDir) {
@@ -869,6 +981,23 @@ async function executePhaseInner(
done++;
if (isFailed(r)) failed++;
liveUsages[idx] = r.usage;
+ // Per-item cross-run cache record (map only): persist a successful fresh
+ // item so a later run with this item unchanged hits per-item instead of
+ // re-running. Failed and budget-skipped items are never cached (a stale
+ // failure would be served on the next run). Fail-open: a write error never
+ // sinks the item — the fresh `r` is already in hand and flows downstream.
+ if (perItem && !isFailed(r) && r.stopReason !== "budget-skipped") {
+ try {
+ const ckItem = perItem.keyOf(idx);
+ if (ckItem) {
+ const ccItem: PhaseCacheCtx = { ...perItem.cc, phaseId: `${phase.id}#item${idx}` };
+ const itemPs = resultToPhaseState(`${phase.id}#item${idx}`, r, ckItem.key, parseJson);
+ recordCache(ccItem, itemPs);
+ }
+ } catch {
+ /* fail-open: cache write must never sink the item */
+ }
+ }
if (ctxDir) {
try {
const itemNid = nodeIdFor(String(idx));
@@ -1068,12 +1197,59 @@ async function executePhaseInner(
task: preRead + interpolate(phase.task ?? "", localCtx).text,
};
});
+ // Per-item caching is sound ONLY when ALL of:
+ // - cross-run scope: run-only has no persistent store, so per-item entries
+ // could never be re-read (no point keying them).
+ // - no Shared Context Tree (`!sharing`): a sharing map item can read sibling
+ // blackboard writes OUTSIDE its declared deps, so the per-item key (which
+ // folds only the item's own task) under-approximates real reads and could
+ // serve a stale result. Fall back to whole-map.
+ // - not inside a runtime-generated sub-flow (`def:` frame in the stack):
+ // such flows are untrusted / possibly non-deterministic, so per-item reuse
+ // is unsafe. Fall back to whole-map (which still applies breadth caps).
+ // An absent `phaseFp` is NOT a soundness blocker; omitting it is a
+ // DELIBERATE design choice: per-item keys omit BOTH phaseFp and flowDefHash
+ // (via ccPerItem below) so a changing `over` cannot move unchanged items'
+ // keys. See ccPerItem for the full soundness argument.
+ const perItemCacheable =
+ cc.scope === "cross-run" &&
+ !sharing &&
+ !(deps._stack ?? []).some((s) => s.startsWith("def:"));
+ // Per-item cache context: structural fingerprints (phaseFp + flowDefHash)
+ // are OMITTED so a changing `over` cannot move unchanged items' keys. Both
+ // fingerprints hash `over` (the array source); folding either into a
+ // per-item key means editing one item invalidates EVERY per-item key at
+ // once (no partial reuse) — the bug fixed here. A single item's output is
+ // fully specified by `it.task` (template + {item}/{as} value + any
+ // upstream-output refs + args) + `it.agent` + model + thinking/tools/preRead
+ // + the world-state `fingerprint`; `over` only determines WHICH items
+ // exist, not WHAT any item computes. `flowName` is retained for cross-flow
+ // collision prevention. Soundness: docs/internal/cache-migration.md.
+ // NB: ccPerItem is always constructed, but it is only USED (via `perItem`)
+ // when perItemCacheable holds. That gate requires scope === "cross-run",
+ // which is blocked upstream when flowDefHash === "failed", so ccPerItem is
+ // only used when flowDefHash is a real hash (or already undefined); clearing
+ // it here never masks a "failed" hash.
+ const ccPerItem: PhaseCacheCtx = { ...cc, phaseFp: undefined, flowDefHash: undefined };
+ // Pre-compute per-item CacheKeys once so the lookup and the record path use
+ // the IDENTICAL key (built from ccPerItem, NOT the whole-phase cc). The
+ // per-item key folds `it.agent` (Arbiter fix): a different agent means
+ // different output, so a per-item key WITHOUT the agent could serve a stale
+ // cross-agent hit when only `phase.agent` changed (the whole-map key would
+ // correctly miss via JSON.stringify(tasks), but per-item keys would not).
+ const perItemKeys: (CacheKeys | null)[] = perItemCacheable
+ ? tasks.map((it) => cacheKeys(ccPerItem, [phase.id, it.agent, phase.model ?? "", it.task]))
+ : tasks.map(() => null);
+ const perItem = perItemCacheable
+ ? { keyOf: (idx: number): CacheKeys | null => perItemKeys[idx] ?? null, cc: ccPerItem }
+ : undefined;
+ // Whole-map key keeps the FULL cc (phaseFp + flowDefHash) so its fast path
+ // and any pre-existing whole-map entries are unchanged (backward compat).
const ck = cacheKeys(cc, [phase.id, phase.model ?? "", JSON.stringify(tasks)]);
const inputHash = ck.key;
const cached = cachedPhase(cc, ck);
if (cached) return cached;
- const results = await runFanout(tasks);
+ const results = await runFanout(tasks, perItem);
const ps = mergePhaseState(phase.id, results, inputHash, parseJson);
if (readRefs.length) ps.reads = readRefsToReads(readRefs, state);
if (mapTruncated) {
@@ -1635,6 +1811,12 @@ export interface PhaseCacheCtx {
* key so two structurally-different flows that share a name can never
* collide, and a changed flow never serves a stale cross-run hit. */
flowDefHash?: string | "failed";
+ /** Per-phase structural sub-fingerprint (M6). When present, folds into the
+ * key as `v3:phasefp:` so editing phase B invalidates only B + its
+ * transitive dependents. When absent (sub-flow inner states, or a phase
+ * for which per-phase soundness couldn't be guaranteed), `cacheKeys`
+ * falls back to `flowDefHash` — preserving pre-M6 whole-flow behavior. */
+ phaseFp?: string;
/** Force this phase to re-execute, ignoring the within-run prior AND the
* cross-run store (M5 recompute seed). Downstream phases are NOT forced —
* they re-evaluate naturally: if the seed's new output changed their
@@ -1646,27 +1828,34 @@ export interface PhaseCacheCtx {
/** A computed cache identity: the new (versioned) key plus the read-only
* fallback keys used to honor entries written by older releases. The `key`
* is what we WRITE under and what `PhaseState.inputHash` carries; the
- * `legacyKey`/`bareKey` are consulted READ-ONLY on a miss so an upgrade
- * never produces a miss-storm. See docs/internal/cache-migration.md. */
+ * `v2Key`/`bareKey`/`legacyKey` are consulted READ-ONLY on a miss so an
+ * upgrade never produces a miss-storm. See docs/internal/cache-migration.md. */
export interface CacheKeys {
- /** Current key: folds `v2:flowdef:` (the overstory content fingerprint). */
+ /** Current key: folds `v3:phasefp:` (the per-phase structural
+ * sub-fingerprint; degrades to the whole-flow hash when per-phase
+ * soundness couldn't be guaranteed). */
key: string;
- /** Pre-flowDefHash-era key: the flowdef line OMITTED entirely. Read-only. */
- legacyKey: string;
+ /** Pre-M6 key: `v2:flowdef:` (whole-flow fingerprint).
+ * Read-only. */
+ v2Key: string;
/** Bare (unversioned) `flowdef:` key — written by pre-H1 code that folded
* the hash without a `v2:` prefix. Read-only. Removed in v0.1.0. */
bareKey: string;
+ /** Pre-flowDefHash-era key: the flowdef line OMITTED entirely. Read-only. */
+ legacyKey: string;
}
/** Fold the phase fingerprint into the base hash parts to form the cache keys.
*
- * Three keys are produced for backward compatibility (see
+ * Four keys are produced for backward compatibility (see
* docs/internal/cache-migration.md):
- * - `key` : `v2:flowdef:` — the current write key.
+ * - `key` : `v3:phasefp:` — the current write key (per-phase
+ * structural sub-fingerprint; falls back to the whole-flow hash when
+ * `cc.phaseFp` is absent).
+ * - `v2Key` : `v2:flowdef:` — pre-M6 whole-flow key.
+ * - `bareKey` : bare `flowdef:` (unversioned) — pre-H1 entries.
* - `legacyKey`: the flowdef line omitted — pre-flowDefHash entries.
- * - `bareKey` : bare `flowdef:` (unversioned) — pre-H1 entries that
- * folded the hash without the `v2:` prefix.
- * `cachedPhase` consults all three READ-ONLY on a miss; `recordCache` writes
+ * `cachedPhase` consults all four READ-ONLY on a miss; `recordCache` writes
* only `key`. This means an upgrade never produces a miss-storm: existing
* entries (whichever shape) still hit, and new writes converge on `key`. */
export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
@@ -1682,10 +1871,15 @@ export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
];
const fold = (parts: string[]): string =>
cc.fingerprint ? hashInput(...parts, cc.fingerprint) : hashInput(...parts);
+ // Per-phase sub-fingerprint; falls back to the whole-flow hash when absent
+ // (sub-flow inner states, or soundness fallback) — preserving pre-M6 behavior.
+ const fp = cc.phaseFp ?? cc.flowDefHash ?? "";
+ const fdh = cc.flowDefHash ?? "";
return {
- key: fold([`flow:${cc.flowName}`, `v2:flowdef:${cc.flowDefHash ?? ""}`, ...tail]),
+ key: fold([`flow:${cc.flowName}`, `v3:phasefp:${fp}`, ...tail]),
+ v2Key: fold([`flow:${cc.flowName}`, `v2:flowdef:${fdh}`, ...tail]),
+ bareKey: fold([`flow:${cc.flowName}`, `flowdef:${fdh}`, ...tail]),
legacyKey: fold([`flow:${cc.flowName}`, ...tail]),
- bareKey: fold([`flow:${cc.flowName}`, `flowdef:${cc.flowDefHash ?? ""}`, ...tail]),
};
}
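The following is a minimal sketch of the four-tier read ladder. It uses a plain `Map` as the store and a SHA-256 join as a stand-in for `hashInput`. Both are hypothetical: the real `fold` also appends `cc.fingerprint`, and the real tail carries more parts. The sketch demonstrates the two properties the doc comment claims:
- An entry written under an older key shape still hits.
- A fallback hit is not re-stored under the new `v3:phasefp:` key.

```typescript
import { createHash } from "node:crypto";

// Hypothetical stand-in for the runtime's hashInput.
const hashInput = (...parts: string[]): string =>
  createHash("sha256").update(parts.join("\u0000")).digest("hex");

interface Ctx { flowName: string; phaseFp?: string; flowDefHash?: string }
interface Keys { key: string; v2Key: string; bareKey: string; legacyKey: string }

function keysFor(cc: Ctx, tail: string[]): Keys {
  // Per-phase fingerprint, degrading to the whole-flow hash when absent.
  const fp = cc.phaseFp ?? cc.flowDefHash ?? "";
  const fdh = cc.flowDefHash ?? "";
  return {
    key: hashInput(`flow:${cc.flowName}`, `v3:phasefp:${fp}`, ...tail),
    v2Key: hashInput(`flow:${cc.flowName}`, `v2:flowdef:${fdh}`, ...tail),
    bareKey: hashInput(`flow:${cc.flowName}`, `flowdef:${fdh}`, ...tail),
    legacyKey: hashInput(`flow:${cc.flowName}`, ...tail),
  };
}

// Read the tiers newest-first; never write-through under the new key.
function lookup(store: Map<string, string>, k: Keys): string | undefined {
  for (const candidate of [k.key, k.v2Key, k.bareKey, k.legacyKey]) {
    const hit = store.get(candidate);
    if (hit !== undefined) return hit;
  }
  return undefined;
}

// A store populated by a pre-M6 release: only the v2 key exists.
const cc: Ctx = { flowName: "audit", phaseFp: "fp-b", flowDefHash: "whole" };
const keys = keysFor(cc, ["scan", "model-x", "task text"]);
const store = new Map<string, string>([[keys.v2Key, "cached output"]]);
console.log(lookup(store, keys)); // prints: cached output (v2 fallback tier)
console.log(store.has(keys.key)); // prints: false (no re-store under v3)
```

The read path has no side effects. The cache only converges on the `v3` key when a fresh execution is recorded.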
@@ -1696,9 +1890,10 @@ export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
* - "cross-run": within-run first, then the persistent cross-run store.
* On a cross-run hit, usage is zeroed and `cacheHit` records the source.
*
- * The cross-run read is THREE-TIER and READ-ONLY for fallback keys: it tries
- * `keys.key` (current `v2:flowdef:` shape) first, then `keys.bareKey` (pre-H1
- * bare `flowdef:`), then `keys.legacyKey` (pre-flowDefHash, no flowdef line).
+ * The cross-run read is FOUR-TIER and READ-ONLY for fallback keys: it tries
+ * `keys.key` (current `v3:phasefp:` shape) first, then `keys.v2Key` (pre-M6
+ * `v2:flowdef:`), then `keys.bareKey` (pre-H1 bare `flowdef:`), then
+ * `keys.legacyKey` (pre-flowDefHash, no flowdef line).
* A hit on ANY tier is restored as a cache hit; we do NOT write-through (no
* re-store under the new key) so the cache size stays stable and the legacy
* entry ages out naturally. See docs/internal/cache-migration.md.
@@ -1707,14 +1902,17 @@ function cachedPhase(cc: PhaseCacheCtx, keys: CacheKeys): PhaseState | null {
if (cc.scope === "off") return null;
if (cc.forceRerun) return null;
- // 1. within-run resume (fastest; always allowed unless scope is off)
+ // 1. within-run resume (fastest; always allowed unless scope is off). Flag
+ // it as a `run-only` cache hit so the run summary can count it as reused
+ // work (it spent no new tokens). The prior usage is preserved verbatim so
+ // the summary can report what the reuse would otherwise have cost.
if (cc.prior && cc.prior.status === "done" && cc.prior.inputHash === keys.key) {
- return { ...cc.prior, status: "done" };
+ return { ...cc.prior, status: "done", cacheHit: "run-only" };
}
- // 2. cross-run memoization (opt-in) — three-tier read-only fallback.
+ // 2. cross-run memoization (opt-in) — four-tier read-only fallback.
if (cc.scope === "cross-run") {
- for (const k of [keys.key, keys.bareKey, keys.legacyKey]) {
+ for (const k of [keys.key, keys.v2Key, keys.bareKey, keys.legacyKey]) {
const e = cc.store.get(k, cc.ttlMs);
if (!e) continue;
// If we stored the full PhaseState, restore it (preserving gate,
@@ -1895,6 +2093,22 @@ export interface RecomputeReport {
/** Phases in the frontier whose inputHash did NOT move → cached result
* reused, no re-execution (early cutoff). Empty in dry-run (unknowable). */
readonly cutoff: readonly string[];
+ /** Per-phase decision trace: WHY each phase was rerun / cut off / reused.
+ * The "explainable reactivity" layer — like React DevTools telling you why
+ * a component re-rendered. Additive; callers that ignore it are unaffected. */
+ readonly decisions: readonly RecomputeDecision[];
+}
+
+/** Why a single phase landed in its recompute outcome. */
+export interface RecomputeDecision {
+ readonly phaseId: string;
+ /** What happened (real run) or would happen (dry-run). */
+ readonly outcome: "rerun" | "cutoff" | "reused" | "failed";
+ /** Human-readable cause. */
+ readonly reason: string;
+ /** The upstream phase(s) that caused this outcome, when applicable
+ * (e.g. the changed upstreams that forced a rerun). */
+ readonly causedBy?: readonly string[];
}
/** Scan a flow for dependencies that cannot be observed through the readSet.
@@ -1946,6 +2160,30 @@ export async function recomputeTaskflow(
const allIds = Object.keys(newState.phases);
if (opts.dryRun) {
+ // Explain each phase WITHOUT executing: a frontier phase "may rerun"
+ // because it (transitively) reads a changed seed; everything else is
+ // reused as unreachable. We name the in-frontier upstream(s) as the cause.
+ const seedSet0 = new Set(seeds);
+ const upstreamsOf = (id: string): string[] => {
+ const observed = (newState.phases[id]?.reads ?? []).map((r) => r.stepId).filter((u) => u !== id);
+ const decl = (declared.get(id) ?? []).filter((u) => u !== id);
+ return [...new Set([...observed, ...decl])];
+ };
+ const decisions: RecomputeDecision[] = allIds.map((id) => {
+ if (!frontier.has(id)) {
+ return { phaseId: id, outcome: "reused", reason: "not reachable from any changed seed" };
+ }
+ if (seedSet0.has(id)) {
+ return { phaseId: id, outcome: "rerun", reason: "forced by recompute request (seed)" };
+ }
+ const causes = upstreamsOf(id).filter((u) => frontier.has(u));
+ return {
+ phaseId: id,
+ outcome: "rerun",
+ reason: "reads a phase in the stale frontier; may re-run if that upstream's output moves",
+ causedBy: causes.length ? causes : undefined,
+ };
+ });
return {
report: {
dryRun: true,
@@ -1954,6 +2192,7 @@ export async function recomputeTaskflow(
rerun: [...frontier],
reused: allIds.filter((id) => !frontier.has(id)),
cutoff: [],
+ decisions,
},
state: newState,
};
@@ -2003,6 +2242,11 @@ export async function recomputeTaskflow(
.filter((id) => frontier.has(id));
const rerun: string[] = [];
const cutoff: string[] = [];
+ const decisions: RecomputeDecision[] = [];
+ // Phases whose OUTPUT actually moved this recompute (seed forced, or result
+ // changed). Used to attribute a downstream rerun to the specific upstream(s)
+ // that changed — the "why" of the decision trace.
+ const outputMoved = new Set<string>();
const noop = () => {};
let aborted = false;
for (const id of order) {
@@ -2015,17 +2259,50 @@ export async function recomputeTaskflow(
const phase = newState.def.phases.find((p) => p.id === id);
if (!phase) continue;
const before = newState.phases[id]?.inputHash;
- const execOpts = seedSet.has(id) ? { forceRerun: true } : undefined;
+ const isSeed = seedSet.has(id);
+ const execOpts = isSeed ? { forceRerun: true } : undefined;
+ // The upstream(s) of this phase whose output moved — the cause of a rerun.
+ const changedUpstreams = depsFor(id).filter((u) => outputMoved.has(u));
try {
const ps = await executePhase(phase, newState, deps, newState.phases[id], noop, 0, execOpts);
newState.phases[id] = ps;
// A phase counts as "rerun" if it was a forced seed OR its result moved;
// otherwise it hit its cache (inputHash unchanged) → early cutoff.
- if (seedSet.has(id) || ps.inputHash !== before) rerun.push(id);
- else cutoff.push(id);
+ if (isSeed || ps.inputHash !== before) {
+ rerun.push(id);
+ outputMoved.add(id);
+ decisions.push(
+ isSeed
+ ? { phaseId: id, outcome: "rerun", reason: "forced by recompute request (seed)" }
+ : {
+ phaseId: id,
+ outcome: "rerun",
+ reason: "input changed — an upstream's output moved",
+ causedBy: changedUpstreams.length ? changedUpstreams : undefined,
+ },
+ );
+ } else {
+ cutoff.push(id);
+ const frontierUpstreams = depsFor(id).filter((u) => frontier.has(u));
+ decisions.push({
+ phaseId: id,
+ outcome: "cutoff",
+ reason: "input unchanged — upstream(s) re-ran but produced identical output (early cutoff)",
+ causedBy: frontierUpstreams.length ? frontierUpstreams : undefined,
+ });
+ }
} catch {
// A failing recompute phase is recorded as rerun (it was attempted).
rerun.push(id);
+ outputMoved.add(id);
+ decisions.push({ phaseId: id, outcome: "failed", reason: "re-execution attempted but the phase failed" });
+ }
+ }
+ // Frontier-external phases were never touched — record them as reused.
+ for (const id of allIds) {
+ if (!frontier.has(id)) {
+ decisions.push({ phaseId: id, outcome: "reused", reason: "not reachable from any changed seed" });
}
}
return {
@@ -2036,6 +2313,7 @@ export async function recomputeTaskflow(
rerun,
reused: allIds.filter((id) => !frontier.has(id)),
cutoff,
+ decisions,
},
state: newState,
};
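The rerun/cutoff attribution can be sketched with a toy model. It assumes, hypothetically, that a phase's inputHash is simply the joined outputs of its upstreams. `ToyPhase`, `recompute` and the maps are illustrative names, not the runtime's API. The sketch mirrors the rule above:
- A seed always reruns.
- A phase whose inputs moved reruns and names the moved upstreams in `causedBy`.
- A phase whose inputs did not move is an early cutoff.

```typescript
// Toy model, for illustration only.
interface ToyPhase { id: string; deps: string[]; run: (inputs: string[]) => string }
interface Decision { phaseId: string; outcome: "rerun" | "cutoff"; causedBy?: string[] }

function recompute(
  frontier: ToyPhase[], // already topologically ordered
  inputHash: Map<string, string>, // last-known inputHash per phase
  output: Map<string, string>, // last-known output per phase
  seeds: Set<string>,
): Decision[] {
  const inFrontier = new Set(frontier.map((p) => p.id));
  const moved = new Set<string>(); // phases that re-ran this recompute
  const decisions: Decision[] = [];
  for (const p of frontier) {
    const inputs = p.deps.map((d) => output.get(d) ?? "");
    const hash = inputs.join("|");
    const changed = p.deps.filter((d) => moved.has(d));
    if (seeds.has(p.id) || hash !== inputHash.get(p.id)) {
      inputHash.set(p.id, hash);
      output.set(p.id, p.run(inputs));
      moved.add(p.id);
      decisions.push({ phaseId: p.id, outcome: "rerun", causedBy: changed.length ? changed : undefined });
    } else {
      // Early cutoff: an upstream re-ran but this phase's input did not move.
      const ups = p.deps.filter((d) => inFrontier.has(d));
      decisions.push({ phaseId: p.id, outcome: "cutoff", causedBy: ups.length ? ups : undefined });
    }
  }
  return decisions;
}

let aOut = "v1";
const phases: ToyPhase[] = [
  { id: "A", deps: [], run: () => aOut },
  { id: "B", deps: ["A"], run: ([a]) => `B(${a})` },
];
const hashes = new Map([["A", ""], ["B", "v1"]]);
const outs = new Map([["A", "v1"], ["B", "B(v1)"]]);

// Seed A re-runs but returns the same output, so B is cut off.
console.log(recompute(phases, hashes, outs, new Set(["A"])).map((d) => `${d.phaseId}:${d.outcome}`));
// [ 'A:rerun', 'B:cutoff' ]

aOut = "v2"; // A's output now really moves, so B re-runs, attributed to A
const second = recompute(phases, hashes, outs, new Set(["A"]));
console.log(second[1]); // B reruns with causedBy: [ 'A' ]
```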
@@ -2099,6 +2377,27 @@ async function runTaskflowLayers(state: RunState, deps: RuntimeDeps): Promise = {};
+ for (const p of def.phases) {
+ try {
+ map[p.id] = (await phaseFingerprint(def, p.id)) ?? whole;
+ } catch {
+ map[p.id] = whole; // fail-open → whole-flow scope
+ }
+ }
+ state.phaseFingerprints = map;
+ }
+
state.status = "running";
safeEmit(deps, state);
@@ -2238,5 +2537,6 @@ async function runTaskflowLayers(state: RunState, deps: RuntimeDeps): Promise [p.id, p]));
+ const seen = new Set<string>();
+ const queue: string[] = [];
+ const seed = byId.get(phaseId);
+ if (seed) for (const d of dependenciesOf(seed)) queue.push(d);
+ while (queue.length) {
+ const id = queue.shift()!;
+ if (seen.has(id)) continue;
+ if (!byId.has(id)) continue; // unknown dep — validation reports elsewhere
+ seen.add(id);
+ const dep = byId.get(id)!;
+ for (const d of dependenciesOf(dep)) {
+ if (!seen.has(d)) queue.push(d);
+ }
+ }
+ return Array.from(seen).sort();
+}
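The per-phase fingerprint idea is to hash the phase together with its transitive upstream closure. `MiniPhase`, `closure` and `phaseFp` are hypothetical simplifications: only `dependsOn` is followed (the changelog says the runtime uses `dependsOn ∪ from`), and none of the soundness fallbacks to the whole-flow hash are modelled. Editing `B` moves the fingerprints of `B` and of its dependent `C`, while the sibling `A` keeps its key.

```typescript
import { createHash } from "node:crypto";

// Hypothetical minimal phase shape; real phases carry many more fields.
interface MiniPhase { id: string; task: string; dependsOn?: string[] }

// Transitive upstream closure via BFS (dependsOn only in this sketch).
function closure(phases: MiniPhase[], id: string): string[] {
  const byId = new Map(phases.map((p) => [p.id, p] as const));
  const seen = new Set<string>();
  const queue = [...(byId.get(id)?.dependsOn ?? [])];
  while (queue.length) {
    const next = queue.shift()!;
    if (seen.has(next) || !byId.has(next)) continue; // skip repeats and unknown deps
    seen.add(next);
    queue.push(...(byId.get(next)!.dependsOn ?? []));
  }
  return [...seen].sort();
}

// Fingerprint = hash of the phase plus every phase in its closure, in a stable order.
function phaseFp(phases: MiniPhase[], id: string): string {
  const byId = new Map(phases.map((p) => [p.id, p] as const));
  const h = createHash("sha256");
  for (const pid of [id, ...closure(phases, id)]) h.update(JSON.stringify(byId.get(pid) ?? null));
  return h.digest("hex");
}

const v1: MiniPhase[] = [
  { id: "A", task: "scan docs" },
  { id: "B", task: "scan code" },
  { id: "C", task: "merge", dependsOn: ["A", "B"] },
];
const v2 = v1.map((p) => (p.id === "B" ? { ...p, task: "scan code and tests" } : p));

for (const id of ["A", "B", "C"]) {
  console.log(id, phaseFp(v1, id) === phaseFp(v2, id) ? "hit" : "miss");
}
// A hit, B miss, C miss
```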
+
/** Topologically ordered layers; phases in the same layer can run concurrently. */
export function topoLayers(phases: Phase[]): Phase[][] {
const byId = new Map(phases.map((p) => [p.id, p]));
diff --git a/extensions/store.ts b/extensions/store.ts
index aa464d1..881f2e3 100644
--- a/extensions/store.ts
+++ b/extensions/store.ts
@@ -42,10 +42,11 @@ export interface PhaseState {
model?: string;
error?: string;
inputHash?: string;
- /** When this result was served from cache: 'cross-run' for the persistent
- * cross-run store. (Within-run resume reuses prior state verbatim and is not
- * flagged here.) */
- cacheHit?: "cross-run";
+ /** When this result was served from cache instead of executed:
+ * 'cross-run' = restored from the persistent cross-run store;
+ * 'run-only' = within-run resume (a prior attempt with the same inputHash).
+ * A phase with this set spent no new tokens this run. */
+ cacheHit?: "cross-run" | "run-only";
startedAt?: number;
endedAt?: number;
/** Live fan-out progress for map/parallel phases. */
@@ -114,6 +115,13 @@ export interface RunState {
* recompute derives this fresh from `def` so old runs (pre-H1) also get
* union semantics. */
declaredDeps?: Record<string, string[]>;
+ /** Per-phase structural sub-fingerprints (M6). Computed once per run
+ * alongside `flowDefHash`. Each value is either a precise per-phase hash
+ * (when sound) or the whole-flow `flowDefHash` (fallback for
+ * shareContext / `flow` phases). Folded into the cross-run cache key as
+ * `v3:phasefp:` so editing phase B invalidates only B + its
+ * transitive dependents. Audit/resume only — recompute derives fresh. */
+ phaseFingerprints?: Record<string, string>;
}
// ---------------------------------------------------------------------------
diff --git a/package.json b/package.json
index d520ccf..d89c42f 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "pi-taskflow",
- "version": "0.0.27",
+ "version": "0.0.28",
"description": "A declarative, verifiable graph of task nodes for the Pi coding agent — not a workflow you script, but a DAG you declare: statically verified before it runs, with dynamic fan-out, gates, isolated subagent context, resumable runs, and saveable commands.",
"keywords": [
"pi-package",
diff --git a/skills/taskflow/SKILL.md b/skills/taskflow/SKILL.md
index aca991b..a8863f1 100644
--- a/skills/taskflow/SKILL.md
+++ b/skills/taskflow/SKILL.md
@@ -549,10 +549,58 @@ Quick reference:
- **Flow:** `name`, `description`, `concurrency` (default 8), `budget` (`maxUSD`/`maxTokens`), `agentScope` (user|project|both), `args`, `strictInterpolation`.
- **Phase:** `model`, `thinking`, `tools` (whitelist), `cwd`, `output:"json"`, `concurrency` (map/parallel fan-out), `when`, `join` (all|any), `retry`, `use`/`with` (flow), `optional` (fail-soft — a failed/blocked phase won't abort the run), `final`.
-- **Cross-run caching:** add `cache: { "scope": "cross-run" }` to a phase to memoize its output across runs (same input → instant reuse, zero tokens). See `configuration.md` for `ttl`, `fingerprint` (git/glob/file/env invalidation), and scope options.
+- **Cross-run caching:** add `cache: { "scope": "cross-run" }` to a phase to memoize its output across runs (same input → instant reuse, zero tokens), or set `incremental: true` at the flow level (or pass `incremental: true` to `run`) to default every phase to cross-run reuse. See `configuration.md` for `ttl`, `fingerprint` (git/glob/file/env invalidation), scope options, and the `incremental` precedence rules.
- **Precedence (model/thinking/tools):** phase value → agent frontmatter (resolved via `modelRoles`) → global/default.
- **Concurrency:** same-layer phases use `flow.concurrency`; a `map`/`parallel` phase uses `phase.concurrency ?? flow.concurrency ?? 8`.
+### Per-item map caching (cross-run)
+
+A `map` phase with `cache: { "scope": "cross-run" }` is cached **per item**, not
+just as a whole. When one of N items changes between runs, only that item
+re-executes — the other N−1 are served from the cross-run cache for $0.
+
+```jsonc
+{ "id": "audit-each", "type": "map",
+ "over": "{steps.discover.json.files}", // array from an upstream phase
+ "task": "audit {item}",
+ "cache": { "scope": "cross-run" }, // ← enables per-item reuse
+ "dependsOn": ["discover"], "final": true }
+```
+
+How it works:
+
+- The **whole-map** entry is still checked first (fast path): an identical
+ re-run is a single $0 hit and never enters the fan-out.
+- On a whole-map miss, each item is looked up individually before it spawns a
+ subagent; a hit returns a 0-token synthesized result. Successful fresh items
+ are recorded so a later run with that item unchanged reuses them.
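+- Per-item keys fold the item's resolved task **and agent** (so changing
+ `phase.agent` invalidates every item), plus the model, `thinking`/`tools`,
+ and any `fingerprint` entries. Unlike a standalone cross-run phase, they
+ deliberately omit the structural sub-fingerprint, which hashes the whole
+ `over` source, so editing one item does not move the other items' keys.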
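This sketch shows why per-item keys leave the `over` source out, mirroring `ccPerItem` in the runtime, which clears `phaseFp` and `flowDefHash`. It uses SHA-256 over NUL-joined parts as a stand-in for the real key hash. The helper `h`, `perItemKey` and `overFoldedKey` are hypothetical. When one of three items changes, per-item keys keep two hits. Keys that also fold the whole `over` array all move at once.

```typescript
import { createHash } from "node:crypto";

// Hypothetical stand-in for the runtime's key hash.
const h = (...parts: string[]): string =>
  createHash("sha256").update(parts.join("\u0000")).digest("hex").slice(0, 12);

const phaseId = "audit-each";
const agent = "auditor";
const model = "";

// Per-item key: phase id, agent, model and the item's resolved task only.
const perItemKey = (task: string) => h(phaseId, agent, model, task);
// Counter-example: a key that also folds a hash of the whole `over` array.
const overFoldedKey = (over: string[], task: string) => h(JSON.stringify(over), phaseId, agent, model, task);

const run1 = ["a.ts", "b.ts", "c.ts"].map((f) => `audit ${f}`);
const run2 = ["a.ts", "b2.ts", "c.ts"].map((f) => `audit ${f}`); // one item edited

const stable = run1.filter((t, i) => perItemKey(t) === perItemKey(run2[i]!)).length;
const stableOver = run1.filter((t, i) => overFoldedKey(run1, t) === overFoldedKey(run2, run2[i]!)).length;
console.log(stable, stableOver); // 2 0
```

Because the key depends only on content, two identical items produce the same key, which is why duplicates share a single entry.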
+
+Automatic fallbacks (per-item disables and the whole-map path is used):
+
+- `shareContext: true` on the phase, or flow-wide `contextSharing: true` — a
+ sharing item can read sibling blackboard writes outside its declared deps, so
+ the per-item key would under-approximate real reads.
+- The map runs **inside a runtime-generated sub-flow** (a `flow { def }` phase
+ or a `ctx_spawn({subflow})`) — untrusted / possibly non-deterministic.
+- `scope: "run-only"` (default) or `"off"` — no persistent store to reuse from.
+
+Notes & limitations:
+
+- Duplicate items (identical task + agent) share a single entry — reuse is
+ content-addressable, not positional.
+- Failed items and **budget-skipped** items are never cached, so they always
+ re-execute on the next run.
+- `{steps.