
[WIP] perf(core): Add a new mutation pipeline for per predicate runners #9467

Open

ghost wants to merge 17 commits into main from harshil-goel/mutation-pipeline

Conversation


@ghost ghost commented Jul 10, 2025

Per-predicate mutation pipeline: reviving the abandoned harshil-goel/mutation-pipeline work

This branch revives the abandoned per-predicate mutation pipeline started by @harshil-goel. The original work added an alternative path through applyMutations that fans mutation edges out by predicate and processes each predicate's batch concurrently — exposing parallelism the legacy serial path can't. It was abandoned with red CI (most notably TestCountIndexConcurrentSetDelScalarPredicate) and a trail of WIP debris; this work picks it up and gets it green.

The bulk of the diff is correctness fixes the original author didn't get to, each committed individually with a full trace narrative. TL;DR:

  • Scalar Del-of-old-value was overwriting new Sets inside both ProcessCount and ProcessSingle's data-list write, leaving counts off by one and post-mutation reads returning "no value".
  • InsertTokenizerIndexes deadlocked on uids ≥ 2⁶³ via a signed-int dispatch hash.
  • Reused key buffers in ReadPostingList and the async rollup queue silently corrupted unrelated reverse lists under the 21M live load.
  • IterateDisk's IsEmpty had been stubbed to false, which broke has(...) after star-deletes.
  • The pipeline's star-delete fast path skipped index Del generation during background indexing because it forgot to set the write-context flag.
  • Vector inserts were fanned across 10 goroutines that raced on shared HNSW graph state.

There are also four new in-process regression harnesses for [uid] @reverse @count and an exhaustive count(~genre) check across all 592 non-empty Genre entities in the 21M dataset.

The on/off feature flag became --feature-flags="mutations-pipeline-threshold=N": N=0 disables the pipeline entirely (legacy path), N=1 always uses it, and N>1 routes only mutations with ≥ N edges to it. The pipeline pays a per-predicate goroutine spin-up cost, so tiny mutations are slightly slower on it; bulk multi-predicate mutations are faster (crossover ≈ 100 edges in in-process benches; live-loading the 1M dataset is ~1.5× faster end-to-end at threshold=1 vs. legacy).
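The routing rule can be sketched in a few lines of Go. This is a minimal illustration of the threshold semantics described above; the Mutation type and usePipeline helper here are stand-ins, not Dgraph's actual applyMutations code:

```go
package main

import "fmt"

// Mutation is a stand-in for the mutation proposal; only the edge count matters here.
type Mutation struct {
	Edges []string
}

// usePipeline mirrors the branch condition described in this PR:
// threshold == 0 disables the pipeline, threshold == 1 always enables it,
// and larger values route only bulk-shaped mutations to it.
func usePipeline(threshold int, m Mutation) bool {
	return threshold > 0 && len(m.Edges) >= threshold
}

func main() {
	small := Mutation{Edges: make([]string, 5)}
	bulk := Mutation{Edges: make([]string, 500)}
	fmt.Println(usePipeline(0, bulk))    // false: pipeline off, legacy path
	fmt.Println(usePipeline(1, small))   // true: always on
	fmt.Println(usePipeline(200, small)) // false: small txn stays on the legacy path
	fmt.Println(usePipeline(200, bulk))  // true: bulk txn takes the pipeline
}
```

A threshold between the ~100-edge crossover and the workload's typical bulk batch size keeps interactive mutations on the legacy path while bulk loads get the speedup.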

Status: local ./t --suite=systest-baseline, ./t --suite=load (incl. systest/bgindex), and the unit tests are green; the default is currently flipped to 1 so CI exercises the pipeline path on every mutation. Before merge, reset the default to 0 (off-by-default, opt-in via the feature flag) once the canonical Linux CI has been clean for a soak window. Operators who want the bulk-load speedup can opt in with a higher threshold (e.g. 200) so only large mutations take the new path.

Commits in this PR are stacked for easier review.

TODO: set the default mutations-pipeline-threshold to either zero or 100 (following more testing).

@ghost ghost self-requested a review July 10, 2025 10:03
@github-actions github-actions Bot added the area/testing, area/schema, area/core, and go labels Jul 10, 2025
@ghost ghost force-pushed the harshil-goel/mutation-pipeline branch from e4e2caf to f0a5928 Compare August 1, 2025 17:17
Comment thread types/conversion.go Fixed

trunk-io Bot commented Aug 2, 2025


Failed Test Failure Summary
TestOrderAndOffset The test failed due to an unexpected EOF error when sending a POST request to the GraphQL endpoint.
TestSystestSuite/TestCountIndexConcurrentSetDelScalarPredicate The test failed because the expected value was 1 but the actual value was 0: a mismatch in the scalar predicate count.
TestSystestSuite The test suite failed to complete successfully.

@ghost ghost force-pushed the harshil-goel/mutation-pipeline branch from d233f29 to e8a07d6 Compare August 26, 2025 02:32
@github-actions github-actions Bot added the area/documentation label Aug 26, 2025
@ghost ghost force-pushed the harshil-goel/mutation-pipeline branch from b5c3b4c to 978a0d4 Compare August 27, 2025 11:07
@github-actions github-actions Bot added the area/graphql label Aug 31, 2025
@matthewmcneely matthewmcneely requested a review from a team as a code owner May 1, 2026 16:29

matthewmcneely commented May 1, 2026

Revival of harshil-goel/mutation-pipeline — summary of changes on top of 978a0d41db0c464598

The original WIP added the per-predicate mutation pipeline but was abandoned with red CI (most notably TestCountIndexConcurrentSetDelScalarPredicate) and lots of WIP debris. This work picks it up and gets it green end-to-end against ./t --suite=systest-baseline, the acl integration suite, and systest/21million/live (the bulk-loader-shaped workload that motivated the original PR).

Phase 1 — cleanup (no behavior change)

Commit What
436b55f8b Resolve the merge of main into the branch (import paths hypermodeinc/dgraph → dgraph-io/dgraph, license header, two-line fix in mvcc_test.go::TestRegression9597 for the new Deltas type).
a2fcc09f7 Strip ~30 unconditional fmt.Println debug calls from hot paths (posting/lists.go, index.go, list.go, mvcc.go). They fired on every read/mutation/rollup.
4fe27e1d4 Remove dead commented-out code: ~175 lines of an alternate InsertTokenizerIndexes impl, plus the 101 commented-out lines of BenchmarkProcessListIndex in worker/draft_test.go (referenced methods that don't exist).
8fbb0bdf2 Replace the hardcoded featureFlag := true in applyMutations with a real superflag knob mutations-use-pipeline. (Subsequently superseded by the threshold form below.)

Phase 2 — correctness fixes (real bugs)

Commit What
2d2afb190 Document why TestCount in worker/sort_test.go is t.Skip()'d. The test bypasses Oracle conflict checking; both legacy and new paths fail it.
43b826d0a Scalar Del-of-old-value wiping the new Set in ProcessCount. handleOldDeleteForSingle appends a synthetic Del oldVal posting next to the user's Set newVal so index/count diffing can see the prior value. ProcessCount then iterated postings and called updateMutationLayer(post, singleUidUpdate=true) on each. For non-Lang scalars fingerprintEdge returns math.MaxUint64, so both postings collide on the same Uid; the second iteration's Del unconditionally rewrote currentEntries to [DeleteAll], dropping the new value. Fix: in the !isListEdge branch, when the postings list contains a Set/Ovr, treat any Del as synthetic and skip it for the data-list update.
792923279 InsertTokenizerIndexes deadlock on uids ≥ 2^63. The dispatcher in posting/index.go did chMap[int(uid)%numGo] <- uid. int(uid) is signed, so for any uid in [2^63, 2^64) the modulo result is negative; chMap[-N] returns Go's zero value — a nil channel — and sending on a nil channel blocks forever. Surfaced by live load on 1million.rdf.gz: the very first batch wedged. Fix: hash unsigned before casting: chMap[int(uid%uint64(numGo))] <- uid.
8ccdae630 IterateDisk had IsEmpty stubbed to false (introduced by branch commit 41d6445ce1 "fixed some bug" with no comment). That forced every key the iterator found to be reported as non-empty. Broke has(<predicate>) for any uid whose value had been removed via star-deletion. Surfaced by TestSystestSuite/TestHasDeletedEdge in systest/mutations-and-queries. Restored.
de90d97e0 Nil-pointer deref in ProcessSingle on multi-Del-per-uid mutations + strip two leftover fmt.Println("READING…") calls in worker/task.go that the Phase 1B sweep missed. The GraphQL deleteTask(filter: {}) cleanup expands into multiple Del edges per entity (one per predicate); when ≥2 land on the same uid in one batch, findSingleValueInPostingList returns nil on the second iteration, then string(edge.Value) == string(oldVal.Value) panics. Fix: nil-guard the deref and move oldVal per-iteration so stale values can't bleed across edges. Surfaced by graphql/e2e/auth/TestOrderAndOffset.
0a9ffc174 Reused-key-buffer aliasing across ReadPostingList, the in-memory cache, and the rollup queue. The pipeline's ProcessCount loop allocates one dataKey buffer per call and mutates its trailing 8 bytes per uid. Two distant sites captured the slice header rather than the bytes: (a) ReadPostingList's l.key = key aliased the caller's buffer, and saveInCache stored a copyList that shared the alias — by the time async rollup pulled the cached list and built kv.Key = alloc.Copy(l.key), the bytes had collapsed to whatever uid was processed last, so a BitCompletePosting (with WithDiscard) got written to a different list's key, silently overwriting an unrelated reverse list with this list's contents; (b) addKeyToBatch appended the slice header verbatim, so all queued rollup targets from one ProcessCount goroutine ended up pointing at the same final-iteration key. Both fixed by taking ownership of the bytes. Legacy runMutation allocates a fresh key per call so it never aliased anything; the bug was only visible when a caller deliberately reused one buffer the way ProcessCount does. This was the cause of systest/21million/live's consistent failure.

Feature flag — threshold instead of on/off

Commit What
c8f5fbcf3 Replace MutationsUsePipeline bool with MutationsPipelineThreshold int. A mutation runs through the pipeline only when threshold > 0 && len(m.Edges) >= threshold; 0 = pipeline off, 1 = always on, larger values opt small interactive mutations out (benches show ~2× slowdown for ≤10-edge txns). CLI: dgraph alpha --feature-flags="mutations-pipeline-threshold=200".
86ad1eea9 Flip the default to 1 so test suites exercise the pipeline path on every mutation. Reset to 0 before merging once the canonical CI has been green for a soak window.

Tests / regressions

Commit What
7c95203b8 Four in-process regression harnesses for [uid] @reverse @count: single-txn, multi-batch sequential, multi-batch multi-predicate (parallel goroutines), and multi-batch concurrent through a fakeOracle conflict-checking harness. None reproduce the buffer-aliasing bug fixed in 0a9ffc1 — that needs the async rollup path that only fires on a real running cluster — but they pin down the in-memory mutation path's reverse-list bookkeeping.
d0c631a16 New systest/21million/queries/query-073: an exhaustive count(~genre) check across every one of the 592 non-empty Genre entities, with the exact expected count pinned. Catches any future regression that mis-routes reverse edges, which the existing query-017 only does indirectly.

Benchmarks

In-process matrix (worker/pipeline_bench_test.go, included in 792923279) on Apple M4 Pro, 30 iterations:

Config Legacy ns/op Pipeline ns/op Pipeline / Legacy
1 pred × 1 edge 39,753 89,581 2.25× slower
10 preds × 1 edge 190,672 451,854 2.37× slower
1 pred × 100 edges 654,775 727,931 1.11× slower
50 preds × 100 edges 31,630,750 29,959,582 1.06× faster
10 preds × 100 edges 9,448,342 7,441,912 1.27× faster
1 pred × 1000 edges 5,753,151 4,957,482 1.16× faster
10 preds × 1000 edges 69,132,992 58,908,736 1.17× faster
10 preds × 1000 edges, no index 31,308,575 20,238,396 1.55× faster
50 preds × 1000 edges 403,867,619 375,424,165 1.07× faster

Crossover is around 100 total edges.

End-to-end live load against the 1M dataset (1million.rdf.gz, official 1M schema with mixed [uid] @reverse @count, [uid] @count, datetime @index(year), string @index(hash, term, trigram, fulltext) @lang, geo @index(geo), string @index(exact) @upsert):

mutations-pipeline-threshold Avg time Avg N-Quads/s
0 (legacy) ~14.3 s ~77K
1 (always-on pipeline) ~9.5 s ~116K

~1.50× faster on a realistic bulk-shaped workload — the case the per-predicate runner was designed for.

Test results

With mutations-pipeline-threshold=1:

  • posting/, worker/, types/, schema/ — all unit tests pass, including the new TestPipelineCountIndexConcurrent and four TestPipelineReverseListCount* harnesses under -count=20 -race.
  • ./t --suite=systest-baseline — all 13 packages green: systest, systest/acl/restore, systest/audit, systest/audit_encrypted, systest/backup/filesystem, systest/cdc, systest/cloud, systest/export, systest/group-delete, systest/loader, systest/multi-tenancy, systest/mutations-and-queries (66 tests including the previously-failing TestHasDeletedEdge), systest/plugin. Total wall clock 7m38s.
  • ./t --suite=core,integration and the acl suite (150.8s) green, plus four large systest/vector tests (TestVectorIncrBackupRestore 124s, TestVectorBackupRestore 72s, TestVectorBackupRestoreDropIndex 23s, TestVectorBackupRestoreReIndexing 110s). The suite then died on a pre-existing Mac-only bug in dgraphtest (commit ef6f27da79 "Fix running tests on mac" on main) that's unrelated to the pipeline.
  • systest/21million/live TestQueries — green. Sweep across all 764 Genre entities reports count(~genre) == count(forward edges) for every genre. Pre-fix the same sweep showed sets of 4 mismatched genres with thousands of stale or missing reverse entries each load, with the specific affected genres varying run-to-run.

Out of scope, worth noting

  • The bulk loader (dgraph/cmd/bulk/) doesn't import any of the pipeline machinery. The "speed up bulk loader" framing in the PR's lineage doesn't survive code inspection — the bulk loader has its own map-reduce path.
  • TestCount in worker/sort_test.go is left t.Skip()'d with a clear reason (harness lacks Oracle conflict checking; both legacy and pipeline fail it).

Recommended deployment

  • Reset the default to 0 before merging.
  • Operators who want the speedup on bulk-only workloads can opt in with --feature-flags="mutations-pipeline-threshold=200" (or whatever the per-mutation crossover is for their workload).
  • Setting 1 always-on is fine for systest CI runs and is a useful "ramp-up" knob.


darkcoderrises and others added 15 commits May 4, 2026 21:48
Adds an alternative path through `applyMutations` that fans the edges of
a single mutation out by predicate and processes each predicate's batch
in its own goroutine, exposing parallelism the legacy serial-by-edge
path cannot. The per-predicate runners drive the full mutation lifecycle
(scalar/list, reverse, count, tokenized index, vector index) through new
helpers in posting/index.go: `MutationPipeline.Process` →
`ProcessPredicate` → `ProcessSingle` / `ProcessList` / `ProcessVectorIndex`,
with `InsertTokenizerIndexes`, `ProcessReverse`, and `ProcessCount`
shared across the predicate-shaped paths.

Supporting infrastructure:
- `posting.Deltas` — per-txn delta store split into a sharded raw-bytes
  map and a per-predicate `indexMap` for batched index writes.
- `types.LockedShardedMap` — generic sharded RWMutex map used by Deltas
  and the in-memory index aggregation.
- New mutation-time helpers on `posting.Txn` (`AddDelta`,
  `GetScalarList`, `addConflictKey`, `addConflictKeyWithUid`).
- Test scaffolding in `worker/mutation_unit_test.go`,
  `worker/sort_test.go`, and `worker/draft_test.go`.
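The commit names `types.LockedShardedMap` but its code isn't shown here; a generic sharded RWMutex map of the described shape might look like the following. This is a sketch from the description, not the branch's implementation, and the shard count and string keys are assumptions:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numShards = 32

// LockedShardedMap sketches a generic map split across shards, each guarded
// by its own RWMutex, so writers touching different shards don't contend.
type LockedShardedMap[V any] struct {
	shards [numShards]struct {
		sync.RWMutex
		m map[string]V
	}
}

func NewLockedShardedMap[V any]() *LockedShardedMap[V] {
	s := &LockedShardedMap[V]{}
	for i := range s.shards {
		s.shards[i].m = make(map[string]V)
	}
	return s
}

// shard picks a shard by hashing the key, so the distribution is stable.
func (s *LockedShardedMap[V]) shard(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % numShards)
}

func (s *LockedShardedMap[V]) Set(key string, v V) {
	sh := &s.shards[s.shard(key)]
	sh.Lock()
	defer sh.Unlock()
	sh.m[key] = v
}

func (s *LockedShardedMap[V]) Get(key string) (V, bool) {
	sh := &s.shards[s.shard(key)]
	sh.RLock()
	defer sh.RUnlock()
	v, ok := sh.m[key]
	return v, ok
}

func main() {
	m := NewLockedShardedMap[int]()
	m.Set("genre", 42)
	v, ok := m.Get("genre")
	fmt.Println(v, ok) // 42 true
}
```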

This commit squashes the original WIP series authored by Harshil Goel
(commits 978a0d4…41d6445ce on the abandoned branch) plus the merge
into current `main` and the mechanical clean-ups required to compile
against it: import paths `hypermodeinc/dgraph` → `dgraph-io/dgraph`,
license headers, a `posting/mvcc_test.go::TestRegression9597` fix-up
for the `LocalCache.deltas` map → `*Deltas` refactor, and a stray doc
reference in TESTING.md.

The pipeline ships gated and disabled; the on/off knob and subsequent
correctness fixes follow in later commits in this series.

Co-Authored-By: Matthew McNeely <matthew.mcneely@gmail.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Removes the unconditional debug prints scattered through posting/
during the original WIP work — they fired on every read, mutation,
rollup, and commit. None of them were guarded by a verbosity flag,
so under load they would have produced megabytes of stdout noise
per second.

Sites stripped:
- posting/lists.go: READING / READING SINGLE / GETTING KEY FROM DELTAS
- posting/index.go: TOKENS, LOCAL MAP, INSERTING INDEX, UPDATE INDEX,
  ERRORRRING, "Inserting tokenizer indexes ... took"
- posting/mvcc.go: COMMITTING (and unused fmt import)
- posting/list.go: "Buidlding committed uids", "Setting mutation",
  PrintRollup helper (called once internally, never elsewhere)

Left in place: printTreeStats() in index.go, which is already
gated by the DEBUG_SHOW_HNSW_TREE env var and is an intentional
opt-in HNSW debug helper.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two large dead-code blocks left over from the original WIP:

- posting/index.go: ~175 lines of an alternate
  InsertTokenizerIndexes implementation, fully commented out. The
  live implementation directly above it supersedes it; keeping the
  commented variant just made the file harder to follow. Also drop
  the scattered "//fmt.Println(...)" leftovers next to live code.

- worker/draft_test.go: BenchmarkProcessListIndex was added entirely
  commented out and references methods (DefaultPipeline,
  ProcessListWithoutIndex, ProcessListIndex) that don't exist on
  MutationPipeline. If we want a benchmark for the pipeline, we
  should write one against the real API.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the hardcoded `featureFlag := true` in applyMutations with
a real superflag knob, defaulted off:

- Add WorkerOptions.MutationsUsePipeline (bool) in x/config.go.
- Extend the feature-flags superflag with mutations-use-pipeline=false
  and wire alpha to populate WorkerConfig.MutationsUsePipeline from it.
- worker/draft.go applyMutations now branches on
  x.WorkerConfig.MutationsUsePipeline; default false routes mutations
  through the legacy path, preserving current behavior.

Tests can opt into the new pipeline by setting
x.WorkerConfig.MutationsUsePipeline = true. CLI usage:
  dgraph alpha --feature-flags="mutations-use-pipeline=true"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
TestCount is t.Skip()'d on the branch, but the reason wasn't
recorded. Investigation: the test launches concurrent transactions
sharing entity uids and bypasses the Oracle's conflict-checking
commit path — it just calls CommitToDisk() directly with disjoint
commit timestamps. Both the legacy AddMutationWithIndex path and
the new mutation pipeline fail it identically: with two threads
adding edges to the same subject's [uid] @count predicate, neither
path can serialize @count updates without real txn conflicts, so
the count index ends up inconsistent and many subjects are missing
from count(N).

This is expected without conflict checking — the unit harness can't
exercise the safety the Oracle provides. Re-enable when we wire
either: (a) Oracle.WaitForTs/IsAborted into the harness, or (b)
this test through worker.applyMutations() (which does invoke the
Oracle conflict path).

Single-thread TestCount passes, so the per-predicate pipeline's
own count logic is correct in the absence of contention. The
existing TestStringIndexWithLang covers the multithreaded happy
path with disjoint uids.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ssCount

Bug: scalar @count writes were nondeterministically losing data under
concurrent transactions. Roughly half the deltas committed by the
new mutation pipeline contained only a [DeleteAll] posting and no
Set, so reads at maxTs returned an empty value list.

Root cause: in ProcessSingle, handleOldDeleteForSingle appends a
synthetic Del-of-old-value to postings[uid] alongside the user's
Set, so InsertTokenizerIndexes / ProcessReverse / count diffing can
see the prior value. ProcessCount then iterates the postings and
calls list.updateMutationLayer(post, singleUidUpdate=true, ...) on
each. For non-Lang scalar predicates fingerprintEdge returns
math.MaxUint64, so the synthetic Del and the user Set both have
Uid == math.MaxUint64. The first iteration (Set new) leaves
mutationMap.currentEntries = [DeleteAll, Set new]; the second
iteration (Del old) finds the Set we just inserted via findPosting
and applies updateMutationLayer in singleUidUpdate mode, which
unconditionally rewrites currentEntries to [DeleteAll] (the Del
branch never appends mpost) — wiping the new value.

Fix: in ProcessCount, when iterating a !isListEdge predicate's
postings, if the list contains a Set/Ovr posting, treat any Del as
synthetic and skip it for the data-list update. Standalone user Dels
(no accompanying Set) are still applied. Index/reverse/count diffing
already happen before ProcessCount runs and aren't affected.

Repro: TestPipelineCountIndexConcurrent in worker/sort_test.go is a
new conflict-aware in-process harness that mirrors the systest
TestCountIndexConcurrentSetDelScalarPredicate. It runs 200
contending transactions setting <0x1> <name> "name<rand>" against a
"string @index(exact) @count" schema with a fakeOracle that
implements the same hasConflict algorithm as
dgraph/cmd/zero/oracle.go. Pre-fix the test fails roughly 50% of
runs with an empty data list and the wrong count buckets; post-fix
it is stable across 20+ -count iterations and under -race.

Existing tests (TestScalarPredicateIntCount, *RevCount, *Count,
TestSingleUidReplacement, TestDeleteSetWithVarEdgeCorruptsData,
TestStringIndexWithLang, TestMultipleTxnListCount, TestGetScalarList,
TestDatetime) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When loading via dgraph live (or any mutation source whose uids span
the full uint64 range, including xidmap-assigned uids), the
per-predicate pipeline hung indefinitely on the very first batch with
zero forward progress. A goroutine dump showed the dispatcher
goroutine wedged on `chan send (nil chan)` at the line:

    chMap[int(uid)%numGo] <- uid

uid is uint64. Casting directly to int produces a negative value for
uid >= 2^63, so int(uid)%10 can be in [-9, -1]. chMap[-3] returns the
zero value for a chan uint64, which is a nil channel; sending on a
nil channel blocks forever.

The 10 worker goroutines (also created here) were idle on
\`for uid := range uids\` since no uids ever reached them, so the
parent \`wg.Wait()\` and the surrounding errgroup never returned.
applyMutations therefore never released the txn, the alpha's old-txn
abort loop kept retrying every minute, and live-load showed
"Txns: 0  N-Quads: 0" indefinitely.

Fix: hash unsigned, then cast: `chMap[int(uid%uint64(numGo))] <- uid`.
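The sign behavior is easy to demonstrate in isolation. A standalone illustration of the two hashing orders, assuming a 64-bit int, not the branch's dispatcher code:

```go
package main

import "fmt"

const numGo = 10

// buggyShard casts the uid to a signed int before taking the modulo. On a
// 64-bit platform any uid >= 2^63 becomes negative, so the index can be in
// [-9, -1]; indexing a map of channels with it yields a nil channel, and a
// send on a nil channel blocks forever.
func buggyShard(uid uint64) int {
	return int(uid) % numGo
}

// fixedShard takes the modulo while still unsigned, then casts; the result
// is always in [0, numGo).
func fixedShard(uid uint64) int {
	return int(uid % uint64(numGo))
}

func main() {
	uid := uint64(1) << 63 // smallest uid that breaks the signed cast
	fmt.Println(buggyShard(uid)) // -8
	fmt.Println(fixedShard(uid)) // 8
}
```

Go's % truncates toward zero, so a negative dividend yields a negative remainder, which is exactly how the negative channel index arises.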

Verified end-to-end with the live loader against the 1million.rdf.gz
benchmark dataset (1,041,684 n-quads, schema mixes [uid] @reverse
@count, [uid] @count, datetime @index(year), string @index(...) @lang,
geo @index(geo), string @index(exact) @upsert):

  legacy   :  13.85s / 14.74s  (avg ~14.3s,  ~77k n-quads/s)
  pipeline :   9.65s /  9.36s  (avg ~9.5s,  ~116k n-quads/s)

That is ~1.50x faster on a realistic multi-predicate, multi-index
workload — i.e. the case the per-predicate runner pipeline is built
for.

Also adds worker/pipeline_bench_test.go: in-process Go benchmarks
comparing legacy runMutation vs newRunMutations across a matrix of
(predicates, edges-per-predicate, indexed/non-indexed) shapes. They
show the pipeline loses ~2x on tiny mutations (1-10 edges) and wins
1.2x-1.55x on bulk (10 preds x 100+ edges, indexed or not), which is
why the feature flag stays default-off and the live-loader speedup
above is the right place to evaluate this work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The benchmark matrix in worker/pipeline_bench_test.go showed the
pipeline loses ~2x on small mutations (≤10 edges total) and wins
~1.5x on bulk (live-loader sized: 1000 edges per txn across many
predicates). A binary on/off flag forces an all-or-nothing choice,
penalising whichever side of that crossover the workload spends most
time on.

Replace MutationsUsePipeline (bool) with MutationsPipelineThreshold
(int):

  threshold = 0   -> never use the pipeline (default; legacy behavior)
  threshold = 1   -> always use the pipeline (any txn with ≥1 edge)
  threshold = N   -> use the pipeline only when len(m.Edges) >= N

The threshold compares against total edges in the proposal. From the
benches the crossover is around 100; the live-loader 1M dataset uses
~1000 edges per txn, so anything from 100-1000 will engage the
pipeline only on bulk-shaped mutations and leave small interactive
mutations on the legacy serial path.

Wiring:
  - x.WorkerConfig.MutationsPipelineThreshold (int) replaces the
    bool field.
  - feature-flags superflag: "mutations-pipeline-threshold=0".
  - alpha/run.go reads it via featureFlagsConf.GetInt64.
  - worker/draft.go applyMutations branches on
    `t > 0 && len(m.Edges) >= t`.

Verified end-to-end against the live-loader benchmark
(1million.rdf.gz, official 1M schema):
  threshold=0   : 13.56s, 80,129 N-Quads/s    (legacy, matches baseline)
  threshold=1   :  9.92s, 115,742 N-Quads/s   (always-on, matches prior)

CLI usage:
  dgraph alpha --feature-flags="mutations-pipeline-threshold=200"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Engages the per-predicate mutation pipeline by default so that systest
runs (and any other test suites that don't override feature-flags)
exercise the pipeline path on every mutation, not the legacy serial
path. Threshold of 1 means "any mutation with ≥1 edge takes the
pipeline" — i.e. always on.

This is a deliberate ramp toward shipping the pipeline. Operators who
want to opt small interactive mutations out of the pipeline (where
benches showed ~2x slowdown for ≤10-edge txns) can set a higher
threshold:

  dgraph alpha --feature-flags="mutations-pipeline-threshold=200"

To turn the pipeline fully off, set 0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The original branch commit 41d6445 ("fixed some bug") replaced
the IsEmpty(readTs) call in IterateDisk with a hardcoded `false`,
forcing every key found by the iterator to be reported as non-empty.
That broke has(<predicate>) for any uid whose value had been removed
via star-deletion (<uid> <pred> *): the data list still exists in
badger with a DeleteAll marker on top, but the live posting list is
empty at readTs — IsEmpty returns true and the uid should be skipped.

Surfaced by systest TestSystestSuite/TestHasDeletedEdge in
systest/mutations-and-queries: 3 nodes are created with <end> "",
one is star-deleted, follow-up has(end) is expected to return 2 uids.
With IsEmpty stubbed to false it returned 3.

No comment was left on the original change. Restoring the call. The
mutations-and-queries package is fully green with this in place
(66/66 tests pass including TestHasDeletedEdge); if a real underlying
issue motivated the original disable we'll chase it with a real
diagnosis instead of silently dropping a safety check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…p stale READING debug prints

Two bugs surfaced by graphql/e2e/auth's TestOrderAndOffset (cleanup
mutation \`mutation DelTask { deleteTask(filter: {}) }\` crashed the
alpha mid-request, causing the test client to see EOF on POST):

1. posting/index.go ProcessSingle SIGSEGV at line 675.

   GraphQL deleteTask cleanup expands into multiple Del edges per
   entity (one per predicate the entity has — uid, type, list edges,
   etc.). When two Del edges to the same uid land in one transaction's
   batch, the second iteration through ProcessSingle's per-edge loop
   does:

     pl, exists := postings[uid]
     if exists {
         if edge.Op == DEL {
             oldVal = findSingleValueInPostingList(pl)
             if string(edge.Value) == string(oldVal.Value) { ... }
                                              ^^^^^^ nil deref
         }
     }

   findSingleValueInPostingList only returns Set postings; if the
   accumulated list holds only Dels (from the prior iteration), it
   returns nil and we panic dereferencing oldVal.Value.

   Two fixes here:
     - Guard the deref: `if oldVal != nil && string(...) == ...`.
     - Move `var oldVal *pb.Posting` inside the loop. It was declared
       at function scope, so a stale value from one edge could bleed
       into the nil-guarded branch for a different uid on a later
       iteration. Per-edge scope makes the intent explicit.

2. worker/task.go: two leftover `fmt.Println("READING SINGLE", ...)`
   and `fmt.Println("READING", ...)` calls in the value-postings
   read path. Same class of debug spew Phase 1B stripped from
   posting/, missed because that sweep didn't include worker/.
   Removed both. Safe — they were unconditional prints on every
   query value read.
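Fix 1's guard pattern in miniature. Toy types are used below; the real code works with pb.Posting and findSingleValueInPostingList, so names and shapes here are assumptions:

```go
package main

import "fmt"

type Posting struct{ Value []byte }

// findSingleValue stands in for findSingleValueInPostingList: it returns the
// first posting carrying a value, or nil when the accumulated list holds only
// Dels (as happens on the second Del edge to the same uid in one batch).
func findSingleValue(pl []*Posting) *Posting {
	for _, p := range pl {
		if p != nil && p.Value != nil {
			return p
		}
	}
	return nil
}

// matchesOldValue shows the guarded comparison from the fix: oldVal is
// declared per edge (so no stale value bleeds across iterations) and
// oldVal.Value is dereferenced only after the nil check.
func matchesOldValue(edgeValue []byte, pl []*Posting) bool {
	oldVal := findSingleValue(pl)
	return oldVal != nil && string(edgeValue) == string(oldVal.Value)
}

func main() {
	onlyDels := []*Posting{}                    // second Del edge: no Set accumulated
	withSet := []*Posting{{Value: []byte("v1")}} // normal case
	fmt.Println(matchesOldValue([]byte("v1"), onlyDels)) // false, and no panic
	fmt.Println(matchesOldValue([]byte("v1"), withSet))  // true
}
```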

The graphql/e2e/auth and graphql/e2e/auth/debug_off packages now both
pass in 30s. `./posting/` and `./worker/` unit tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…addKeyToBatch

The 21million live-load systest failed with `~genre @filter(uid(F))`
returning wrong films for some genres — corruption pattern was that
one genre's reverse list contained another genre's films verbatim,
while a third genre's reverse list was missing thousands of entries.
Different genres were affected on each load, but always: legacy
runMutation produced correct reverse counts; the per-predicate
pipeline produced corrupt ones.

Root cause: the pipeline's ProcessCount per-uid loop allocates one
key buffer per ProcessCount call and mutates its trailing 8 bytes
each iteration via `binary.BigEndian.PutUint64(dataKey[len-8:], uid)`.
Two distinct sites then captured the slice header rather than the
bytes:

1. ReadPostingList's `l.key = key` aliased the caller's buffer.
   saveInCache stores a copyList of the freshly-read list, and
   copyList sets `key: l.key` — also an alias. The cached list's
   key field therefore points at a buffer that the pipeline keeps
   mutating; by the time the async rollup path retrieves the
   cached list and runs `kv.Key = alloc.Copy(l.key)` to build the
   rolled-up KV, the bytes are whatever the LAST iteration left
   behind. Rollup then writes a BitCompletePosting with
   WithDiscard() to the wrong key, overwriting an unrelated
   reverse list with this list's (rolled-up) contents.

2. ReadPostingList's defer `IncrRollup.addKeyToBatch(key, ...)`
   appended the slice header verbatim to the rollup queue. Every
   queued entry from one ProcessCount goroutine ended up pointing
   at the same shared dataKey buffer; by the time the rollup
   goroutine processed the batch the bytes had collapsed to the
   final iteration's uid, redirecting many distinct rollup
   targets to the same key.

Both sites fixed by taking ownership of the bytes (an `append` onto a
nil slice / an explicit `make`+`copy`). Legacy runMutation hits ReadPostingList too
but allocates a fresh key per call, so it never aliased anything;
the bug is only visible when a caller deliberately reuses one
buffer across many uids the way ProcessCount does.

Verified end-to-end on the systest/21million/live load with
mutations-pipeline-threshold=1: a fresh load + sweep across all 764
Genre entities now reports `count(~genre) == count(forward edges)`
for every genre. Pre-fix the same sweep showed 4 mismatched genres
with thousands of stale or missing reverse entries; on a different
load it showed Documentary missing 23,325 of its 31,370 reverse
entries while Children's/Family had 1,435 stale Crime-Thriller
entries.

Unit tests in posting/ and worker/ still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Added in pursuit of the 21million live-load failure that turned out
to be the dataKey buffer-aliasing bug fixed in 0a9ffc1. None of
these tests reproduce that specific corruption — it needs the async
rollup path that only triggers on a real running cluster — but they
pin down what the per-predicate pipeline does correctly for the
non-rollup cases:

- TestPipelineReverseListCount: one transaction, multiple subjects
  pointing at multiple objects on a [uid] @reverse @count predicate.
  Verifies forward and reverse lists are both complete.
- TestPipelineReverseListCountMultiBatch: 50 subjects x 20 objects
  spread across 143 sequential transactions in batches of 7.
- TestPipelineReverseListCountMultiPred: 30 subjects x 12 objects
  across 3 distinct list-uid + reverse + count predicates, with
  shuffled edges and small batches so the per-predicate pipeline
  goroutines for each predicate run in parallel inside Process().
- TestPipelineReverseListCountConcurrent: same shape as the
  multi-batch case but with 10 worker goroutines submitting batches
  in parallel through the fakeOracle conflict-checking harness.

All four pass cleanly. They guard against regressions in the
in-memory mutation path's reverse-list bookkeeping; the rollup path
that the 0a9ffc1 fix actually targets is exercised by the
systest/21million/live integration test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…empty genres

A comprehensive `count(~genre)` check across every Genre entity in
the 21million dataset. The query asks for each genre's name and the
size of its reverse posting list, sorted by name; the fixture pins
the expected count for all 592 genres that have at least one film.

Motivation: the per-predicate mutation pipeline had a buffer-aliasing
bug (fixed in 0a9ffc1) that produced wrong reverse-list contents
on a different small subset of genres each load — sometimes
Documentary lost most of its 31,370 entries, sometimes
Children's/Family gained ~1,500 spurious Crime-Thriller entries,
sometimes Backstage Musical and Indie film were the affected pair.
The existing query-017 (Taraji-films-by-genre) only catches it when
that specific actor's films happen to land on a corrupted genre,
and query-016 / query-044 only cover genres with more than 30,000 films.

This new query exercises every genre's reverse list and pins the
exact expected count, so any regression that mis-routes reverse
edges for any genre will fail it directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two related bugs in the per-predicate mutation pipeline that surface as
stale index entries under TestStringIndex (systest/bgindex):

1. Process() (the star-delete fast path) was passing the caller's bare
   context to handleDeleteAll. schema.State().IsIndexed / IsReversed /
   HasCount only consult the pending mutSchema (the new schema with the
   index being built) when the context carries the isWrite flag. The
   legacy runMutation path calls schema.GetWriteContext at the top; the
   pipeline didn't. During a background index build, every star-delete
   saw isIndexed=false and skipped addIndexMutations(DEL) for the prior
   value, leaving stale uids in the index permanently. Lifting
   GetWriteContext into Process covers both the star-delete path and the
   later predicate-pipeline goroutines.

2. ProcessSingle's data-list write was passing the unfiltered postings
   list to AddDelta. handleOldDeleteForSingle appends a synthetic
   Del(oldVal) alongside the user's Set(newVal) so InsertTokenizerIndexes
   and ProcessReverse can emit Del-of-old entries. For scalar non-list
   non-lang predicates both postings share Uid == math.MaxUint64, and at
   read time pickPostings' equal-ts tie-break falls back to Go's unstable
   sort.Slice while setMutationAfterCommit overwrites
   committedUids[mpost.Uid] in append order. Either way Del can clobber
   the new Set and the data list reads as "no value." ProcessCount
   already strips the synthetic Del via skipSyntheticDel; mirror that
   filter in ProcessSingle's main data-list path.

systest/bgindex now passes (TestStringIndex, TestReverseIndex,
TestCountIndex, TestParallelIndexing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
matthewmcneely and others added 2 commits May 4, 2026 21:50
HNSW Insert performs multi-key read-modify-write across the entry-pointer
key, the per-level edge lists, and back-edges into the neighbors of the
new node. Per-list locks make each key's update atomic, but the
cross-key sequences ("read entry, lock entry, read neighbor, modify
neighbor, append back-edge to neighbor's list…") are not. Running 10
worker goroutines concurrently against the same txn cache lets updates
stomp on each other, leaving nodes that have a data-list entry but are
unreachable from the entry point.

Surface symptom: similar_to(k=N) returns fewer than N hits even though
all N vectors committed to the data list. TestVectorTwoTxnWithoutCommit
reliably reproduced this on Linux CI (and not on the macOS dev box,
likely a scheduling artifact).

Legacy applyMutations arrives at single-threaded vector handling
indirectly via x.DivideAndRule: for any num < 256 the helper rounds
numGo down to 1, so a 5-edge vector mutation runs serially on main and
the bug never surfaces there. Mirror that here by dropping
numThreads to 1 — correctness over within-txn parallelism for vector
predicates. Cross-txn parallelism is unaffected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@matthewmcneely matthewmcneely force-pushed the harshil-goel/mutation-pipeline branch from 60f51c8 to 97609b3 on May 5, 2026 01:52

Labels

area/core: internal mechanisms
area/documentation: Documentation related issues.
area/graphql: Issues related to GraphQL support on Dgraph.
area/schema: Issues related to the schema language and capabilities.
area/testing: Testing related issues
go: Pull requests that update Go code
