From c11fbd9abede76b8bcf59310a6aa6d8ce8ff617e Mon Sep 17 00:00:00 2001
From: heggria
Date: Thu, 25 Jun 2026 19:30:16 +0800
Subject: [PATCH 1/5] feat(cache): per-phase structural sub-fingerprint
(v3:phasefp)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Replace the whole-flow v2:flowdef cache-key tier with a per-phase
structural sub-fingerprint so editing phase B invalidates only B and its
transitive dependents — independent sibling phase A keeps its cache hit.
phaseFingerprint(def, phaseId) (extensions/flowir/phasefp.ts) hashes the
phase plus its transitive dependsOn ∪ from closure, reusing the vendored
canonicalJson + hashCanonical (byte-identical to overstory's contract).
Only the policy field cache is stripped; every other Phase field is hashed.
Soundness fallback: phaseFingerprint returns undefined (→ caller folds the
whole-flow flowDefHash, preserving pre-M6 behavior) when per-phase
invalidation cannot be statically guaranteed — contextSharing at flow level,
any shareContext phase in the closure, or any flow phase in the closure.
Sub-flow inner phases always use this fallback.
cacheKeys now produces a 4-tier ladder: key (v3:phasefp, write) → v2Key
(v2:flowdef, read-only) → bareKey (bare flowdef, read-only) → legacyKey
(no flowdef, read-only). cachedPhase consults all four read-only on a miss;
recordCache writes only key. This makes the M6 upgrade additive — no
miss-storm for unchanged flows.
phaseFingerprints computed once per run in runTaskflowLayers alongside
flowDefHash, plumbed through RunState + PhaseCacheCtx. Fail-open: any
per-phase error degrades that phase to the whole-flow hash.
Tests: test/cache-phasefp.test.ts (11 tests — soundness gate, determinism,
precise-diff win, transitive propagation, v2 fallback, cross-flow
isolation, shareContext fallback). Updated cache-migration.test.ts
(distinct 4-tier keys; structural-change test now scoped to p's closure)
and runtime.test.ts resume tests to the v3 key shape.
---
docs/internal/cache-migration.md | 93 ++++++++---
extensions/flowir/index.ts | 2 +
extensions/flowir/phasefp.ts | 103 ++++++++++++
extensions/runtime.ts | 220 ++++++++++++++++++++++---
extensions/schema.ts | 31 ++++
extensions/store.ts | 16 +-
test/cache-migration.test.ts | 36 ++--
test/cache-phasefp.test.ts | 274 +++++++++++++++++++++++++++++++
test/runtime.test.ts | 19 ++-
9 files changed, 719 insertions(+), 75 deletions(-)
create mode 100644 extensions/flowir/phasefp.ts
create mode 100644 test/cache-phasefp.test.ts
diff --git a/docs/internal/cache-migration.md b/docs/internal/cache-migration.md
index 2b6bf84..ee203fd 100644
--- a/docs/internal/cache-migration.md
+++ b/docs/internal/cache-migration.md
@@ -12,25 +12,55 @@ Before H1, the cache key folded the flow **definition** fingerprint under a bare
H1 versions the key with a `v2:` prefix and routes the fingerprint through the
FlowIR compile seam (`compileTaskflowToIR` → `flowDefHash`).
-To avoid a one-time miss-storm on upgrade, the runtime consults **three** keys
-on every cross-run lookup, read-only for the legacy tiers.
+M6 replaces the whole-flow `v2:flowdef:` tier with a **per-phase structural
+sub-fingerprint** (`v3:phasefp:`): the hash of a single phase plus its
+transitive dependency closure. Editing phase B now invalidates only B and its
+transitive dependents — independent sibling phase A keeps its cache hit.
-## Key shapes (H1)
+To avoid a one-time miss-storm on upgrade, the runtime consults **four** keys
+on every cross-run lookup, read-only for the fallback tiers.
-`cacheKeys()` (`extensions/runtime.ts`) returns three keys for a phase:
+## Key shapes (M6)
+
+`cacheKeys()` (`extensions/runtime.ts`) returns four keys for a phase:
| Tier | Shape | Written by | Status |
|------|-------|-----------|--------|
-| `key` (current) | `flow:` + `v2:flowdef:` + `` + `think/tools/ctx` + fingerprint | H1+ | **read + write** |
+| `key` (current) | `flow:` + `v3:phasefp:` + `` + `think/tools/ctx` + fingerprint | M6+ | **read + write** |
+| `v2Key` | `flow:` + `v2:flowdef:` + … | H1..M5 | **read-only** |
| `bareKey` | `flow:` + `flowdef:` (bare, unversioned) + … | pre-H1 | **read-only** (removed in v0.1.0) |
| `legacyKey` | `flow:` + … (flowdef line omitted) | pre-flowDefHash era | **read-only** (removed in v0.1.0) |
+### The per-phase sub-fingerprint (`v3:phasefp`)
+
+`phaseFingerprint(def, phaseId)` (`extensions/flowir/phasefp.ts`) hashes the
+phase itself plus its transitive `dependsOn ∪ from` closure, reusing the vendored
+`canonicalJson` + `hashCanonical` (byte-identical to overstory's contract). The
+`cache` policy field is stripped (its sub-fields reach the key via other paths);
+every other `Phase` field is hashed.
+
+**Soundness fallback.** Per-phase invalidation is only sound when a phase's real
+dependencies are fully captured by the static closure. `phaseFingerprint` returns
+`undefined` (→ the caller folds the whole-flow `flowDefHash` instead, preserving
+pre-M6 behavior) when:
+
+- the flow has `contextSharing: true`, OR
+- any phase in the closure (self included) has `shareContext: true`, OR
+- any phase in the closure (self included) has `type: "flow"`.
+
+These are the cases where a phase can read sibling state outside its declared
+deps (Shared Context Tree) or where sub-structure is resolved at runtime
+(`flow`). Sub-flow inner phases always use this fallback (their `phaseFp` is
+absent → `flowDefHash`), so editing one phase inside a sub-flow invalidates all
+sub-flow phases — a known, safe conservatism.
+
### Lookup order (`cachedPhase`)
1. within-run resume (`cc.prior.inputHash === keys.key`) — fastest, always allowed.
-2. `store.get(keys.key)` — current v2 entry.
-3. `store.get(keys.bareKey)` — pre-H1 bare entry.
-4. `store.get(keys.legacyKey)` — pre-flowDefHash entry.
+2. `store.get(keys.key)` — current v3 entry.
+3. `store.get(keys.v2Key)` — pre-M6 v2 entry.
+4. `store.get(keys.bareKey)` — pre-H1 bare entry.
+5. `store.get(keys.legacyKey)` — pre-flowDefHash entry.
A hit on **any** tier is restored as a `cacheHit: "cross-run"` result with zero
usage. The restored `PhaseState.inputHash` is always `keys.key` (the current
@@ -38,37 +68,48 @@ shape), so downstream phases and recompute see a consistent identity.
### Write policy (`recordCache`)
-Only `keys.key` (the current v2 shape) is ever written. Legacy/bare hits are
+Only `keys.key` (the current v3 shape) is ever written. v2/bare/legacy hits are
**not** write-through: re-storing under the new key would double the cache size
-for no benefit. Legacy/bare entries age out naturally via the 90-day hard cap
+for no benefit. Legacy/bare/v2 entries age out naturally via the 90-day hard cap
(`DEFAULT_MAX_AGE_MS`) and the LRU cap (`DEFAULT_MAX_ENTRIES`).
-## Why three tiers?
-
-- **`v2:flowdef:` (current):** the versioned prefix lets a future genuine
- overstory compiler advance to `v3:flowIR:` with its own fallback tier,
- without disturbing v2 entries.
-- **bare `flowdef:` (pre-H1):** pre-H1 code wrote this shape. Without the 3rd
- tier, every existing cross-run entry would silently miss on upgrade — a
- one-time miss-storm for opt-in cross-run users.
+## Why four tiers?
+
+- **`v3:phasefp:` (current):** the per-phase structural sub-fingerprint enables
+ precise invalidation — editing one phase no longer evicts independent
+ siblings. The versioned prefix lets a future genuine overstory compiler
+ advance to `v4:flowIR:` with its own fallback tier, without disturbing v3.
+- **`v2:flowdef:` (pre-M6):** M5-and-earlier code wrote this whole-flow shape.
+ Without this tier, every existing cross-run entry would silently miss on the
+ M6 upgrade — a one-time miss-storm for opt-in cross-run users.
+- **bare `flowdef:` (pre-H1):** pre-H1 code wrote this shape. Retained for
+ completeness.
- **no-flowdef (pre-flowDefHash):** the very earliest cross-run entries, before
the flow definition was folded into the key at all. Retained for completeness;
these are rare.
+### Upgrade note (one-time cost)
+
+On the first post-M6 run, if a sibling phase was edited between the last
+pre-M6 run and the upgrade, an *unchanged* independent phase may re-execute
+once: its v2 entry was keyed on the old `flowDefHash`, which no longer matches.
+This is bounded (per-flow, one-time, only when a sibling edit happened) and
+amortized over subsequent runs as v3 entries take over. For unchanged flows the
+v2 tier hits and no re-execution occurs.
+
## Retirement
-- **v0.1.0:** remove the `bareKey` and `legacyKey` tiers and the `CacheKeys`
- return to a single `key`. By then all pre-H1 entries will have aged out (90-day
- hard cap). The `v2:` prefix is retained as the version anchor for the *next*
- migration.
-- A pre-release verification step: inspect a real `.pi/taskflow/cache/` directory
- for bare-`flowdef:` entries. If cross-run is confirmed unused in production
- (opt-in, young), the bare tier can be dropped earlier.
+- **v0.1.0:** remove the `bareKey` and `legacyKey` tiers. By then all pre-H1
+ entries will have aged out (90-day hard cap).
+- **Later:** remove the `v2Key` tier once all pre-M6 entries have aged out.
+- The `v3:` prefix is retained as the version anchor for the *next* migration.
## See also
- `extensions/flowir/hash.ts` — the vendored overstory hash algorithm.
+- `extensions/flowir/phasefp.ts` — the per-phase structural sub-fingerprint.
- `extensions/flowir/index.ts` — `compileTaskflowToIR` (the seam that produces
- `hash` and `meta.declaredDeps`).
+ `hash` and `meta.declaredDeps`) and `phaseFingerprint`.
- `docs/internal/overstory-convergence-roadmap.md` §3 (M1).
- `test/cache-migration.test.ts` — the migration contract tests.
+- `test/cache-phasefp.test.ts` — the per-phase sub-fingerprint contract tests.
diff --git a/extensions/flowir/index.ts b/extensions/flowir/index.ts
index f5f8962..e061559 100644
--- a/extensions/flowir/index.ts
+++ b/extensions/flowir/index.ts
@@ -71,3 +71,5 @@ export type {
TaskflowIR,
TaskflowIRMeta,
} from "./meta.ts";
+
+export { phaseFingerprint } from "./phasefp.ts";
diff --git a/extensions/flowir/phasefp.ts b/extensions/flowir/phasefp.ts
new file mode 100644
index 0000000..a7f3c46
--- /dev/null
+++ b/extensions/flowir/phasefp.ts
@@ -0,0 +1,103 @@
+/**
+ * Per-phase structural sub-fingerprint (M6).
+ *
+ * `phaseFingerprint` produces a content-addressed hash of ONLY the subset of
+ * the flow definition that can affect a single phase's subagent output: the
+ * phase itself plus its transitive dependency closure. Folding this into the
+ * cross-run cache key (instead of the whole-flow `flowDefHash`) means editing
+ * phase B invalidates only B and its transitive dependents — independent
+ * sibling phase A keeps its cache hit.
+ *
+ * ## Soundness (the fallback gate)
+ *
+ * Per-phase invalidation is only sound when a phase's *real* dependencies are
+ * fully captured by the static `dependsOn ∪ from` closure. Three cases break
+ * that guarantee, so `phaseFingerprint` returns `undefined` for them and the
+ * caller falls back to the whole-flow `flowDefHash` (safe, = pre-M6 behavior):
+ *
+ * 1. **Shared Context Tree** (`def.contextSharing === true` or any closure
+ * member has `shareContext === true`): a sharing phase can read sibling
+ * blackboard writes OUTSIDE its declared deps, so the static closure
+ * under-approximates real reads.
+ * 2. **`flow` phase in the closure** (`type === "flow"`): a `flow` phase's
+ * sub-structure is resolved at runtime (inline `def`) or from a saved
+ * flow (`use`) and is not statically visible here. Editing the saved
+ * sub-flow would not move this phase's sub-fingerprint.
+ *
+ * `cache` (the policy object) is the ONLY field stripped from each phase
+ * before hashing: its sub-fields (`scope`/`ttl`/`fingerprint`) are folded into
+ * the cache key through other paths (`cc.scope` gates the lookup, `cc.ttlMs`
+ * governs expiry, `cc.fingerprint` is in the key tail). Every other `Phase`
+ * field is hashed. `PhaseSchema` uses `additionalProperties: false`, so no
+ * surprise field can be missed.
+ *
+ * Pure + async (Web Crypto via `hashCanonical`). Reuses the vendored
+ * `canonicalJson`/`hashCanonical` (byte-identical to overstory's contract) so
+ * the sub-fingerprint shares one hashing contract with `flowDefHash`. Never
+ * throws — callers wrap in try/catch and degrade to `flowDefHash`.
+ *
+ * @see docs/internal/cache-migration.md (v3:phasefp tier)
+ */
+
+import { transitiveDependencies, type Phase, type Taskflow } from "../schema.ts";
+import { canonicalJson, hashCanonical } from "./hash.ts";
+
+/** Policy field stripped before hashing (its sub-fields reach the key via
+ * `cc.scope` / `cc.ttlMs` / `cc.fingerprint` — folding them here would be
+ * recursive and redundant). This is the ONLY field stripped. */
+const PHASE_FP_STRIP = ["cache"] as const;
+
+/** Clone a phase into a plain record with policy fields removed. */
+function stripPolicy(phase: Phase): Record {
+ const rec = phase as unknown as Record;
+ const out: Record = {};
+ for (const k of Object.keys(rec)) {
+ if ((PHASE_FP_STRIP as readonly string[]).includes(k)) continue;
+ out[k] = rec[k];
+ }
+ return out;
+}
+
+/**
+ * Per-phase structural sub-fingerprint.
+ *
+ * @returns the hex hash, or `undefined` when per-phase soundness cannot be
+ * guaranteed (caller falls back to the whole-flow `flowDefHash`). Never
+ * throws.
+ */
+export async function phaseFingerprint(def: Taskflow, phaseId: string): Promise {
+ const phases = def.phases as Phase[];
+ const byId = new Map(phases.map((p) => [p.id, p]));
+ const phase = byId.get(phaseId);
+ if (!phase) return undefined;
+
+ // --- Soundness gate: fall back to whole-flow when static closure is unsafe. ---
+ // Flow-wide context sharing enables cross-sibling reads outside declared deps.
+ if (def.contextSharing === true) return undefined;
+
+ const closureIds = transitiveDependencies(phases, phaseId);
+ const closurePhases: Phase[] = [];
+ for (const id of closureIds) {
+ const p = byId.get(id);
+ if (!p) continue; // unknown dep — validation reports elsewhere
+ // Per-phase sharing: this closure member can read sibling blackboard
+ // writes outside its own declared deps.
+ if (p.shareContext === true) return undefined;
+ // A flow phase's sub-structure is runtime/saved-flow-resolved and not
+ // statically visible — editing it would not move the sub-fingerprint.
+ if ((p.type ?? "agent") === "flow") return undefined;
+ closurePhases.push(p);
+ }
+ // The self phase's own sharing/type is part of the closure too.
+ if (phase.shareContext === true) return undefined;
+ if ((phase.type ?? "agent") === "flow") return undefined;
+
+ // --- Build the canonical payload. ---
+ // `deps` is the SORTED transitive closure (self excluded). canonicalJson
+ // sorts OBJECT keys but preserves ARRAY order, so we sort the array
+ // explicitly for determinism independent of dependency walk order.
+ const depsPayload = closurePhases.map((p) => ({ id: p.id, def: stripPolicy(p) }));
+ const payload = { self: stripPolicy(phase), deps: depsPayload };
+
+ return hashCanonical(canonicalJson(payload));
+}
diff --git a/extensions/runtime.ts b/extensions/runtime.ts
index 351b346..49c3eae 100644
--- a/extensions/runtime.ts
+++ b/extensions/runtime.ts
@@ -20,7 +20,7 @@ import { type Budget, type CacheScope, dependenciesOf, finalPhase, LOOP_DEFAULT_
import { verifyTaskflow } from "./verify.ts";
import { hashInput, newRunId, type PhaseState, type RunState, runsDir } from "./store.ts";
import { CacheStore, resolveFingerprint } from "./cache.ts";
-import { compileTaskflowToIR } from "./flowir/index.ts";
+import { compileTaskflowToIR, phaseFingerprint } from "./flowir/index.ts";
import { computeStaleFrontier, declaredReadMapOfDef, readMapOf } from "./stale.ts";
import { ctxDirFor, drainPendingSpawns, initCtxDir, registerNode, setNodeStatus, type SpawnAssignment } from "./context-store.ts";
import { allocateWorkspace, isWorkspaceKeyword, type Workspace } from "./workspace.ts";
@@ -72,6 +72,55 @@ export interface RuntimeResult {
finalOutput: string;
ok: boolean;
totalUsage: UsageStats;
+ /** Incremental-reuse summary: how many phases were reused from cache vs.
+ * freshly executed this run, and the cost the reused work would otherwise
+ * have incurred (known only for within-run resume; cross-run hits zero
+ * their usage so their original cost is not recoverable). Optional &
+ * additive — callers that ignore it are unaffected. */
+ reuse?: ReuseSummary;
+}
+
+/** A run's incremental-reuse accounting (see RuntimeResult.reuse). */
+export interface ReuseSummary {
+ /** Phases that completed by executing a subagent this run. */
+ executed: number;
+ /** Phases served from the within-run resume cache (no new tokens). */
+ reusedRunOnly: number;
+ /** Phases restored from the cross-run store (no new tokens). */
+ reusedCrossRun: number;
+ /** Total phases that reached `done` (executed + reused). */
+ done: number;
+ /** USD the within-run-reused phases would have cost if re-executed (their
+ * preserved prior usage). Cross-run hits are excluded (cost not recoverable). */
+ savedUSD: number;
+}
+
+/** Compute the incremental-reuse summary from a run's terminal phase states.
+ * Pure, total, never throws. A phase is "reused" iff it carries a `cacheHit`
+ * marker (set by `cachedPhase` for both within-run resume and cross-run hits). */
+export function summarizeReuse(state: RunState): ReuseSummary {
+ let executed = 0;
+ let reusedRunOnly = 0;
+ let reusedCrossRun = 0;
+ let savedUSD = 0;
+ for (const ps of Object.values(state.phases)) {
+ if (ps.status !== "done") continue;
+ if (ps.cacheHit === "run-only") {
+ reusedRunOnly++;
+ savedUSD += ps.usage?.cost ?? 0; // within-run resume preserves prior usage
+ } else if (ps.cacheHit === "cross-run") {
+ reusedCrossRun++; // cross-run hits zero their usage — cost not recoverable
+ } else {
+ executed++;
+ }
+ }
+ return {
+ executed,
+ reusedRunOnly,
+ reusedCrossRun,
+ done: executed + reusedRunOnly + reusedCrossRun,
+ savedUSD,
+ };
}
function buildInterpolationContext(
@@ -721,6 +770,7 @@ async function executePhaseInner(
flowName: state.flowName,
runId: state.runId,
flowDefHash: state.flowDefHash === "failed" ? undefined : state.flowDefHash,
+ phaseFp: state.phaseFingerprints?.[phase.id],
forceRerun: opts?.forceRerun,
thinking: phase.thinking,
tools: phase.tools,
@@ -1635,6 +1685,12 @@ export interface PhaseCacheCtx {
* key so two structurally-different flows that share a name can never
* collide, and a changed flow never serves a stale cross-run hit. */
flowDefHash?: string | "failed";
+ /** Per-phase structural sub-fingerprint (M6). When present, folds into the
+ * key as `v3:phasefp:` so editing phase B invalidates only B + its
+ * transitive dependents. When absent (sub-flow inner states, or a phase
+ * for which per-phase soundness couldn't be guaranteed), `cacheKeys`
+ * falls back to `flowDefHash` — preserving pre-M6 whole-flow behavior. */
+ phaseFp?: string;
/** Force this phase to re-execute, ignoring the within-run prior AND the
* cross-run store (M5 recompute seed). Downstream phases are NOT forced —
* they re-evaluate naturally: if the seed's new output changed their
@@ -1646,27 +1702,34 @@ export interface PhaseCacheCtx {
/** A computed cache identity: the new (versioned) key plus the read-only
* fallback keys used to honor entries written by older releases. The `key`
* is what we WRITE under and what `PhaseState.inputHash` carries; the
- * `legacyKey`/`bareKey` are consulted READ-ONLY on a miss so an upgrade
- * never produces a miss-storm. See docs/internal/cache-migration.md. */
+ * `v2Key`/`bareKey`/`legacyKey` are consulted READ-ONLY on a miss so an
+ * upgrade never produces a miss-storm. See docs/internal/cache-migration.md. */
export interface CacheKeys {
- /** Current key: folds `v2:flowdef:` (the overstory content fingerprint). */
+ /** Current key: folds `v3:phasefp:` (the per-phase structural
+ * sub-fingerprint; degrades to the whole-flow hash when per-phase
+ * soundness couldn't be guaranteed). */
key: string;
- /** Pre-flowDefHash-era key: the flowdef line OMITTED entirely. Read-only. */
- legacyKey: string;
+ /** Pre-M6 key: `v2:flowdef:` (whole-flow fingerprint).
+ * Read-only. */
+ v2Key: string;
/** Bare (unversioned) `flowdef:` key — written by pre-H1 code that folded
* the hash without a `v2:` prefix. Read-only. Removed in v0.1.0. */
bareKey: string;
+ /** Pre-flowDefHash-era key: the flowdef line OMITTED entirely. Read-only. */
+ legacyKey: string;
}
/** Fold the phase fingerprint into the base hash parts to form the cache keys.
*
- * Three keys are produced for backward compatibility (see
+ * Four keys are produced for backward compatibility (see
* docs/internal/cache-migration.md):
- * - `key` : `v2:flowdef:` — the current write key.
+ * - `key` : `v3:phasefp:` — the current write key (per-phase
+ * structural sub-fingerprint; falls back to the whole-flow hash when
+ * `cc.phaseFp` is absent).
+ * - `v2Key` : `v2:flowdef:` — pre-M6 whole-flow key.
+ * - `bareKey` : bare `flowdef:` (unversioned) — pre-H1 entries.
* - `legacyKey`: the flowdef line omitted — pre-flowDefHash entries.
- * - `bareKey` : bare `flowdef:` (unversioned) — pre-H1 entries that
- * folded the hash without the `v2:` prefix.
- * `cachedPhase` consults all three READ-ONLY on a miss; `recordCache` writes
+ * `cachedPhase` consults all four READ-ONLY on a miss; `recordCache` writes
* only `key`. This means an upgrade never produces a miss-storm: existing
* entries (whichever shape) still hit, and new writes converge on `key`. */
export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
@@ -1682,10 +1745,15 @@ export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
];
const fold = (parts: string[]): string =>
cc.fingerprint ? hashInput(...parts, cc.fingerprint) : hashInput(...parts);
+ // Per-phase sub-fingerprint; falls back to the whole-flow hash when absent
+ // (sub-flow inner states, or soundness fallback) — preserving pre-M6 behavior.
+ const fp = cc.phaseFp ?? cc.flowDefHash ?? "";
+ const fdh = cc.flowDefHash ?? "";
return {
- key: fold([`flow:${cc.flowName}`, `v2:flowdef:${cc.flowDefHash ?? ""}`, ...tail]),
+ key: fold([`flow:${cc.flowName}`, `v3:phasefp:${fp}`, ...tail]),
+ v2Key: fold([`flow:${cc.flowName}`, `v2:flowdef:${fdh}`, ...tail]),
+ bareKey: fold([`flow:${cc.flowName}`, `flowdef:${fdh}`, ...tail]),
legacyKey: fold([`flow:${cc.flowName}`, ...tail]),
- bareKey: fold([`flow:${cc.flowName}`, `flowdef:${cc.flowDefHash ?? ""}`, ...tail]),
};
}
@@ -1696,9 +1764,10 @@ export function cacheKeys(cc: PhaseCacheCtx, baseParts: string[]): CacheKeys {
* - "cross-run": within-run first, then the persistent cross-run store.
* On a cross-run hit, usage is zeroed and `cacheHit` records the source.
*
- * The cross-run read is THREE-TIER and READ-ONLY for fallback keys: it tries
- * `keys.key` (current `v2:flowdef:` shape) first, then `keys.bareKey` (pre-H1
- * bare `flowdef:`), then `keys.legacyKey` (pre-flowDefHash, no flowdef line).
+ * The cross-run read is FOUR-TIER and READ-ONLY for fallback keys: it tries
+ * `keys.key` (current `v3:phasefp:` shape) first, then `keys.v2Key` (pre-M6
+ * `v2:flowdef:`), then `keys.bareKey` (pre-H1 bare `flowdef:`), then
+ * `keys.legacyKey` (pre-flowDefHash, no flowdef line).
* A hit on ANY tier is restored as a cache hit; we do NOT write-through (no
* re-store under the new key) so the cache size stays stable and the legacy
* entry ages out naturally. See docs/internal/cache-migration.md.
@@ -1707,14 +1776,17 @@ function cachedPhase(cc: PhaseCacheCtx, keys: CacheKeys): PhaseState | null {
if (cc.scope === "off") return null;
if (cc.forceRerun) return null;
- // 1. within-run resume (fastest; always allowed unless scope is off)
+ // 1. within-run resume (fastest; always allowed unless scope is off). Flag
+ // it as a `run-only` cache hit so the run summary can count it as reused
+ // work (it spent no new tokens). The prior usage is preserved verbatim so
+ // the summary can report what the reuse would otherwise have cost.
if (cc.prior && cc.prior.status === "done" && cc.prior.inputHash === keys.key) {
- return { ...cc.prior, status: "done" };
+ return { ...cc.prior, status: "done", cacheHit: "run-only" };
}
- // 2. cross-run memoization (opt-in) — three-tier read-only fallback.
+ // 2. cross-run memoization (opt-in) — four-tier read-only fallback.
if (cc.scope === "cross-run") {
- for (const k of [keys.key, keys.bareKey, keys.legacyKey]) {
+ for (const k of [keys.key, keys.v2Key, keys.bareKey, keys.legacyKey]) {
const e = cc.store.get(k, cc.ttlMs);
if (!e) continue;
// If we stored the full PhaseState, restore it (preserving gate,
@@ -1895,6 +1967,22 @@ export interface RecomputeReport {
/** Phases in the frontier whose inputHash did NOT move → cached result
* reused, no re-execution (early cutoff). Empty in dry-run (unknowable). */
readonly cutoff: readonly string[];
+ /** Per-phase decision trace: WHY each phase was rerun / cut off / reused.
+ * The "explainable reactivity" layer — like React DevTools telling you why
+ * a component re-rendered. Additive; callers that ignore it are unaffected. */
+ readonly decisions: readonly RecomputeDecision[];
+}
+
+/** Why a single phase landed in its recompute outcome. */
+export interface RecomputeDecision {
+ readonly phaseId: string;
+ /** What happened (real run) or would happen (dry-run). */
+ readonly outcome: "rerun" | "cutoff" | "reused" | "failed";
+ /** Human-readable cause. */
+ readonly reason: string;
+ /** The upstream phase(s) that caused this outcome, when applicable
+ * (e.g. the changed upstreams that forced a rerun). */
+ readonly causedBy?: readonly string[];
}
/** Scan a flow for dependencies that cannot be observed through the readSet.
@@ -1946,6 +2034,30 @@ export async function recomputeTaskflow(
const allIds = Object.keys(newState.phases);
if (opts.dryRun) {
+ // Explain each phase WITHOUT executing: a frontier phase "may rerun"
+ // because it (transitively) reads a changed seed; everything else is
+ // reused as unreachable. We name the in-frontier upstream(s) as the cause.
+ const seedSet0 = new Set(seeds);
+ const upstreamsOf = (id: string): string[] => {
+ const observed = (newState.phases[id]?.reads ?? []).map((r) => r.stepId).filter((u) => u !== id);
+ const decl = (declared.get(id) ?? []).filter((u) => u !== id);
+ return [...new Set([...observed, ...decl])];
+ };
+ const decisions: RecomputeDecision[] = allIds.map((id) => {
+ if (!frontier.has(id)) {
+ return { phaseId: id, outcome: "reused", reason: "not reachable from any changed seed" };
+ }
+ if (seedSet0.has(id)) {
+ return { phaseId: id, outcome: "rerun", reason: "forced by recompute request (seed)" };
+ }
+ const causes = upstreamsOf(id).filter((u) => frontier.has(u));
+ return {
+ phaseId: id,
+ outcome: "rerun",
+ reason: "reads a phase in the stale frontier; may re-run if that upstream's output moves",
+ causedBy: causes.length ? causes : undefined,
+ };
+ });
return {
report: {
dryRun: true,
@@ -1954,6 +2066,7 @@ export async function recomputeTaskflow(
rerun: [...frontier],
reused: allIds.filter((id) => !frontier.has(id)),
cutoff: [],
+ decisions,
},
state: newState,
};
@@ -2003,6 +2116,11 @@ export async function recomputeTaskflow(
.filter((id) => frontier.has(id));
const rerun: string[] = [];
const cutoff: string[] = [];
+ const decisions: RecomputeDecision[] = [];
+ // Phases whose OUTPUT actually moved this recompute (seed forced, or result
+ // changed). Used to attribute a downstream rerun to the specific upstream(s)
+ // that changed — the "why" of the decision trace.
+ const outputMoved = new Set();
const noop = () => {};
let aborted = false;
for (const id of order) {
@@ -2015,17 +2133,50 @@ export async function recomputeTaskflow(
const phase = newState.def.phases.find((p) => p.id === id);
if (!phase) continue;
const before = newState.phases[id]?.inputHash;
- const execOpts = seedSet.has(id) ? { forceRerun: true } : undefined;
+ const isSeed = seedSet.has(id);
+ const execOpts = isSeed ? { forceRerun: true } : undefined;
+ // The upstream(s) of this phase whose output moved — the cause of a rerun.
+ const changedUpstreams = depsFor(id).filter((u) => outputMoved.has(u));
try {
const ps = await executePhase(phase, newState, deps, newState.phases[id], noop, 0, execOpts);
newState.phases[id] = ps;
// A phase counts as "rerun" if it was a forced seed OR its result moved;
// otherwise it hit its cache (inputHash unchanged) → early cutoff.
- if (seedSet.has(id) || ps.inputHash !== before) rerun.push(id);
- else cutoff.push(id);
+ if (isSeed || ps.inputHash !== before) {
+ rerun.push(id);
+ outputMoved.add(id);
+ decisions.push(
+ isSeed
+ ? { phaseId: id, outcome: "rerun", reason: "forced by recompute request (seed)" }
+ : {
+ phaseId: id,
+ outcome: "rerun",
+ reason: "input changed — an upstream's output moved",
+ causedBy: changedUpstreams.length ? changedUpstreams : undefined,
+ },
+ );
+ } else {
+ cutoff.push(id);
+ decisions.push({
+ phaseId: id,
+ outcome: "cutoff",
+ reason: "input unchanged — upstream(s) re-ran but produced identical output (early cutoff)",
+ causedBy: depsFor(id).filter((u) => frontier.has(u)).length
+ ? depsFor(id).filter((u) => frontier.has(u))
+ : undefined,
+ });
+ }
} catch {
// A failing recompute phase is recorded as rerun (it was attempted).
rerun.push(id);
+ outputMoved.add(id);
+ decisions.push({ phaseId: id, outcome: "failed", reason: "re-execution attempted but the phase failed" });
+ }
+ }
+ // Frontier-external phases were never touched — record them as reused.
+ for (const id of allIds) {
+ if (!frontier.has(id)) {
+ decisions.push({ phaseId: id, outcome: "reused", reason: "not reachable from any changed seed" });
}
}
return {
@@ -2036,6 +2187,7 @@ export async function recomputeTaskflow(
rerun,
reused: allIds.filter((id) => !frontier.has(id)),
cutoff,
+ decisions,
},
state: newState,
};
@@ -2099,6 +2251,27 @@ async function runTaskflowLayers(state: RunState, deps: RuntimeDeps): Promise = {};
+ for (const p of def.phases) {
+ try {
+ map[p.id] = (await phaseFingerprint(def, p.id)) ?? whole;
+ } catch {
+ map[p.id] = whole; // fail-open → whole-flow scope
+ }
+ }
+ state.phaseFingerprints = map;
+ }
+
state.status = "running";
safeEmit(deps, state);
@@ -2238,5 +2411,6 @@ async function runTaskflowLayers(state: RunState, deps: RuntimeDeps): Promise [p.id, p]));
+ const seen = new Set();
+ const queue: string[] = [];
+ const seed = byId.get(phaseId);
+ if (seed) for (const d of dependenciesOf(seed)) queue.push(d);
+ while (queue.length) {
+ const id = queue.shift()!;
+ if (seen.has(id)) continue;
+ if (!byId.has(id)) continue; // unknown dep — validation reports elsewhere
+ seen.add(id);
+ const dep = byId.get(id)!;
+ for (const d of dependenciesOf(dep)) {
+ if (!seen.has(d)) queue.push(d);
+ }
+ }
+ return Array.from(seen).sort();
+}
+
/** Topologically ordered layers; phases in the same layer can run concurrently. */
export function topoLayers(phases: Phase[]): Phase[][] {
const byId = new Map(phases.map((p) => [p.id, p]));
diff --git a/extensions/store.ts b/extensions/store.ts
index aa464d1..881f2e3 100644
--- a/extensions/store.ts
+++ b/extensions/store.ts
@@ -42,10 +42,11 @@ export interface PhaseState {
model?: string;
error?: string;
inputHash?: string;
- /** When this result was served from cache: 'cross-run' for the persistent
- * cross-run store. (Within-run resume reuses prior state verbatim and is not
- * flagged here.) */
- cacheHit?: "cross-run";
+ /** When this result was served from cache instead of executed:
+ * 'cross-run' = restored from the persistent cross-run store;
+ * 'run-only' = within-run resume (a prior attempt with the same inputHash).
+ * A phase with this set spent no new tokens this run. */
+ cacheHit?: "cross-run" | "run-only";
startedAt?: number;
endedAt?: number;
/** Live fan-out progress for map/parallel phases. */
@@ -114,6 +115,13 @@ export interface RunState {
* recompute derives this fresh from `def` so old runs (pre-H1) also get
* union semantics. */
declaredDeps?: Record;
+ /** Per-phase structural sub-fingerprints (M6). Computed once per run
+ * alongside `flowDefHash`. Each value is either a precise per-phase hash
+ * (when sound) or the whole-flow `flowDefHash` (fallback for
+ * shareContext / `flow` phases). Folded into the cross-run cache key as
+ * `v3:phasefp:` so editing phase B invalidates only B + its
+ * transitive dependents. Audit/resume only — recompute derives fresh. */
+ phaseFingerprints?: Record;
}
// ---------------------------------------------------------------------------
diff --git a/test/cache-migration.test.ts b/test/cache-migration.test.ts
index d4182a5..1e6d276 100644
--- a/test/cache-migration.test.ts
+++ b/test/cache-migration.test.ts
@@ -49,11 +49,14 @@ function countingRunner(counter: { n: number }): RuntimeDeps["runTask"] {
}
/** Build a minimal PhaseCacheCtx matching what executeTaskflow constructs for
- * a cross-run agent phase, so we can compute the exact legacy/bare keys to
- * pre-seed. Derives flowDefHash by running compileTaskflowToIR once. */
+ * a cross-run agent phase, so we can compute the exact legacy/bare/v2 keys to
+ * pre-seed. Derives flowDefHash + per-phase sub-fingerprint by running
+ * compileTaskflowToIR + phaseFingerprint once (mirrors the runtime). */
async function ccFor(def: Taskflow, cwd: string, store: CacheStore, phaseId: string): Promise {
- const { compileTaskflowToIR } = await import("../extensions/flowir/index.ts");
+ const { compileTaskflowToIR, phaseFingerprint } = await import("../extensions/flowir/index.ts");
const ir = await compileTaskflowToIR(def);
+ const fdh = ir.hash;
+ const subfp = (await phaseFingerprint(def, phaseId)) ?? fdh ?? "";
return {
scope: "cross-run",
fingerprint: "",
@@ -62,7 +65,8 @@ async function ccFor(def: Taskflow, cwd: string, store: CacheStore, phaseId: str
phaseId,
flowName: def.name,
runId: "seed",
- flowDefHash: ir.hash,
+ flowDefHash: fdh,
+ phaseFp: subfp,
};
}
@@ -70,7 +74,7 @@ async function ccFor(def: Taskflow, cwd: string, store: CacheStore, phaseId: str
// Key shape: new key uses v2:flowdef prefix; legacy/bare differ.
// ---------------------------------------------------------------------------
-test("cacheKeys: key, legacyKey, bareKey are all distinct", async () => {
+test("cacheKeys: key, v2Key, bareKey, legacyKey are all distinct (M6 4-tier)", async () => {
const dir = tmpDir();
const store = new CacheStore(dir);
const def: Taskflow = {
@@ -80,10 +84,13 @@ test("cacheKeys: key, legacyKey, bareKey are all distinct", async () => {
const cc = await ccFor(def, dir, store, "p");
// baseParts must match what the agent branch uses: [phase.id, agentName, model, fullTask]
const ck = cacheKeys(cc, ["p", "a", "", "fixed"]);
- assert.ok(ck.key !== ck.legacyKey, "v2 key differs from legacy (no-flowdef)");
- assert.ok(ck.key !== ck.bareKey, "v2 key differs from bare (unversioned flowdef)");
- assert.ok(ck.legacyKey !== ck.bareKey, "legacy differs from bare");
- assert.match(ck.key, /^[0-9a-f]+$/);
+ assert.ok(ck.key !== ck.v2Key, "v3 key differs from v2 (per-phase subfp vs whole-flow)");
+ assert.ok(ck.key !== ck.bareKey, "v3 key differs from bare (unversioned flowdef)");
+ assert.ok(ck.key !== ck.legacyKey, "v3 key differs from legacy (no-flowdef)");
+ assert.ok(ck.v2Key !== ck.bareKey, "v2 differs from bare");
+ assert.ok(ck.v2Key !== ck.legacyKey, "v2 differs from legacy");
+ assert.ok(ck.bareKey !== ck.legacyKey, "bare differs from legacy");
+ assert.match(ck.key, /^[0-9a-f]+$/); // all four are hashInput hex digests
fs.rmSync(dir, { recursive: true, force: true });
});
@@ -221,11 +228,14 @@ test("cache migration: identical re-run is free (v2 write round-trips)", async (
test("cache migration: structural change invalidates (flowdef hash differs)", async () => {
const dir = tmpDir();
const store = new CacheStore(dir);
+ // M6: only a structural change WITHIN a phase's transitive closure
+ // invalidates it. Adding an unrelated independent phase must NOT. So `q`
+ // is made a dependency of `p` — adding it moves p's sub-fingerprint.
const mk = (extra: boolean): Taskflow => ({
name: "struct-change",
phases: extra
? [
- { id: "p", type: "agent", agent: "a", task: "fixed", cache: { scope: "cross-run" }, final: true },
+ { id: "p", type: "agent", agent: "a", task: "fixed", cache: { scope: "cross-run" }, dependsOn: ["q"], final: true },
{ id: "q", type: "agent", agent: "a", task: "extra" },
]
: [{ id: "p", type: "agent", agent: "a", task: "fixed", cache: { scope: "cross-run" }, final: true }],
@@ -235,10 +245,10 @@ test("cache migration: structural change invalidates (flowdef hash differs)", as
await executeTaskflow(mkState(mk(false), dir), deps);
assert.equal(counter.n, 1);
- // Different structure (extra phase) → different flowDefHash → different v2 key → miss.
- // (q also runs, so counter increments by 2.)
+ // Adding `q` (now in p's closure) → p's sub-fingerprint changes → v3 key
+ // differs → miss. (q also runs, so counter increments by 2.)
await executeTaskflow(mkState(mk(true), dir), deps);
- assert.equal(counter.n, 3, "structural change → miss on p (and q runs)");
+ assert.equal(counter.n, 3, "structural change in p's closure → miss on p (and q runs)");
fs.rmSync(dir, { recursive: true, force: true });
});
diff --git a/test/cache-phasefp.test.ts b/test/cache-phasefp.test.ts
new file mode 100644
index 0000000..ce23446
--- /dev/null
+++ b/test/cache-phasefp.test.ts
@@ -0,0 +1,274 @@
+import assert from "node:assert/strict";
+import * as fs from "node:fs";
+import * as os from "node:os";
+import * as path from "node:path";
+import { test } from "node:test";
+import type { AgentConfig } from "../extensions/agents.ts";
+import { CacheStore } from "../extensions/cache.ts";
+import { phaseFingerprint } from "../extensions/flowir/index.ts";
+import { executeTaskflow, cacheKeys, type PhaseCacheCtx, type RuntimeDeps } from "../extensions/runtime.ts";
+import type { RunResult, RunOptions } from "../extensions/runner.ts";
+import type { Taskflow } from "../extensions/schema.ts";
+import type { RunState } from "../extensions/store.ts";
+import { emptyUsage } from "../extensions/usage.ts";
+
+// ---------------------------------------------------------------------------
+// helpers (minimal set, mirroring test/cache.test.ts)
+// ---------------------------------------------------------------------------
+
+const AGENTS: AgentConfig[] = [
+ { name: "a", description: "test agent", systemPrompt: "", source: "user", filePath: "" },
+];
+
+function tmpDir(): string {
+ return fs.mkdtempSync(path.join(os.tmpdir(), "tf-phasefp-"));
+}
+
+function mkState(def: Taskflow, cwd: string): RunState {
+ return {
+ runId: `run-${Math.random().toString(36).slice(2, 8)}`,
+ flowName: def.name,
+ def,
+ args: {},
+ status: "running",
+ phases: {},
+ createdAt: Date.now(),
+ updatedAt: Date.now(),
+ cwd,
+ };
+}
+
+function countingRunner(counter: { n: number }): RuntimeDeps["runTask"] {
+ return async (_cwd, _agents, agentName, task, _o: RunOptions): Promise => {
+ counter.n++;
+ return {
+ agent: agentName,
+ task,
+ exitCode: 0,
+ output: `out:${task}#${counter.n}`,
+ stderr: "",
+ usage: { ...emptyUsage(), output: 10, cost: 0.001, turns: 1 },
+ stopReason: "end",
+ };
+ };
+}
+
+// ===========================================================================
+// Unit tests for phaseFingerprint (soundness gate + determinism)
+// ===========================================================================
+
+test("phaseFingerprint: returns undefined when def.contextSharing is true (soundness gate)", async () => {
+ const def: Taskflow = {
+ name: "sharing-flow",
+ contextSharing: true,
+ phases: [{ id: "p", type: "agent", agent: "a", task: "t", cache: { scope: "cross-run" }, final: true }],
+ };
+ assert.equal(await phaseFingerprint(def, "p"), undefined);
+});
+
+test("phaseFingerprint: returns undefined when a closure member has shareContext", async () => {
+ const def: Taskflow = {
+ name: "sharing-closure",
+ phases: [
+ { id: "scout", type: "agent", agent: "a", task: "scan", shareContext: true },
+ { id: "p", type: "agent", agent: "a", task: "use {steps.scout.output}", dependsOn: ["scout"], cache: { scope: "cross-run" }, final: true },
+ ],
+ };
+ // p transitively depends on scout (shareContext) → fallback.
+ assert.equal(await phaseFingerprint(def, "p"), undefined);
+ // scout itself has shareContext → fallback.
+ assert.equal(await phaseFingerprint(def, "scout"), undefined);
+});
+
+test("phaseFingerprint: returns undefined when a closure member is a flow phase", async () => {
+ const def: Taskflow = {
+ name: "flow-closure",
+ phases: [
+ { id: "sub", type: "flow", use: "some-saved-flow" },
+ { id: "p", type: "agent", agent: "a", task: "use {steps.sub.output}", dependsOn: ["sub"], cache: { scope: "cross-run" }, final: true },
+ ],
+ } as Taskflow;
+ // p transitively depends on a flow phase → fallback.
+ assert.equal(await phaseFingerprint(def, "p"), undefined);
+ // the flow phase itself → fallback.
+ assert.equal(await phaseFingerprint(def, "sub"), undefined);
+});
+
+test("phaseFingerprint: deterministic + changes when an included field changes", async () => {
+ const mk = (task: string): Taskflow => ({
+ name: "det",
+ phases: [{ id: "p", type: "agent", agent: "a", task, cache: { scope: "cross-run" }, final: true }],
+ });
+ const a1 = await phaseFingerprint(mk("t1"), "p");
+ const a2 = await phaseFingerprint(mk("t1"), "p");
+ const b = await phaseFingerprint(mk("t2"), "p");
+ assert.equal(a1, a2, "stable across calls");
+ assert.notEqual(a1, b, "changes when task text changes");
+ assert.match(a1!, /^[0-9a-f]+$/);
+});
+
+test("phaseFingerprint: cache policy field does NOT affect the sub-fingerprint", async () => {
+ // cache.scope/ttl/fingerprint reach the key via other paths; the sub-fingerprint
+ // must be invariant to them (else changing TTL would not invalidate via the
+ // dedicated expiry path but perturb the structural hash).
+ const mk = (cache: Taskflow["phases"][number]["cache"]): Taskflow => ({
+ name: "policy-inv",
+ phases: [{ id: "p", type: "agent", agent: "a", task: "t", cache, final: true }],
+ });
+ const a = await phaseFingerprint(mk({ scope: "cross-run" }), "p");
+ const b = await phaseFingerprint(mk({ scope: "cross-run", ttl: "30m" }), "p");
+ const c = await phaseFingerprint(mk({ scope: "cross-run", fingerprint: ["file:x"] }), "p");
+ assert.equal(a, b);
+ assert.equal(a, c);
+});
+
+test("phaseFingerprint: adding an independent phase does NOT move a phase's sub-fingerprint", async () => {
+ const base: Taskflow = {
+ name: "indep",
+ phases: [{ id: "p", type: "agent", agent: "a", task: "t", cache: { scope: "cross-run" }, final: true }],
+ };
+ const withExtra: Taskflow = {
+ name: "indep",
+ phases: [
+ { id: "p", type: "agent", agent: "a", task: "t", cache: { scope: "cross-run" }, final: true },
+ { id: "q", type: "agent", agent: "a", task: "extra" },
+ ],
+ };
+ // q is NOT in p's closure → p's sub-fingerprint is unchanged.
+ assert.equal(await phaseFingerprint(base, "p"), await phaseFingerprint(withExtra, "p"));
+});
+
+// ===========================================================================
+// Integration tests through the runtime (the Test Matrix)
+// ===========================================================================
+
+test("phasefp: editing phase B does NOT invalidate independent phase A", async () => {
+ const dir = tmpDir();
+ const store = new CacheStore(dir);
+ const mk = (bTask: string): Taskflow => ({
+ name: "indep-edit",
+ phases: [
+ { id: "scout", type: "agent", agent: "a", task: "scan", cache: { scope: "cross-run" } },
+ { id: "A", type: "agent", agent: "a", task: "A uses {steps.scout.output}", dependsOn: ["scout"], cache: { scope: "cross-run" } },
+ { id: "B", type: "agent", agent: "a", task: bTask, dependsOn: ["scout"], cache: { scope: "cross-run" }, final: true },
+ ],
+ });
+ const counter = { n: 0 };
+ const deps: RuntimeDeps = { cwd: dir, agents: AGENTS, runTask: countingRunner(counter), cacheStore: store };
+
+ await executeTaskflow(mkState(mk("B original"), dir), deps);
+ assert.equal(counter.n, 3, "scout + A + B run once");
+ // Edit ONLY B's task text. scout + A are unaffected (their closures don't include B).
+ const r2 = await executeTaskflow(mkState(mk("B edited"), dir), deps);
+ assert.equal(counter.n, 4, "only B re-runs; scout + A hit");
+ assert.equal(r2.state.phases.scout.cacheHit, "cross-run");
+ assert.equal(r2.state.phases.A.cacheHit, "cross-run");
+ assert.equal(r2.state.phases.B.cacheHit, undefined, "B missed (its task changed)");
+ fs.rmSync(dir, { recursive: true, force: true });
+});
+
+test("phasefp: editing phase B invalidates B and its transitive dependents", async () => {
+ const dir = tmpDir();
+ const store = new CacheStore(dir);
+ const mk = (bTask: string): Taskflow => ({
+ name: "transitive",
+ phases: [
+ { id: "scout", type: "agent", agent: "a", task: "scan", cache: { scope: "cross-run" } },
+ { id: "B", type: "agent", agent: "a", task: bTask, dependsOn: ["scout"], cache: { scope: "cross-run" } },
+ { id: "C", type: "agent", agent: "a", task: "C uses {steps.B.output}", dependsOn: ["B"], cache: { scope: "cross-run" } },
+ { id: "A", type: "agent", agent: "a", task: "A uses {steps.scout.output}", dependsOn: ["scout"], cache: { scope: "cross-run" }, final: true },
+ ],
+ });
+ const counter = { n: 0 };
+ const deps: RuntimeDeps = { cwd: dir, agents: AGENTS, runTask: countingRunner(counter), cacheStore: store };
+
+ await executeTaskflow(mkState(mk("B original"), dir), deps);
+ assert.equal(counter.n, 4, "scout + B + C + A run once");
+ // Edit B's task. B's closure changes → B misses. C depends on B → C's closure
+ // (which includes B) changes → C misses. scout + A are unaffected.
+ const r2 = await executeTaskflow(mkState(mk("B edited"), dir), deps);
+ assert.equal(counter.n, 6, "B + C re-run; scout + A hit");
+ assert.equal(r2.state.phases.scout.cacheHit, "cross-run");
+ assert.equal(r2.state.phases.A.cacheHit, "cross-run", "A independent of B → hit");
+ assert.equal(r2.state.phases.B.cacheHit, undefined, "B missed");
+ assert.equal(r2.state.phases.C.cacheHit, undefined, "C (transitive dependent) missed");
+ fs.rmSync(dir, { recursive: true, force: true });
+});
+
+test("phasefp: pre-v3 (v2) entry still hits — no miss-storm", async () => {
+ const dir = tmpDir();
+ const store = new CacheStore(dir);
+ const def: Taskflow = {
+ name: "v2-fallback",
+ phases: [{ id: "p", type: "agent", agent: "a", task: "fixed", cache: { scope: "cross-run" }, final: true }],
+ };
+ // Compute the v2 key the runtime will look up, and pre-seed it.
+ const { compileTaskflowToIR } = await import("../extensions/flowir/index.ts");
+ const ir = await compileTaskflowToIR(def);
+ const cc: PhaseCacheCtx = {
+ scope: "cross-run", fingerprint: "", store, prior: undefined,
+ phaseId: "p", flowName: def.name, runId: "old",
+ flowDefHash: ir.hash, phaseFp: (await phaseFingerprint(def, "p")) ?? ir.hash,
+ thinking: undefined, tools: undefined, preRead: "",
+ };
+ const ck = cacheKeys(cc, ["p", "a", "", "fixed"]);
+ store.put({ key: ck.v2Key, createdAt: Date.now(), output: "V2-OUTPUT", model: "v2-model", state: undefined, flowName: def.name, phaseId: "p", runId: "old" });
+
+ const counter = { n: 0 };
+ const deps: RuntimeDeps = { cwd: dir, agents: AGENTS, runTask: countingRunner(counter), cacheStore: store };
+ const r = await executeTaskflow(mkState(def, dir), deps);
+ assert.equal(counter.n, 0, "v2 entry must hit via fallback — no execution");
+ assert.equal(r.state.phases.p.cacheHit, "cross-run");
+ assert.equal(r.state.phases.p.output, "V2-OUTPUT");
+ fs.rmSync(dir, { recursive: true, force: true });
+});
+
+test("phasefp: two structurally-different flows do not collide", async () => {
+ const dir = tmpDir();
+ const store = new CacheStore(dir);
+ const mk = (extra: boolean): Taskflow => ({
+ name: "collide",
+ phases: extra
+ ? [
+ { id: "p", type: "agent", agent: "a", task: "same", cache: { scope: "cross-run" }, dependsOn: ["q"], final: true },
+ { id: "q", type: "agent", agent: "a", task: "extra" },
+ ]
+ : [{ id: "p", type: "agent", agent: "a", task: "same", cache: { scope: "cross-run" }, final: true }],
+ });
+ const counter = { n: 0 };
+ const deps: RuntimeDeps = { cwd: dir, agents: AGENTS, runTask: countingRunner(counter), cacheStore: store };
+
+ await executeTaskflow(mkState(mk(false), dir), deps);
+ assert.equal(counter.n, 1);
+ // Same name + phaseId + task, but p's closure differs (q added as a dep) →
+ // different sub-fingerprint → no cross-flow collision.
+ await executeTaskflow(mkState(mk(true), dir), deps);
+ assert.equal(counter.n, 3, "p misses (closure changed) and q runs");
+ fs.rmSync(dir, { recursive: true, force: true });
+});
+
+test("phasefp: shareContext falls back to whole-flow invalidation", async () => {
+ const dir = tmpDir();
+ const store = new CacheStore(dir);
+ const mk = (bTask: string): Taskflow => ({
+ name: "sharing-fallback",
+ contextSharing: true,
+ phases: [
+ { id: "A", type: "agent", agent: "a", task: "A", cache: { scope: "cross-run" } },
+ { id: "B", type: "agent", agent: "a", task: bTask, cache: { scope: "cross-run" }, final: true },
+ ],
+ });
+ const counter = { n: 0 };
+ const deps: RuntimeDeps = { cwd: dir, agents: AGENTS, runTask: countingRunner(counter), cacheStore: store };
+
+ await executeTaskflow(mkState(mk("B original"), dir), deps);
+ assert.equal(counter.n, 2, "A + B run once");
+ // With contextSharing, per-phase soundness cannot be guaranteed → both
+ // phases fall back to the whole-flow flowDefHash. Editing B moves the
+ // whole-flow hash → A ALSO misses (whole-flow invalidation, not per-phase).
+ const r2 = await executeTaskflow(mkState(mk("B edited"), dir), deps);
+ assert.equal(counter.n, 4, "both A and B re-run — whole-flow hash moved");
+ assert.equal(r2.state.phases.A.cacheHit, undefined, "A NOT reused — fallback to whole-flow");
+ assert.equal(r2.state.phases.B.cacheHit, undefined, "B missed (its task changed)");
+ fs.rmSync(dir, { recursive: true, force: true });
+});
diff --git a/test/runtime.test.ts b/test/runtime.test.ts
index c1a9840..663f493 100644
--- a/test/runtime.test.ts
+++ b/test/runtime.test.ts
@@ -259,14 +259,14 @@ test("runtime: resume skips cached completed phases", async () => {
const state = mkState(def);
// Pre-seed phase one as already done with the matching input hash.
const { hashInput } = await import("../extensions/store.ts");
- const { flowDefHash } = await import("../extensions/flowir/hash.ts");
- const fh = await flowDefHash(def);
+ const { phaseFingerprint } = await import("../extensions/flowir/index.ts");
+ const subfpOne = (await phaseFingerprint(def, "one")) ?? "";
state.phases.one = {
id: "one",
status: "done",
output: "out:start",
- // Must match runtime cacheKey(): flow name + flowDefHash + base parts + thinking + tools + ctx.
- inputHash: hashInput(`flow:${def.name}`, `v2:flowdef:${fh}`, "one", "a", "", "start", "think:", "tools:[]", "ctx:"),
+ // Must match runtime cacheKey(): flow name + v3:phasefp sub-fingerprint + base parts + thinking + tools + ctx.
+ inputHash: hashInput(`flow:${def.name}`, `v3:phasefp:${subfpOne}`, "one", "a", "", "start", "think:", "tools:[]", "ctx:"),
usage: emptyUsage(),
};
@@ -287,16 +287,17 @@ test("runtime: resume caches a completed reduce phase (unified inputHash)", asyn
const record: string[] = [];
const runner = mockRunner((t) => `o:${t}`, { record });
const { hashInput } = await import("../extensions/store.ts");
- const { flowDefHash } = await import("../extensions/flowir/hash.ts");
- const fh = await flowDefHash(def);
+ const { phaseFingerprint } = await import("../extensions/flowir/index.ts");
+ const subfpX = (await phaseFingerprint(def, "x")) ?? "";
+ const subfpSum = (await phaseFingerprint(def, "sum")) ?? "";
const state = mkState(def);
- state.phases.x = { id: "x", status: "done", output: "o:tx", inputHash: hashInput(`flow:${def.name}`, `v2:flowdef:${fh}`, "x", "a", "", "tx", "think:", "tools:[]", "ctx:"), usage: emptyUsage() };
- // reduce cache key has the same shape as agent/gate (flow + flowDefHash + base parts + thinking + tools).
+ state.phases.x = { id: "x", status: "done", output: "o:tx", inputHash: hashInput(`flow:${def.name}`, `v3:phasefp:${subfpX}`, "x", "a", "", "tx", "think:", "tools:[]", "ctx:"), usage: emptyUsage() };
+ // reduce cache key has the same shape as agent/gate (flow + v3:phasefp + base parts + thinking + tools).
state.phases.sum = {
id: "sum",
status: "done",
output: "o:combine o:tx",
- inputHash: hashInput(`flow:${def.name}`, `v2:flowdef:${fh}`, "sum", "a", "", "combine o:tx", "think:", "tools:[]", "ctx:"),
+ inputHash: hashInput(`flow:${def.name}`, `v3:phasefp:${subfpSum}`, "sum", "a", "", "combine o:tx", "think:", "tools:[]", "ctx:"),
usage: emptyUsage(),
};
const res = await executeTaskflow(state, baseDeps(runner));
From 31b2d49c49c834b18aaa599f876906cc57ad8c1e Mon Sep 17 00:00:00 2001
From: heggria
Date: Thu, 25 Jun 2026 20:35:16 +0800
Subject: [PATCH 2/5] feat(cache): per-item cross-run caching for map phases
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Add per-item cross-run memoization to the map phase so that when one of N
items changes between runs, only that item re-executes (N-1 cache hits) —
while preserving the existing whole-map fast path and all soundness fallbacks.
Mechanism:
- runFanout accepts an optional perItem hook. Before spawning a subagent for
an item, it consults cachedPhase with a per-item key; a hit returns a
0-token synthesized RunResult (stopReason "cache-hit") that flows through
mergePhaseState as a normal successful item. Successful fresh items are
recorded per-item for future runs.
- Per-item keys fold [phase.id, it.agent, model, it.task] + the existing
v3:phasefp/flowName/fingerprint/thinking/tools/preRead tail. Folding
it.agent (arbiter fix) prevents a stale cross-agent hit when only
phase.agent changes.
- Whole-map lookup stays first (fast path); per-item engages only on a
whole-map miss. A trailing whole-map record keeps the fast path warm.
Soundness gates (per-item disabled -> whole-map only):
- cross-run scope required (run-only/"off" have no persistent store)
- shareContext / flow-wide contextSharing disabled (items may read sibling
blackboard writes outside declared deps)
- inside a runtime-generated sub-flow (def: frame — untrusted)
- undefined phaseFingerprint is NOT a blocker (cacheKeys falls back to
flowDefHash, which is stable for a fixed def)
Correctness:
- merged output labels are positionally aligned with over ([k/N] using
results.length), budget-skipped items filtered to null; cache-hit items
keep their positional slot
- cached items contribute emptyUsage -> partial-hit cost == re-executed item only
- failed and budget-skipped items are never recorded per-item
- fail-open: any cache read/write error degrades to executing the item
Backward-compat: pre-existing whole-map entries (any tier) still hit via
cachedPhase's 4-tier read-only fallback; the whole-map key format is unchanged.
Tests: new test/cache-peritem.test.ts (11 tests) covering the Test Matrix —
partial reuse, positional alignment, duplicate sharing, shareContext/def-frame
fallbacks, whole-map fast path, revert, usage/subProgress, failed/skipped
non-caching, and the agent-invalidation arbiter fix.
---
extensions/runtime.ts | 117 ++++++++-
skills/taskflow/SKILL.md | 48 ++++
test/cache-peritem.test.ts | 491 +++++++++++++++++++++++++++++++++++++
3 files changed, 652 insertions(+), 4 deletions(-)
create mode 100644 test/cache-peritem.test.ts
diff --git a/extensions/runtime.ts b/extensions/runtime.ts
index 49c3eae..76c4a99 100644
--- a/extensions/runtime.ts
+++ b/extensions/runtime.ts
@@ -169,6 +169,31 @@ function resultToPhaseState(id: string, r: RunResult, inputHash: string, parseJs
};
}
+/**
+ * Synthesize a 0-token `RunResult` from a cached per-item `PhaseState` so a
+ * cross-run per-item cache hit flows through `mergePhaseState` as a normal
+ * successful fan-out item. `stopReason: "cache-hit"` is NOT in `isFailed`'s
+ * failure set (only "error"/"aborted"/non-zero exit), so the item counts as
+ * success. Usage is `emptyUsage()` — a cached item spent no new tokens this
+ * run, so `mergePhaseState`'s `aggregateUsage` charges nothing for it.
+ *
+ * Used only by the `map` per-item cache path (see `runFanout`). Fail-open by
+ * construction: this is only reached AFTER a successful `cachedPhase` lookup,
+ * so `ps.output` is always present.
+ */
+function phaseStateToRunResult(ps: PhaseState, it: { agent: string; task: string }): RunResult {
+ return {
+ agent: it.agent,
+ task: it.task,
+ exitCode: 0,
+ output: ps.output ?? "",
+ stderr: "",
+ usage: emptyUsage(),
+ model: ps.model,
+ stopReason: "cache-hit",
+ };
+}
+
/** Convert observed read refs (e.g. "steps.scout.output") into a structured
* readSet keyed by upstream phase id, tagging each with the version
* (= inputHash) that was current when read. Only `steps.*` refs are upstream
@@ -326,12 +351,20 @@ function mergePhaseState(
const model = ran.find((r) => r.model !== undefined)?.model;
// Combine outputs as a labelled list; also expose a JSON array of outputs.
// For failed items, use the error message instead of the useless placeholder.
- const combinedText = ran
+ // Labels are positionally aligned to the ORIGINAL `over` array: we iterate
+ // over ALL results (including budget-skipped, which are filtered to null) and
+ // use `results.length` as N, so item k's label reads `[k/N]` matching its
+ // position in `over` — not its rank among non-skipped items. Per-item cache
+ // hits (`stopReason: "cache-hit"`) are not budget-skipped, so they keep their
+ // original positional label.
+ const combinedText = results
.map((r, i) => {
- const label = `### [${i + 1}/${ran.length}] ${r.agent}${isFailed(r) ? " (failed)" : ""}`;
+ if (r.stopReason === "budget-skipped") return null;
+ const label = `### [${i + 1}/${results.length}] ${r.agent}${isFailed(r) ? " (failed)" : ""}`;
const content = isFailed(r) ? (r.errorMessage || r.stderr || r.output) : r.output;
return `${label}\n\n${content}`;
})
+ .filter((x): x is string => x !== null)
.join("\n\n---\n\n");
// Only successful runs feed the parsed JSON array (no error/skip strings).
const jsonArray = parseJson ? ran.filter((r) => !isFailed(r)).map((r) => safeParse(r.output) ?? r.output) : undefined;
@@ -870,7 +903,14 @@ async function executePhaseInner(
const parseJson = phase.output === "json";
// Runs a list of sub-tasks with live fan-out progress + aggregate live usage/activity.
- const runFanout = async (items: Array<{ agent: string; task: string }>): Promise => {
+ // `perItem` (map only) enables per-item cross-run caching: each item is looked
+ // up in the cache before spawning a subagent, and a successful fresh item is
+ // recorded so a later run with that item unchanged hits per-item. When
+ // `perItem` is undefined (parallel, or non-cacheable maps) the path is inert.
+ const runFanout = async (
+ items: Array<{ agent: string; task: string }>,
+ perItem?: { keyOf: (idx: number) => CacheKeys | null },
+ ): Promise => {
let done = 0;
let running = 0;
let failed = 0;
@@ -904,6 +944,28 @@ async function executePhaseInner(
stopReason: "budget-skipped",
} satisfies RunResult;
}
+ // Per-item cross-run cache lookup (map only). A hit synthesizes a 0-token
+ // RunResult and returns immediately — the item never spawns a subagent and
+ // never reaches the ctx_spawn drain below (a cached item can't have queued
+ // new spawns). Fail-open: any error in the lookup path degrades to executing.
+ if (perItem) {
+ try {
+ const ckItem = perItem.keyOf(idx);
+ if (ckItem) {
+ const hit = cachedPhase(cc, ckItem);
+ if (hit) {
+ done++;
+ const synth = phaseStateToRunResult(hit, it);
+ liveUsages[idx] = emptyUsage();
+ if (hit.model) latestModel = hit.model;
+ refresh();
+ return synth;
+ }
+ }
+ } catch {
+ /* fail-open: a cache read error must never sink the item */
+ }
+ }
running++;
refresh();
if (ctxDir) {
@@ -919,6 +981,23 @@ async function executePhaseInner(
done++;
if (isFailed(r)) failed++;
liveUsages[idx] = r.usage;
+ // Per-item cross-run cache record (map only): persist a successful fresh
+ // item so a later run with this item unchanged hits per-item instead of
+ // re-running. Failed and budget-skipped items are never cached (a stale
+ // failure would be served on the next run). Fail-open: a write error never
+ // sinks the item — the fresh `r` is already in hand and flows downstream.
+ if (perItem && !isFailed(r) && r.stopReason !== "budget-skipped") {
+ try {
+ const ckItem = perItem.keyOf(idx);
+ if (ckItem) {
+ const ccItem: PhaseCacheCtx = { ...cc, phaseId: `${phase.id}#item${idx}` };
+ const itemPs = resultToPhaseState(`${phase.id}#item${idx}`, r, ckItem.key, parseJson);
+ recordCache(ccItem, itemPs);
+ }
+ } catch {
+ /* fail-open: cache write must never sink the item */
+ }
+ }
if (ctxDir) {
try {
const itemNid = nodeIdFor(String(idx));
@@ -1118,12 +1197,42 @@ async function executePhaseInner(
task: preRead + interpolate(phase.task ?? "", localCtx).text,
};
});
+ // Per-item caching is sound ONLY when ALL of:
+ // - cross-run scope: run-only has no persistent store, so per-item entries
+ // could never be re-read (no point keying them).
+ // - no Shared Context Tree (`!sharing`): a sharing map item can read sibling
+ // blackboard writes OUTSIDE its declared deps, so the per-item key (which
+ // folds only the item's own task) under-approximates real reads and could
+ // serve a stale result. Fall back to whole-map.
+ // - not inside a runtime-generated sub-flow (`def:` frame in the stack):
+ // such flows are untrusted / possibly non-deterministic, so per-item reuse
+ // is unsafe. Fall back to whole-map (which still applies breadth caps).
+ // `undefined phaseFingerprint` is NOT a blocker: `cacheKeys` falls back to
+ // the whole-flow `flowDefHash`, which is stable across runs for a fixed def,
+ // so per-item keys for unchanged items remain stable.
+ const perItemCacheable =
+ cc.scope === "cross-run" &&
+ !sharing &&
+ !(deps._stack ?? []).some((s) => s.startsWith("def:"));
+ // Pre-compute per-item CacheKeys once so the lookup and the record path use
+ // the IDENTICAL key (and share cacheKeys' v3:phasefp + flow-name +
+ // fingerprint + thinking/tools/preRead contract). The per-item key folds
+ // `it.agent` (Arbiter fix): a different agent means different output, so a
+ // per-item key WITHOUT the agent could serve a stale cross-agent hit when
+ // only `phase.agent` changed (the whole-map key would correctly miss via
+ // JSON.stringify(tasks), but per-item keys would not).
+ const perItemKeys: (CacheKeys | null)[] = perItemCacheable
+ ? tasks.map((it) => cacheKeys(cc, [phase.id, it.agent, phase.model ?? "", it.task]))
+ : tasks.map(() => null);
+ const perItem = perItemCacheable
+ ? { keyOf: (idx: number): CacheKeys | null => perItemKeys[idx] ?? null }
+ : undefined;
const ck = cacheKeys(cc, [phase.id, phase.model ?? "", JSON.stringify(tasks)]);
const inputHash = ck.key;
const cached = cachedPhase(cc, ck);
if (cached) return cached;
- const results = await runFanout(tasks);
+ const results = await runFanout(tasks, perItem);
const ps = mergePhaseState(phase.id, results, inputHash, parseJson);
if (readRefs.length) ps.reads = readRefsToReads(readRefs, state);
if (mapTruncated) {
diff --git a/skills/taskflow/SKILL.md b/skills/taskflow/SKILL.md
index aca991b..cd10531 100644
--- a/skills/taskflow/SKILL.md
+++ b/skills/taskflow/SKILL.md
@@ -553,6 +553,54 @@ Quick reference:
- **Precedence (model/thinking/tools):** phase value → agent frontmatter (resolved via `modelRoles`) → global/default.
- **Concurrency:** same-layer phases use `flow.concurrency`; a `map`/`parallel` phase uses `phase.concurrency ?? flow.concurrency ?? 8`.
+### Per-item map caching (cross-run)
+
+A `map` phase with `cache: { "scope": "cross-run" }` is cached **per item**, not
+just as a whole. When one of N items changes between runs, only that item
+re-executes — the other N−1 are served from the cross-run cache for $0.
+
+```jsonc
+{ "id": "audit-each", "type": "map",
+ "over": "{steps.discover.json.files}", // array from an upstream phase
+ "task": "audit {item}",
+ "cache": { "scope": "cross-run" }, // ← enables per-item reuse
+ "dependsOn": ["discover"], "final": true }
+```
+
+How it works:
+
+- The **whole-map** entry is still checked first (fast path): an identical
+ re-run is a single $0 hit and never enters the fan-out.
+- On a whole-map miss, each item is looked up individually before it spawns a
+ subagent; a hit returns a 0-token synthesized result. Successful fresh items
+ are recorded so a later run with that item unchanged reuses them.
+- Per-item keys fold the item's resolved task **and agent** (so changing
+ `phase.agent` invalidates every item), plus the phase sub-fingerprint,
+ `thinking`/`tools`, and any `fingerprint` entries — exactly like a standalone
+ cross-run phase.
+
+Automatic fallbacks (per-item disables and the whole-map path is used):
+
+- `shareContext: true` on the phase, or flow-wide `contextSharing: true` — a
+ sharing item can read sibling blackboard writes outside its declared deps, so
+ the per-item key would under-approximate real reads.
+- The map runs **inside a runtime-generated sub-flow** (a `flow { def }` phase
+ or a `ctx_spawn({subflow})`) — untrusted / possibly non-deterministic.
+- `scope: "run-only"` (default) or `"off"` — no persistent store to reuse from.
+
+Notes & limitations:
+
+- Duplicate items (identical task + agent) share a single entry — reuse is
+ content-addressable, not positional.
+- Failed items and **budget-skipped** items are never cached, so they always
+ re-execute on the next run.
+- `{steps.
@@ -728,12 +728,12 @@ Copy one into `.pi/taskflows/.json` (or `~/.pi/agent/taskflows/`) and it r
-**0 runtime dependencies** · **804 tests** · **9 phase types** · **shared context tree** · **cross-session resume** · **cross-run memoization** · **incremental recompute** · **FlowIR compile seam** · **detached execution** · **`compile` Mermaid renderer** · **~9k LOC runtime**
+**0 runtime dependencies** · **846 tests** · **9 phase types** · **shared context tree** · **cross-session resume** · **cross-run memoization** · **per-item map caching** · **incremental recompute** · **FlowIR compile seam** · **detached execution** · **`compile` Mermaid renderer** · **~9k LOC runtime**
- **Zero runtime dependencies.** No `dependencies` field — the runtime is built entirely on Node built-ins (`fs` / `path` / `os` / `child_process` / `crypto`). The file lock is `fs.openSync("wx")`, not a third-party library.
-- **804 tests across 42 test files** covering concurrency, atomic file locking (8-process race regressions), path-traversal hardening, cross-session resume, cross-run cache freshness (flow/thinking/tools key isolation, fingerprint invalidation, TTL/LRU eviction), backward-compatible cache-key migration (3-tier legacy fallback), the FlowIR compile seam (determinism, declared-plane synthesis), incremental recompute (early-cutoff propagation, partial cascade strictly < full, observed ∪ declared union frontier), gate verdicts, budget caps, retry/backoff, approval flows, loop termination, tournament judging, sub-flow composition, the shared context tree (blackboard reuse, supervision spawn, subflow validation/nesting), workspace isolation (temp/dedicated/worktree lifecycle, fail-open degrade, dynamic-flow rejection), dynamic sub-flow security hardening, detached execution (PID persistence, stale detection, crash→failed, resume after failure), live run-history refresh, callback isolation, the idle watchdog, model-role init config, parseModelFromLabel with parenthesized-model-name regression, and multi-fence `safeParse` recovery, plus the `compile` Mermaid renderer (id-collision disambiguation, markdown-injection hardening, and full verify-overlay category coverage).
+- **846 tests across 46 test files** covering concurrency, atomic file locking (8-process race regressions), path-traversal hardening, cross-session resume, cross-run cache freshness (flow/thinking/tools key isolation, fingerprint invalidation, TTL/LRU eviction), backward-compatible cache-key migration (4-tier legacy fallback), per-phase structural sub-fingerprint (v3:phasefp — editing one phase invalidates only it and its dependents), per-item map caching (one changed item re-executes, N−1 cache hits), the `incremental` flag (run-wide cross-run default), reuse reporting, the FlowIR compile seam (determinism, declared-plane synthesis), incremental recompute (early-cutoff propagation, partial cascade strictly < full, observed ∪ declared union frontier), gate verdicts, budget caps, retry/backoff, approval flows, loop termination, tournament judging, sub-flow composition, the shared context tree (blackboard reuse, supervision spawn, subflow validation/nesting), workspace isolation (temp/dedicated/worktree lifecycle, fail-open degrade, dynamic-flow rejection), dynamic sub-flow security hardening, detached execution (PID persistence, stale detection, crash→failed, resume after failure), live run-history refresh, callback isolation, the idle watchdog, model-role init config, parseModelFromLabel with parenthesized-model-name regression, and multi-fence `safeParse` recovery, plus the `compile` Mermaid renderer (id-collision disambiguation, markdown-injection hardening, and full verify-overlay category coverage).
- **Hardened by design.** Path-traversal defense (lexical + `realpath` containment check), runId validation, HTML/error sanitization, atomic writes, stale-lock stealing via `rename`, and an idle watchdog that kills wedged subagents (SIGTERM → SIGKILL after 5 minutes of silence). Dynamic sub-flows additionally get breadth caps, `cwd` containment, budget clamping, nesting depth caps, and prototype-pollution defense.
- **Dogfooded.** Every new feature has to survive the project's own `self-improve` taskflow before it ships.
diff --git a/package.json b/package.json
index d520ccf..d89c42f 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "pi-taskflow",
- "version": "0.0.27",
+ "version": "0.0.28",
"description": "A declarative, verifiable graph of task nodes for the Pi coding agent — not a workflow you script, but a DAG you declare: statically verified before it runs, with dynamic fan-out, gates, isolated subagent context, resumable runs, and saveable commands.",
"keywords": [
"pi-package",
diff --git a/skills/taskflow/SKILL.md b/skills/taskflow/SKILL.md
index cd10531..a8863f1 100644
--- a/skills/taskflow/SKILL.md
+++ b/skills/taskflow/SKILL.md
@@ -549,7 +549,7 @@ Quick reference:
- **Flow:** `name`, `description`, `concurrency` (default 8), `budget` (`maxUSD`/`maxTokens`), `agentScope` (user|project|both), `args`, `strictInterpolation`.
- **Phase:** `model`, `thinking`, `tools` (whitelist), `cwd`, `output:"json"`, `concurrency` (map/parallel fan-out), `when`, `join` (all|any), `retry`, `use`/`with` (flow), `optional` (fail-soft — a failed/blocked phase won't abort the run), `final`.
-- **Cross-run caching:** add `cache: { "scope": "cross-run" }` to a phase to memoize its output across runs (same input → instant reuse, zero tokens). See `configuration.md` for `ttl`, `fingerprint` (git/glob/file/env invalidation), and scope options.
+- **Cross-run caching:** add `cache: { "scope": "cross-run" }` to a phase to memoize its output across runs (same input → instant reuse, zero tokens), or set `incremental: true` at the flow level (or pass `incremental: true` to `run`) to default every phase to cross-run reuse. See `configuration.md` for `ttl`, `fingerprint` (git/glob/file/env invalidation), scope options, and the `incremental` precedence rules.
- **Precedence (model/thinking/tools):** phase value → agent frontmatter (resolved via `modelRoles`) → global/default.
- **Concurrency:** same-layer phases use `flow.concurrency`; a `map`/`parallel` phase uses `phase.concurrency ?? flow.concurrency ?? 8`.
diff --git a/skills/taskflow/configuration.md b/skills/taskflow/configuration.md
index 22fa9f9..7476933 100644
--- a/skills/taskflow/configuration.md
+++ b/skills/taskflow/configuration.md
@@ -283,6 +283,28 @@ for the design.
| `cross-run` | Reuse an identical-input result from **any** prior run (the persistent store). |
| `off` | Never reuse, even within a run (force re-execution every time). |
+### Flow-wide opt-in: `incremental`
+
+Rather than annotating every phase with `cache: { "scope": "cross-run" }`, set
+`incremental: true` at the **flow** level (or pass `incremental: true` as the
+`run` tool argument) to default *every* phase to cross-run reuse:
+
+```jsonc
+{
+ "name": "audit",
+ "incremental": true, // ← every phase defaults to scope:"cross-run"
+ "phases": [ /* ... */ ]
+}
+```
+
+Precedence: the invocation `incremental` argument wins over the flow's
+`incremental` field, which is in turn overridden by any **per-phase** `cache`
+setting. The cross-run-blocked phase types (`gate`/`approval`/`loop`/
+`tournament`) and all per-phase soundness fallbacks still apply. The default
+remains `run-only` (each run starts fresh unless something opts in), because
+cross-run reuse silently persists outputs and can serve stale results for phases
+whose agents read files at runtime.
+
### `ttl` (cross-run only)
Max age before a cross-run hit is treated as a miss: e.g. `"30m"`, `"6h"`, `"7d"`.