[WIP] perf(core): Add a new mutation pipeline for per predicate runners #9467
Revival of #9467.

Phase 1 — cleanup
| Commit | What |
|---|---|
| 436b55f8b | Resolve the merge of main into the branch (import paths hypermodeinc/dgraph → dgraph-io/dgraph, license header, two-line fix in mvcc_test.go::TestRegression9597 for the new Deltas type). |
| a2fcc09f7 | Strip ~30 unconditional fmt.Println debug calls from hot paths (posting/lists.go, index.go, list.go, mvcc.go). They fired on every read/mutation/rollup. |
| 4fe27e1d4 | Remove dead commented-out code: ~175 lines of an alternate InsertTokenizerIndexes impl, plus the 101 commented-out lines of BenchmarkProcessListIndex in worker/draft_test.go (referenced methods that don't exist). |
| 8fbb0bdf2 | Replace the hardcoded featureFlag := true in applyMutations with a real superflag knob mutations-use-pipeline. (Subsequently superseded by the threshold form below.) |
Phase 2 — correctness fixes (real bugs)
| Commit | What |
|---|---|
| 2d2afb190 | Document why TestCount in worker/sort_test.go is t.Skip()'d. The test bypasses Oracle conflict checking; both legacy and new paths fail it. |
| 43b826d0a | Scalar Del-of-old-value wiping the new Set in ProcessCount. handleOldDeleteForSingle appends a synthetic Del oldVal posting next to the user's Set newVal so index/count diffing can see the prior value. ProcessCount then iterated postings and called updateMutationLayer(post, singleUidUpdate=true) on each. For non-Lang scalars fingerprintEdge returns math.MaxUint64, so both postings collide on the same Uid; the second iteration's Del unconditionally rewrote currentEntries to [DeleteAll], dropping the new value. Fix: in the !isListEdge branch, when the postings list contains a Set/Ovr, treat any Del as synthetic and skip it for the data-list update. |
| 792923279 | InsertTokenizerIndexes deadlock on uids ≥ 2^63. The dispatcher in posting/index.go did chMap[int(uid)%numGo] <- uid. int(uid) is signed, so for any uid in [2^63, 2^64) the modulo result is negative; chMap[-N] returns Go's zero value — a nil channel — and sending on a nil channel blocks forever. Surfaced by live load on 1million.rdf.gz: the very first batch wedged. Two-character fix: chMap[int(uid%uint64(numGo))] <- uid. |
| 8ccdae630 | IterateDisk had IsEmpty stubbed to false (introduced by branch commit 41d6445ce1 "fixed some bug" with no comment). That forced every key the iterator found to be reported as non-empty. Broke has(<predicate>) for any uid whose value had been removed via star-deletion. Surfaced by TestSystestSuite/TestHasDeletedEdge in systest/mutations-and-queries. Restored. |
| de90d97e0 | Nil-pointer deref in ProcessSingle on multi-Del-per-uid mutations + strip two leftover fmt.Println("READING…") calls in worker/task.go that the Phase 1B sweep missed. The GraphQL deleteTask(filter: {}) cleanup expands into multiple Del edges per entity (one per predicate); when ≥2 land on the same uid in one batch, findSingleValueInPostingList returns nil on the second iteration, then string(edge.Value) == string(oldVal.Value) panics. Fix: nil-guard the deref and move oldVal per-iteration so stale values can't bleed across edges. Surfaced by graphql/e2e/auth/TestOrderAndOffset. |
| 0a9ffc174 | Reused-key-buffer aliasing across ReadPostingList, the in-memory cache, and the rollup queue. The pipeline's ProcessCount loop allocates one dataKey buffer per call and mutates its trailing 8 bytes per uid. Two distant sites captured the slice header rather than the bytes: (a) ReadPostingList's l.key = key aliased the caller's buffer, and saveInCache stored a copyList that shared the alias — by the time async rollup pulled the cached list and built kv.Key = alloc.Copy(l.key), the bytes had collapsed to whatever uid was processed last, so a BitCompletePosting (with WithDiscard) got written to a different list's key, silently overwriting an unrelated reverse list with this list's contents; (b) addKeyToBatch appended the slice header verbatim, so all queued rollup targets from one ProcessCount goroutine ended up pointing at the same final-iteration key. Both fixed by taking ownership of the bytes. Legacy runMutation allocates a fresh key per call so it never aliased anything; the bug was only visible when a caller deliberately reused one buffer the way ProcessCount does. This was the cause of systest/21million/live's consistent failure. |
Feature flag — threshold instead of on/off
| Commit | What |
|---|---|
| c8f5fbcf3 | Replace MutationsUsePipeline bool with MutationsPipelineThreshold int. A mutation runs through the pipeline only when threshold > 0 && len(m.Edges) >= threshold; 0 = pipeline off, 1 = always on, larger values opt small interactive mutations out (benches show ~2× slowdown for ≤10-edge txns). CLI: dgraph alpha --feature-flags="mutations-pipeline-threshold=200". |
| 86ad1eea9 | Flip the default to 1 so test suites exercise the pipeline path on every mutation. Reset to 0 before merging once the canonical CI has been green for a soak window. |
Tests / regressions
| Commit | What |
|---|---|
| 7c95203b8 | Four in-process regression harnesses for [uid] @reverse @count: single-txn, multi-batch sequential, multi-batch multi-predicate (parallel goroutines), and multi-batch concurrent through a fakeOracle conflict-checking harness. None reproduce the buffer-aliasing bug fixed in 0a9ffc1 — that needs the async rollup path that only fires on a real running cluster — but they pin down the in-memory mutation path's reverse-list bookkeeping. |
| d0c631a16 | New systest/21million/queries/query-073: an exhaustive count(~genre) check across every one of the 592 non-empty Genre entities, with the exact expected count pinned. Catches any future regression that mis-routes reverse edges, which the existing query-017 only does indirectly. |
Benchmarks
In-process matrix (worker/pipeline_bench_test.go, included in 792923279) on Apple M4 Pro, 30 iterations:
| Config | Legacy ns/op | Pipeline ns/op | Pipeline / Legacy |
|---|---|---|---|
| 1 pred × 1 edge | 39,753 | 89,581 | 2.25× slower |
| 10 preds × 1 edge | 190,672 | 451,854 | 2.37× slower |
| 1 pred × 100 edges | 654,775 | 727,931 | 1.11× slower |
| 50 preds × 100 edges | 31,630,750 | 29,959,582 | 1.06× faster |
| 10 preds × 100 edges | 9,448,342 | 7,441,912 | 1.27× faster |
| 1 pred × 1000 edges | 5,753,151 | 4,957,482 | 1.16× faster |
| 10 preds × 1000 edges | 69,132,992 | 58,908,736 | 1.17× faster |
| 10 preds × 1000 edges, no index | 31,308,575 | 20,238,396 | 1.55× faster |
| 50 preds × 1000 edges | 403,867,619 | 375,424,165 | 1.07× faster |
Crossover is around 100 total edges.
End-to-end live load against the 1M dataset (1million.rdf.gz, official 1M schema with mixed [uid] @reverse @count, [uid] @count, datetime @index(year), string @index(hash, term, trigram, fulltext) @lang, geo @index(geo), string @index(exact) @upsert):
| `mutations-pipeline-threshold` | Avg time | Avg N-Quads/s |
|---|---|---|
| 0 (legacy) | ~14.3 s | ~77K |
| 1 (always-on pipeline) | ~9.5 s | ~116K |
~1.50× faster on a realistic bulk-shaped workload — the case the per-predicate runner was designed for.
Test results
With `mutations-pipeline-threshold=1`:

- `posting/`, `worker/`, `types/`, `schema/` — all unit tests pass, including the new `TestPipelineCountIndexConcurrent` and the four `TestPipelineReverseListCount*` harnesses under `-count=20 -race`.
- `./t --suite=systest-baseline` — all 13 packages green: `systest`, `systest/acl/restore`, `systest/audit`, `systest/audit_encrypted`, `systest/backup/filesystem`, `systest/cdc`, `systest/cloud`, `systest/export`, `systest/group-delete`, `systest/loader`, `systest/multi-tenancy`, `systest/mutations-and-queries` (66 tests, including the previously failing `TestHasDeletedEdge`), `systest/plugin`. Total wall clock 7m38s.
- `./t --suite=core,integration` — `acl` (150.8s) green, plus four large `systest/vector` tests (`TestVectorIncrBackupRestore` 124s, `TestVectorBackupRestore` 72s, `TestVectorBackupRestoreDropIndex` 23s, `TestVectorBackupRestoreReIndexing` 110s). The suite then died on a pre-existing Mac-only bug in `dgraphtest` (commit `ef6f27da79` "Fix running tests on mac" on `main`) that's unrelated to the pipeline.
- `systest/21million/live` `TestQueries` — green. A sweep across all 764 Genre entities reports `count(~genre) == count(forward edges)` for every genre. Pre-fix, the same sweep showed sets of 4 mismatched genres with thousands of stale or missing reverse entries on each load, with the specific affected genres varying run-to-run.
Out of scope, worth noting
- The bulk loader (`dgraph/cmd/bulk/`) doesn't import any of the pipeline machinery. The "speed up bulk loader" framing in the PR's lineage doesn't survive code inspection — the bulk loader has its own map-reduce path.
- `TestCount` in `worker/sort_test.go` is left `t.Skip()`'d with a clear reason (the harness lacks Oracle conflict checking; both legacy and pipeline fail it).
Recommended deployment
- Reset the default to `0` before merging.
- Operators who want the speedup on bulk-only workloads can opt in with `--feature-flags="mutations-pipeline-threshold=200"` (or whatever the per-mutation crossover is for their workload).
- Setting `1` (always-on) is fine for systest CI runs and is a useful "ramp-up" knob.
Adds an alternative path through `applyMutations` that fans the edges of a single mutation out by predicate and processes each predicate's batch in its own goroutine, exposing parallelism the legacy serial-by-edge path cannot. The per-predicate runners drive the full mutation lifecycle (scalar/list, reverse, count, tokenized index, vector index) through new helpers in posting/index.go: `MutationPipeline.Process` → `ProcessPredicate` → `ProcessSingle` / `ProcessList` / `ProcessVectorIndex`, with `InsertTokenizerIndexes`, `ProcessReverse`, and `ProcessCount` shared across the predicate-shaped paths.

Supporting infrastructure:

- `posting.Deltas` — per-txn delta store split into a sharded raw-bytes map and a per-predicate `indexMap` for batched index writes.
- `types.LockedShardedMap` — generic sharded RWMutex map used by Deltas and the in-memory index aggregation.
- New mutation-time helpers on `posting.Txn` (`AddDelta`, `GetScalarList`, `addConflictKey`, `addConflictKeyWithUid`).
- Test scaffolding in `worker/mutation_unit_test.go`, `worker/sort_test.go`, and `worker/draft_test.go`.

This commit squashes the original WIP series authored by Harshil Goel (commits 978a0d4…41d6445ce on the abandoned branch) plus the merge into current `main` and the mechanical clean-ups required to compile against it: import paths `hypermodeinc/dgraph` → `dgraph-io/dgraph`, license headers, a `posting/mvcc_test.go::TestRegression9597` fix-up for the `LocalCache.deltas` map → `*Deltas` refactor, and a stray doc reference in TESTING.md.

The pipeline ships gated and disabled; the on/off knob and subsequent correctness fixes follow in later commits in this series.

Co-Authored-By: Matthew McNeely <matthew.mcneely@gmail.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
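For orientation, here is a minimal sketch of the sharded-map idea behind `types.LockedShardedMap`. It is illustrative only (the shard count, method names, and caller-supplied hash are assumptions, not the real API): keys hash to a shard, so concurrent per-predicate runners contend on separate locks instead of one global mutex.

```go
import "sync"

const numShards = 32 // illustrative; the real type may size shards differently

type shard[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

// ShardedMap is a toy stand-in for LockedShardedMap: per-shard RWMutexes
// instead of one map-wide lock.
type ShardedMap[K comparable, V any] struct {
	shards [numShards]shard[K, V]
	hash   func(K) uint64 // caller-supplied; Go can't portably hash arbitrary comparables
}

func NewShardedMap[K comparable, V any](hash func(K) uint64) *ShardedMap[K, V] {
	s := &ShardedMap[K, V]{hash: hash}
	for i := range s.shards {
		s.shards[i].m = make(map[K]V)
	}
	return s
}

func (s *ShardedMap[K, V]) Set(k K, v V) {
	sh := &s.shards[s.hash(k)%numShards]
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.m[k] = v
}

func (s *ShardedMap[K, V]) Get(k K) (V, bool) {
	sh := &s.shards[s.hash(k)%numShards]
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	v, ok := sh.m[k]
	return v, ok
}
```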
Removes the unconditional debug prints scattered through posting/ during the original WIP work — they fired on every read, mutation, rollup, and commit. None of them were guarded by a verbosity flag, so under load they would have produced megabytes of stdout noise per second.

Sites stripped:

- posting/lists.go: READING / READING SINGLE / GETTING KEY FROM DELTAS
- posting/index.go: TOKENS, LOCAL MAP, INSERTING INDEX, UPDATE INDEX, ERRORRRING, "Inserting tokenizer indexes ... took"
- posting/mvcc.go: COMMITTING (and unused fmt import)
- posting/list.go: "Buidlding committed uids", "Setting mutation", PrintRollup helper (called once internally, never elsewhere)

Left in place: printTreeStats() in index.go, which is already gated by the DEBUG_SHOW_HNSW_TREE env var and is an intentional opt-in HNSW debug helper.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two large dead-code blocks left over from the original WIP:

- posting/index.go: ~175 lines of an alternate InsertTokenizerIndexes implementation, fully commented out. The live implementation directly above it supersedes it; keeping the commented variant just made the file harder to follow. Also drop the scattered "//fmt.Println(...)" leftovers next to live code.
- worker/draft_test.go: BenchmarkProcessListIndex was added entirely commented out and references methods (DefaultPipeline, ProcessListWithoutIndex, ProcessListIndex) that don't exist on MutationPipeline. If we want a benchmark for the pipeline, we should write one against the real API.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the hardcoded `featureFlag := true` in applyMutations with a real superflag knob, defaulted off:

- Add WorkerOptions.MutationsUsePipeline (bool) in x/config.go.
- Extend the feature-flags superflag with mutations-use-pipeline=false and wire alpha to populate WorkerConfig.MutationsUsePipeline from it.
- worker/draft.go applyMutations now branches on x.WorkerConfig.MutationsUsePipeline; default false routes mutations through the legacy path, preserving current behavior.

Tests can opt into the new pipeline by setting x.WorkerConfig.MutationsUsePipeline = true.

CLI usage:

    dgraph alpha --feature-flags="mutations-use-pipeline=true"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
TestCount is t.Skip()'d on the branch, but the reason wasn't recorded. Investigation: the test launches concurrent transactions sharing entity uids and bypasses the Oracle's conflict-checking commit path — it just calls CommitToDisk() directly with disjoint commit timestamps.

Both the legacy AddMutationWithIndex path and the new mutation pipeline fail it identically: with two threads adding edges to the same subject's [uid] @count predicate, neither path can serialize @count updates without real txn conflicts, so the count index ends up inconsistent and many subjects are missing from count(N). This is expected without conflict checking — the unit harness can't exercise the safety the Oracle provides.

Re-enable when we wire either: (a) Oracle.WaitForTs/IsAborted into the harness, or (b) this test through worker.applyMutations() (which does invoke the Oracle conflict path).

Single-thread TestCount passes, so the per-predicate pipeline's own count logic is correct in the absence of contention. The existing TestStringIndexWithLang covers the multithreaded happy path with disjoint uids.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
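For context, the conflict rule the Oracle provides (and that the next commit's fakeOracle mirrors from dgraph/cmd/zero/oracle.go) reduces to: abort a txn if any of its conflict keys was committed after the txn's start ts. A minimal sketch, with assumed names:

```go
// conflictOracle is a toy model, not dgraph's Oracle API.
type conflictOracle struct {
	lastCommit map[uint64]uint64 // conflict-key fingerprint -> latest commit ts
}

// hasConflict reports whether any of the txn's conflict keys was committed
// by another txn after this txn started.
func (o *conflictOracle) hasConflict(startTs uint64, keys []uint64) bool {
	for _, k := range keys {
		if o.lastCommit[k] > startTs {
			return true
		}
	}
	return false
}

// commit records a successful txn's conflict keys at its commit ts.
func (o *conflictOracle) commit(commitTs uint64, keys []uint64) {
	for _, k := range keys {
		if o.lastCommit[k] < commitTs {
			o.lastCommit[k] = commitTs
		}
	}
}
```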
…ssCount

Bug: scalar @count writes were nondeterministically losing data under concurrent transactions. Roughly half the deltas committed by the new mutation pipeline contained only a [DeleteAll] posting and no Set, so reads at maxTs returned an empty value list.

Root cause: in ProcessSingle, handleOldDeleteForSingle appends a synthetic Del-of-old-value to postings[uid] alongside the user's Set, so InsertTokenizerIndexes / ProcessReverse / count diffing can see the prior value. ProcessCount then iterates the postings and calls list.updateMutationLayer(post, singleUidUpdate=true, ...) on each. For non-Lang scalar predicates fingerprintEdge returns math.MaxUint64, so the synthetic Del and the user Set both have Uid == math.MaxUint64. The first iteration (Set new) leaves mutationMap.currentEntries = [DeleteAll, Set new]; the second iteration (Del old) finds the Set we just inserted via findPosting and applies updateMutationLayer in singleUidUpdate mode, which unconditionally rewrites currentEntries to [DeleteAll] (the Del branch never appends mpost) — wiping the new value.

Fix: in ProcessCount, when iterating a !isListEdge predicate's postings, if the list contains a Set/Ovr posting, treat any Del as synthetic and skip it for the data-list update. Standalone user Dels (no accompanying Set) are still applied. Index/reverse/count diffing already happen before ProcessCount runs and aren't affected.

Repro: TestPipelineCountIndexConcurrent in worker/sort_test.go is a new conflict-aware in-process harness that mirrors the systest TestCountIndexConcurrentSetDelScalarPredicate. It runs 200 contending transactions setting <0x1> <name> "name<rand>" against a "string @index(exact) @count" schema with a fakeOracle that implements the same hasConflict algorithm as dgraph/cmd/zero/oracle.go. Pre-fix the test fails roughly 50% of runs with an empty data list and the wrong count buckets; post-fix it is stable across 20+ -count iterations and under -race.

Existing tests (TestScalarPredicateIntCount, *RevCount, *Count, TestSingleUidReplacement, TestDeleteSetWithVarEdgeCorruptsData, TestStringIndexWithLang, TestMultipleTxnListCount, TestGetScalarList, TestDatetime) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
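The fix's filter, sketched with toy types and op constants (the real helper, later named skipSyntheticDel, operates on pb.Posting values and may differ in detail):

```go
type post struct{ op int } // toy stand-in for *pb.Posting

const (
	opSet = iota + 1
	opDel
	opOvr
)

// skipSyntheticDel, sketched: for a scalar (!isListEdge) predicate, a Del
// that rides alongside a Set/Ovr in the same batch is the synthetic
// Del-of-old-value and must not reach the data-list update. A standalone
// user Del (no accompanying Set) passes through untouched.
func skipSyntheticDel(postings []post) []post {
	hasSet := false
	for _, p := range postings {
		if p.op == opSet || p.op == opOvr {
			hasSet = true
			break
		}
	}
	if !hasSet {
		return postings
	}
	out := make([]post, 0, len(postings))
	for _, p := range postings {
		if p.op != opDel {
			out = append(out, p)
		}
	}
	return out
}
```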
When loading via dgraph live (or any mutation source whose uids span
the full uint64 range, including xidmap-assigned uids), the
per-predicate pipeline hung indefinitely on the very first batch with
zero forward progress. A goroutine dump showed the dispatcher
goroutine wedged on `chan send (nil chan)` at the line:

    chMap[int(uid)%numGo] <- uid
uid is uint64. Casting directly to int produces a negative value for
uid >= 2^63, so int(uid)%10 can be in [-9, -1]. chMap[-3] returns the
zero value for a chan uint64, which is a nil channel; sending on a
nil channel blocks forever.
The 10 worker goroutines (also created here) were idle on
`for uid := range uids` since no uids ever reached them, so the
parent `wg.Wait()` and the surrounding errgroup never returned.
applyMutations therefore never released the txn, the alpha's old-txn
abort loop kept retrying every minute, and live-load showed
"Txns: 0 N-Quads: 0" indefinitely.
Fix: hash unsigned, then cast: `chMap[int(uid%uint64(numGo))]`.
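A self-contained demonstration of the failure mode; the map-of-channels shape and numGo=10 follow the commit text, everything else is illustrative:

```go
package main

import "fmt"

func main() {
	const numGo = 10
	chMap := make(map[int]chan uint64, numGo)
	for i := 0; i < numGo; i++ {
		chMap[i] = make(chan uint64)
	}

	uid := uint64(1) << 63 // any uid >= 2^63

	buggy := int(uid) % numGo // int(uid) is negative on 64-bit platforms
	fmt.Println(buggy, chMap[buggy] == nil)
	// -8 true: a map miss yields the zero value, a nil channel.
	// Sending on it (chMap[buggy] <- uid) would block forever.

	fixed := int(uid % uint64(numGo)) // hash unsigned, then cast
	fmt.Println(fixed, chMap[fixed] == nil)
	// 8 false: a real channel with a live worker on the other end.
}
```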
Verified end-to-end with the live loader against the 1million.rdf.gz
benchmark dataset (1,041,684 n-quads, schema mixes [uid] @reverse
@count, [uid] @count, datetime @index(year), string @index(...) @lang,
geo @index(geo), string @index(exact) @upsert):
legacy : 13.85s / 14.74s (avg ~14.3s, ~77k n-quads/s)
pipeline : 9.65s / 9.36s (avg ~9.5s, ~116k n-quads/s)
That is ~1.50x faster on a realistic multi-predicate, multi-index
workload — i.e. the case the per-predicate runner pipeline is built
for.
Also adds worker/pipeline_bench_test.go: in-process Go benchmarks
comparing legacy runMutation vs newRunMutations across a matrix of
(predicates, edges-per-predicate, indexed/non-indexed) shapes. They
show the pipeline loses ~2x on tiny mutations (1-10 edges) and wins
1.2x-1.55x on bulk (10 preds x 100+ edges, indexed or not), which is
why the feature flag stays default-off and the live-loader speedup
above is the right place to evaluate this work.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The benchmark matrix in worker/pipeline_bench_test.go showed the
pipeline loses ~2x on small mutations (≤10 edges total) and wins
~1.5x on bulk (live-loader sized: 1000 edges per txn across many
predicates). A binary on/off flag forces an all-or-nothing choice,
penalising whichever side of that crossover the workload spends most
time on.
Replace MutationsUsePipeline (bool) with MutationsPipelineThreshold
(int):
threshold = 0 -> never use the pipeline (default; legacy behavior)
threshold = 1 -> always use the pipeline (any txn with ≥1 edge)
threshold = N -> use the pipeline only when len(m.Edges) >= N
The threshold compares against total edges in the proposal. From the
benches the crossover is around 100; the live-loader 1M dataset uses
~1000 edges per txn, so anything from 100-1000 will engage the
pipeline only on bulk-shaped mutations and leave small interactive
mutations on the legacy serial path.
Wiring:
- x.WorkerConfig.MutationsPipelineThreshold (int) replaces the
bool field.
- feature-flags superflag: "mutations-pipeline-threshold=0".
- alpha/run.go reads it via featureFlagsConf.GetInt64.
- worker/draft.go applyMutations branches on
`t > 0 && len(m.Edges) >= t`.
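The gate itself is tiny. A self-contained sketch of the condition (usePipeline is a hypothetical name; the real branch lives inline in applyMutations):

```go
// usePipeline mirrors the documented semantics:
//   threshold == 0 -> never use the pipeline (legacy path)
//   threshold == 1 -> always use it (any txn with >= 1 edge)
//   threshold == N -> use it only when the txn carries >= N edges
func usePipeline(threshold, numEdges int) bool {
	return threshold > 0 && numEdges >= threshold
}
```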
Verified end-to-end against the live-loader benchmark
(1million.rdf.gz, official 1M schema):
threshold=0 : 13.56s, 80,129 N-Quads/s (legacy, matches baseline)
threshold=1 : 9.92s, 115,742 N-Quads/s (always-on, matches prior)
CLI usage:
dgraph alpha --feature-flags="mutations-pipeline-threshold=200"
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Engages the per-predicate mutation pipeline by default so that systest runs (and any other test suites that don't override feature-flags) exercise the pipeline path on every mutation, not the legacy serial path. Threshold of 1 means "any mutation with ≥1 edge takes the pipeline" — i.e. always on. This is a deliberate ramp toward shipping the pipeline. Operators who want to opt small interactive mutations out of the pipeline (where benches showed ~2x slowdown for ≤10-edge txns) can set a higher threshold: dgraph alpha --feature-flags="mutations-pipeline-threshold=200" To turn the pipeline fully off, set 0. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The original branch commit 41d6445 ("fixed some bug") replaced the IsEmpty(readTs) call in IterateDisk with a hardcoded `false`, forcing every key found by the iterator to be reported as non-empty.

That broke has(<predicate>) for any uid whose value had been removed via star-deletion (<uid> <pred> *): the data list still exists in badger with a DeleteAll marker on top, but the live posting list is empty at readTs — IsEmpty returns true and the uid should be skipped.

Surfaced by systest TestSystestSuite/TestHasDeletedEdge in systest/mutations-and-queries: 3 nodes are created with <end> "", one is star-deleted, and the follow-up has(end) is expected to return 2 uids. With IsEmpty stubbed to false it returned 3.

No comment was left on the original change. Restoring the call. The mutations-and-queries package is fully green with this in place (66/66 tests pass, including TestHasDeletedEdge); if a real underlying issue motivated the original disable we'll chase it with a real diagnosis instead of silently dropping a safety check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
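The restored check has roughly this shape; a sketch only, since the IsEmpty(readTs, afterUid) signature is assumed and the surrounding iterator loop is elided:

```go
// Inside IterateDisk's per-key loop: skip uids whose posting list is
// empty at readTs instead of hardcoding "not empty".
empty, err := l.IsEmpty(readTs, 0)
if err != nil {
	return err
}
if empty {
	continue // e.g. star-deleted: DeleteAll on top, nothing live at readTs
}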
…p stale READING debug prints
Two bugs surfaced by graphql/e2e/auth's TestOrderAndOffset (cleanup
mutation `mutation DelTask { deleteTask(filter: {}) }` crashed the
alpha mid-request, causing the test client to see EOF on POST):
1. posting/index.go ProcessSingle SIGSEGV at line 675.
GraphQL deleteTask cleanup expands into multiple Del edges per
entity (one per predicate the entity has — uid, type, list edges,
etc.). When two Del edges to the same uid land in one transaction's
batch, the second iteration through ProcessSingle's per-edge loop
does:
```go
pl, exists := postings[uid]
if exists {
	if edge.Op == DEL {
		oldVal = findSingleValueInPostingList(pl)
		if string(edge.Value) == string(oldVal.Value) { ... }
		//                               ^^^^^^ nil deref
	}
}
```
findSingleValueInPostingList only returns Set postings; if the
accumulated list holds only Dels (from the prior iteration), it
returns nil and we panic dereferencing oldVal.Value.
Two fixes here:
- Guard the deref: `if oldVal != nil && string(...) == ...` (post-fix loop sketched below, after the list).
- Move `var oldVal *pb.Posting` inside the loop. It was declared
  at function scope, so a stale value from one edge could bleed
  into the nil-guarded branch for a different uid on a later
  iteration. Per-edge scope makes the intent explicit.
2. worker/task.go: two leftover `fmt.Println("READING SINGLE", ...)`
   and `fmt.Println("READING", ...)` calls in the value-postings
   read path. Same class of debug spew Phase 1B stripped from
   posting/, missed because that sweep didn't include worker/.
   Removed both. Safe — they were unconditional prints on every
   query value read.
The graphql/e2e/auth and graphql/e2e/auth/debug_off packages now both
pass in 30s. `./posting/` and `./worker/` unit tests still green.
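Post-fix shape of the per-edge loop from fix 1, as a paraphrased sketch reusing the snippet's names (not the exact source):

```go
for _, edge := range edges {
	var oldVal *pb.Posting // per-edge scope: no stale value bleeds across iterations
	if pl, exists := postings[uid]; exists && edge.Op == DEL {
		oldVal = findSingleValueInPostingList(pl)
		if oldVal != nil && string(edge.Value) == string(oldVal.Value) {
			// prior Set matches the Del's value: proceed with the delete diff
		}
		// oldVal == nil means the accumulated list holds only Dels
		// (a prior iteration's); skip the comparison instead of panicking.
	}
}
```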
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…addKeyToBatch

The 21million live-load systest failed with `~genre @filter(uid(F))` returning wrong films for some genres — the corruption pattern was that one genre's reverse list contained another genre's films verbatim, while a third genre's reverse list was missing thousands of entries. Different genres were affected on each load, but always: legacy runMutation produced correct reverse counts; the per-predicate pipeline produced corrupt ones.

Root cause: the pipeline's ProcessCount per-uid loop allocates one key buffer per ProcessCount call and mutates its trailing 8 bytes each iteration via `binary.BigEndian.PutUint64(dataKey[len-8:], uid)`. Two distinct sites then captured the slice header rather than the bytes:

1. ReadPostingList's `l.key = key` aliased the caller's buffer. saveInCache stores a copyList of the freshly-read list, and copyList sets `key: l.key` — also an alias. The cached list's key field therefore points at a buffer that the pipeline keeps mutating; by the time the async rollup path retrieves the cached list and runs `kv.Key = alloc.Copy(l.key)` to build the rolled-up KV, the bytes are whatever the LAST iteration left behind. Rollup then writes a BitCompletePosting with WithDiscard() to the wrong key, overwriting an unrelated reverse list with this list's (rolled-up) contents.

2. ReadPostingList's defer `IncrRollup.addKeyToBatch(key, ...)` appended the slice header verbatim to the rollup queue. Every queued entry from one ProcessCount goroutine ended up pointing at the same shared dataKey buffer; by the time the rollup goroutine processed the batch the bytes had collapsed to the final iteration's uid, redirecting many distinct rollup targets to the same key.

Both sites fixed by taking ownership of the bytes (`append nil` / explicit `make+copy`). Legacy runMutation hits ReadPostingList too but allocates a fresh key per call, so it never aliased anything; the bug is only visible when a caller deliberately reuses one buffer across many uids the way ProcessCount does.

Verified end-to-end on the systest/21million/live load with mutations-pipeline-threshold=1: a fresh load + sweep across all 764 Genre entities now reports `count(~genre) == count(forward edges)` for every genre. Pre-fix the same sweep showed 4 mismatched genres with thousands of stale or missing reverse entries; on a different load it showed Documentary missing 23,325 of its 31,370 reverse entries while Children's/Family had 1,435 stale Crime-Thriller entries.

Unit tests in posting/ and worker/ still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
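The aliasing failure is easy to demonstrate in isolation; this self-contained snippet mirrors the reused-buffer pattern described above (the dataKey naming follows the commit text):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// One reused key buffer whose trailing 8 bytes are rewritten per uid,
	// as in the ProcessCount loop described above.
	dataKey := make([]byte, 8)

	var aliased, owned [][]byte
	for uid := uint64(1); uid <= 3; uid++ {
		binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid)
		aliased = append(aliased, dataKey)                     // captures the slice header
		owned = append(owned, append([]byte(nil), dataKey...)) // takes ownership of the bytes
	}

	for i := range aliased {
		fmt.Printf("aliased=%d owned=%d\n",
			binary.BigEndian.Uint64(aliased[i]),
			binary.BigEndian.Uint64(owned[i]))
	}
	// aliased=3 owned=1
	// aliased=3 owned=2
	// aliased=3 owned=3
	// Every aliased entry collapsed to the last uid written, which is
	// exactly how the queued rollup targets all ended up pointing at the
	// final iteration's key.
}
```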
Added in pursuit of the 21million live-load failure that turned out to be the dataKey buffer-aliasing bug fixed in 0a9ffc1. None of these tests reproduce that specific corruption — it needs the async rollup path that only triggers on a real running cluster — but they pin down what the per-predicate pipeline does correctly for the non-rollup cases:

- TestPipelineReverseListCount: one transaction, multiple subjects pointing at multiple objects on a [uid] @reverse @count predicate. Verifies forward and reverse lists are both complete.
- TestPipelineReverseListCountMultiBatch: 50 subjects x 20 objects spread across 143 sequential transactions in batches of 7.
- TestPipelineReverseListCountMultiPred: 30 subjects x 12 objects across 3 distinct list-uid + reverse + count predicates, with shuffled edges and small batches so the per-predicate pipeline goroutines for each predicate run in parallel inside Process().
- TestPipelineReverseListCountConcurrent: same shape as the multi-batch case but with 10 worker goroutines submitting batches in parallel through the fakeOracle conflict-checking harness.

All four pass cleanly. They guard against regressions in the in-memory mutation path's reverse-list bookkeeping; the rollup path that the 0a9ffc1 fix actually targets is exercised by the systest/21million/live integration test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…empty genres

A comprehensive `count(~genre)` check across every Genre entity in the 21million dataset. The query asks for each genre's name and the size of its reverse posting list, sorted by name; the fixture pins the expected count for all 592 genres that have at least one film.

Motivation: the per-predicate mutation pipeline had a buffer-aliasing bug (fixed in 0a9ffc1) that produced wrong reverse-list contents on a different small subset of genres each load — sometimes Documentary lost most of its 31,370 entries, sometimes Children's/Family gained ~1,500 spurious Crime-Thriller entries, sometimes Backstage Musical and Indie film were the affected pair. The existing query-017 (Taraji-films-by-genre) only catches it when that specific actor's films happen to land on a corrupted genre, and query-016 / query-044 only test genres above 30,000 films.

This new query exercises every genre's reverse list and pins the exact expected count, so any regression that mis-routes reverse edges for any genre will fail it directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two related bugs in the per-predicate mutation pipeline that surface as stale index entries under TestStringIndex (systest/bgindex):

1. Process() (the star-delete fast path) was passing the caller's bare context to handleDeleteAll. schema.State().IsIndexed / IsReversed / HasCount only consult the pending mutSchema (the new schema with the index being built) when the context carries the isWrite flag. The legacy runMutation path calls schema.GetWriteContext at the top; the pipeline didn't. During a background index build, every star-delete saw isIndexed=false and skipped addIndexMutations(DEL) for the prior value, leaving stale uids in the index permanently. Lifting GetWriteContext into Process covers both the star-delete path and the later predicate-pipeline goroutines.

2. ProcessSingle's data-list write was passing the unfiltered postings list to AddDelta. handleOldDeleteForSingle appends a synthetic Del(oldVal) alongside the user's Set(newVal) so InsertTokenizerIndexes and ProcessReverse can emit Del-of-old entries. For scalar non-list non-lang predicates both postings share Uid == math.MaxUint64, and at read time pickPostings' equal-ts tie-break falls back to Go's unstable sort.Slice while setMutationAfterCommit overwrites committedUids[mpost.Uid] in append order. Either way Del can clobber the new Set and the data list reads as "no value." ProcessCount already strips the synthetic Del via skipSyntheticDel; mirror that filter in ProcessSingle's main data-list path.

systest/bgindex now passes (TestStringIndex, TestReverseIndex, TestCountIndex, TestParallelIndexing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
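A self-contained analog of the isWrite-flag mechanism from fix 1 (isWriteKey, withWriteCtx, and isIndexed are illustrative stand-ins; dgraph's real helper is schema.GetWriteContext):

```go
package main

import (
	"context"
	"fmt"
)

type isWriteKey struct{}

// withWriteCtx marks the context as a mutation-path context, the analog of
// what schema.GetWriteContext does for the real schema checks.
func withWriteCtx(ctx context.Context) context.Context {
	return context.WithValue(ctx, isWriteKey{}, true)
}

// isIndexed consults the pending schema only when the context carries the
// write flag; read paths see just the committed schema.
func isIndexed(ctx context.Context, committed, pending bool) bool {
	if v, _ := ctx.Value(isWriteKey{}).(bool); v {
		return pending // mutation path: sees the index being built
	}
	return committed
}

func main() {
	ctx := context.Background()
	// During a background index build: committed=false, pending=true.
	fmt.Println(isIndexed(ctx, false, true))               // false: the pipeline's bug
	fmt.Println(isIndexed(withWriteCtx(ctx), false, true)) // true: legacy/fixed behavior
}
```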
HNSW Insert performs multi-key read-modify-write across the entry-pointer
key, the per-level edge lists, and back-edges into the neighbors of the
new node. Per-list locks make each key's update atomic, but the
cross-key sequences ("read entry, lock entry, read neighbor, modify
neighbor, append back-edge to neighbor's list…") are not. Running 10
worker goroutines concurrently against the same txn cache lets updates
stomp on each other, leaving nodes that have a data-list entry but are
unreachable from the entry point.
Surface symptom: similar_to(k=N) returns fewer than N hits even though
all N vectors committed to the data list. TestVectorTwoTxnWithoutCommit
reliably reproduced this on Linux CI (and not on the macOS dev box,
likely a scheduling artifact).
Legacy applyMutations arrives at single-threaded vector handling
indirectly via x.DivideAndRule: for any num < 256 the helper rounds
numGo down to 1, so a 5-edge vector mutation runs serially on main and
the bug never surfaces there. Mirror that here by dropping
numThreads to 1 — correctness over within-txn parallelism for vector
predicates. Cross-txn parallelism is unaffected.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per-predicate mutation pipeline — revival of #9467
This branch revives the abandoned per-predicate mutation pipeline started by @harshil-goel. The original work added an alternative path through `applyMutations` that fans mutation edges out by predicate and processes each predicate's batch concurrently — exposing parallelism the legacy serial path can't. It was abandoned with red CI (most notably `TestCountIndexConcurrentSetDelScalarPredicate`) and a trail of WIP debris; this work picks it up and gets it green.

The bulk of the diff is correctness fixes the original author didn't get to, each committed individually with a full trace narrative. TL;DR: scalar Del-of-old-value was overwriting new `Set`s inside both `ProcessCount` and `ProcessSingle`'s data-list write, leaving counts off-by-one and post-mutation reads returning "no value"; `InsertTokenizerIndexes` deadlocked on uids ≥ 2⁶³ via a signed-int dispatch hash; reused key buffers in `ReadPostingList` and the async rollup queue silently corrupted unrelated reverse lists under the 21M live load; `IterateDisk`'s `IsEmpty` had been stubbed to `false` and broke `has(...)` after star-deletes; the pipeline's star-delete fast path skipped index Del generation during background indexing because it forgot to set the write-context flag; and vector inserts were fanned across 10 goroutines that raced on shared HNSW graph state. There are also four new in-process regression harnesses for `[uid] @reverse @count` and an exhaustive `count(~genre)` check across all 592 non-empty Genre entities in the 21M dataset.

The on/off feature flag became `--feature-flags="mutations-pipeline-threshold=N"` — `N=0` disables the pipeline entirely (legacy path), `N=1` always uses it, and `N>1` routes only mutations with ≥ N edges to it. The pipeline pays a per-predicate goroutine spin-up cost, so tiny mutations are slightly slower on it; bulk multi-predicate mutations are faster (crossover ≈ 100 edges in in-process benches; live-loading the 1M dataset is ~1.5× faster end-to-end at threshold=1 vs. legacy).

Status: local `./t --suite=systest-baseline`, `./t --suite=load` (incl. `systest/bgindex`), and the unit tests are green; the default is currently flipped to `1` so CI exercises the pipeline path on every mutation. Before merge, reset the default to `0` (off by default, opt-in via the feature flag) once the canonical Linux CI has been clean for a soak window. Operators who want the bulk-load speedup can opt in with a higher threshold (e.g. `200`) so only large mutations take the new path.

Commits in this PR are stacked for easier review.

TODO: set the default `mutations-pipeline-threshold` to either zero or 100 (following more testing).