1. Priority
Medium - a production Spark consumer was fully aborted, but no data was lost (records all present, journal self-recovered within minutes) and clean workarounds exist (failOnDataLoss=false + dedup, or auto-retry on the exception). The trigger (broker primary hand-off during allocator convergence) is rare.
2. Scope and prevalence
Theoretical blast radius: any Kafka consumer reading Dekaf with failOnDataLoss=true (the default for Spark's Kafka source). Worst on low-traffic journals with a long flushInterval, where the unpersisted spool window is large, so the persisted-head fallback lags live reads furthest.
Observed prevalence: first observed occurrence. One journal, one consumer, single event. Triggered only when a consumer has read ahead into the live (unpersisted) spool AND the journal's primary hands off during that window.
3. Trigger (testable)
A Dekaf consumer reads recent data that is still in the broker's in-memory spool (not yet flushed to the fragment store), committing offset H_live. The journal's primary then hands off / the allocator reconverges. A ListOffsets/high-water-mark query during the churn returns the last persisted fragment boundary H_persisted < H_live. A failOnDataLoss=true consumer sees the latest offset move backward and aborts.
Observed values: committed 169994417, advertised-latest regressed to 169328047 (a delta of 666,370 bytes). Both are exact fragment boundaries of the journal.
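A minimal Rust model of the consumer-side reaction described above. It assumes, as a simplification and not as Spark's actual source, that a failOnDataLoss=true reader aborts whenever the newly reported latest offset is below the offset it has already committed, and only warns when the flag is false. The names `Check` and `check_latest` are hypothetical. Plugging in the observed values reproduces the 666,370-byte regression.

```rust
/// Outcome of comparing a freshly advertised latest offset against the
/// offset the consumer has already committed (simplified model).
#[derive(Debug, PartialEq)]
enum Check {
    /// Latest is at or beyond what was already read: keep streaming.
    Ok,
    /// Latest moved backward, but failOnDataLoss=false: warn and continue.
    Warn { regressed_by: u64 },
    /// Latest moved backward and failOnDataLoss=true: abort the query.
    Abort { regressed_by: u64 },
}

/// Hypothetical failOnDataLoss-style guard, not Spark's real implementation.
fn check_latest(committed: u64, latest: u64, fail_on_data_loss: bool) -> Check {
    if latest >= committed {
        return Check::Ok;
    }
    let regressed_by = committed - latest;
    if fail_on_data_loss {
        Check::Abort { regressed_by }
    } else {
        Check::Warn { regressed_by }
    }
}

fn main() {
    let committed = 169_994_417; // H_live: read through from the live spool
    let advertised = 169_328_047; // H_persisted: advertised during the churn
    println!("{:?}", check_latest(committed, advertised, true));
    println!("{:?}", check_latest(committed, advertised, false));
}
```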
4. Root cause + confidence
Confirmed (broker logs + fragment metadata + code). During a broker primary hand-off, the high-water-mark Dekaf advertises falls back to the last persisted fragment boundary, which lags the live spool the consumer had already read through. It is transient and self-heals once the new primary re-advertises the true head and the spool flushes.
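To make the failure mode concrete, a toy model of where the advertised high-water-mark comes from. It assumes, without reproducing Dekaf's code, that a stable primary reports its live write head (spool included) while a journal in transition only exposes its last persisted fragment boundary. `HeadSource` and `advertised_latest` are hypothetical names. The point it illustrates is that the sequence of advertised values is not monotonic across the transition.

```rust
/// Source of the advertised high-water-mark in this simplified model.
enum HeadSource {
    /// A stable primary answered with its live write head (includes the spool).
    LivePrimary { write_head: u64 },
    /// Hand-off / reconvergence in progress: only durable state is known.
    InTransition { last_persisted: u64 },
}

/// Hypothetical: pick the advertised latest offset from the current source.
fn advertised_latest(src: &HeadSource) -> u64 {
    match *src {
        HeadSource::LivePrimary { write_head } => write_head,
        HeadSource::InTransition { last_persisted } => last_persisted,
    }
}

fn main() {
    let before = advertised_latest(&HeadSource::LivePrimary { write_head: 169_994_417 });
    let during = advertised_latest(&HeadSource::InTransition { last_persisted: 169_328_047 });
    // The advertised value moves backward across the transition.
    println!("before={before} during={during} regressed={}", during < before);
}
```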
5. Investigation steps / reproduction (testable)
Consumer error (scrubbed):
StreamingQueryException: [STREAM_FAILED]
Partition <topic>-0's offset was changed from 169994417 to 169328047, some data may have been missed.
Failure at 2026-06-29 12:50:54 UTC. Consumer: startingOffsets=earliest, failOnDataLoss=true, no maxOffsetsPerTrigger.
Fragment listing (flowctl collections list-fragments) on the journal .../<collection>/<gen>/pivot=00:
fragment [168909134, 169328047] persisted 2026-06-29 00:00:59 UTC
fragment [169328047, 169994417] persisted 2026-06-29 12:55:27 UTC <- persisted AFTER the failure
fragment [169994417, 170158711] persisted 2026-06-29 17:40:02 UTC
fragment [170158711, 170257291] no backingStore (open/unpersisted spool)
- 169994417 (committed) = end of a fragment that did not persist until 12:55:27 UTC, about 4.5 min after the failure.
- 169328047 (regressed latest) = end of the prior fragment (persisted 00:00:59 UTC).
- So the consumer had read the [169328047, 169994417] range live from the spool (Dekaf served reads in that range continuously from 01:03 to 12:50 UTC) before it was persisted.
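The fragment reasoning above can be checked mechanically. A minimal sketch, assuming a simplified in-memory form of the listing rather than flowctl's actual output format: persist times are seconds since 2026-06-29 00:00:00 UTC, and `Fragment`, `persisted_head_at` and `hms` are hypothetical helpers. It walks fragments in offset order and returns the end of the contiguous persisted run at a given instant.

```rust
/// One row of a simplified fragment listing.
struct Fragment {
    begin: u64,
    end: u64,
    /// Seconds since 2026-06-29 00:00:00 UTC when the fragment was persisted,
    /// or None if it had no backing store (still in the spool).
    persisted_at: Option<u32>,
}

/// End offset of the contiguous run of fragments persisted at or before `t`,
/// starting from the first listed fragment's begin offset.
fn persisted_head_at(fragments: &[Fragment], t: u32) -> u64 {
    let mut head = fragments.first().map_or(0, |f| f.begin);
    for f in fragments {
        match f.persisted_at {
            Some(p) if p <= t && f.begin == head => head = f.end,
            _ => break,
        }
    }
    head
}

/// Hypothetical helper: wall-clock time of day to seconds.
fn hms(h: u32, m: u32, s: u32) -> u32 {
    h * 3600 + m * 60 + s
}

fn main() {
    let listing = [
        Fragment { begin: 168_909_134, end: 169_328_047, persisted_at: Some(hms(0, 0, 59)) },
        Fragment { begin: 169_328_047, end: 169_994_417, persisted_at: Some(hms(12, 55, 27)) },
        Fragment { begin: 169_994_417, end: 170_158_711, persisted_at: Some(hms(17, 40, 2)) },
        Fragment { begin: 170_158_711, end: 170_257_291, persisted_at: None },
    ];
    // At the failure (12:50:54) only the first fragment was persisted.
    println!("{}", persisted_head_at(&listing, hms(12, 50, 54)));
    // One second after the hand-off, the committed offset is finally durable.
    println!("{}", persisted_head_at(&listing, hms(12, 55, 27)));
}
```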
Data-plane (Gazette) broker logs, 2026-06-29 11:30-13:30 UTC window:
- Only events naming this journal: a primary hand-off at 12:55:26 UTC, "stopping local journal replica" (primary:true) on broker-A and "starting local journal replica" (primary:false) on broker-B.
- Tail of a sustained allocator-convergence wave: continuous "solved for maximum assignment" rounds with slot churn every ~30-90 s from 12:43 through 12:55, ending with "converge iteration failed (will retry)" at 12:55:26. A cohort of other journals was reassigned in the same sweep.
- The [169328047, 169994417] fragment persisted at 12:55:27, within a second of the hand-off, i.e. the departing primary flushing its spool as it relinquished the journal. For the whole convergence window the only persisted boundary was 169328047.
- Auto-suspension ruled out: 46 "updated journal suspension" events in the window, none for this journal/shard.
- Storage ruled out: fragment-store health checks succeeded at 12:52:01 / 12:52:11; no persist errors.
- The offset values themselves are not emitted in broker logs (they are read/routing/connector-layer values), so they cannot be tied to a specific broker log line; the hand-off and the persist-on-hand-off are the corroborating broker-side evidence.
Journal is healthy now: flags:4 (O_RDWR), no suspend in spec, head advanced past 170257291. No reset/backfill/retention (retention: 0s, liveSpec unchanged for weeks).
Contributing factor: collection flushInterval = 86400s (24h). This keeps up to ~a day of recent data unpersisted in the broker spool, maximizing how far the persisted-head fallback can lag live reads.
6. Possible fixes
- Customer-immediate: failOnDataLoss=false + idempotent sink (dedup on document key); or keep true and auto-restart the streaming query on this specific exception. No data loss either way.
- Data-preserving: shorten the collection flushInterval to shrink the unpersisted spool window (trade-off: more fragment-store objects).
- Platform / code fix: make Dekaf's advertised high-water-mark monotonic per session (never return a latest below a value already served to that client), and/or prefer the live ReadResponse.write_head over a persisted/suspend fallback that is below recently-served reads. Relevant code:
  - fetch_write_head returns the broker write_head from a metadata-only read: crates/dekaf/src/topology.rs:487.
  - The suspended path returns suspend.offset: crates/dekaf/src/topology.rs:346-353.
  - Kafka offset = journal byte offset - 1: crates/dekaf/src/read.rs:441.
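The per-session monotonic high-water-mark proposed in the platform fix could look roughly like the sketch below. It is not based on Dekaf's actual session types: `MonotonicHighWater`, `record_served` and `advertise` are hypothetical, sessions are keyed by a plain string, and offsets are treated as opaque u64 values without the journal-to-Kafka mapping. The guard remembers the highest offset already served or advertised to each session and clamps any lower computed latest (e.g. a persisted or suspend fallback) up to that floor.

```rust
use std::collections::HashMap;

/// Hypothetical per-session guard: never advertise a latest offset below
/// what the same session has already been served or told.
#[derive(Default)]
struct MonotonicHighWater {
    served: HashMap<String, u64>,
}

impl MonotonicHighWater {
    /// Record that `offset` was returned to `session` by a read.
    fn record_served(&mut self, session: &str, offset: u64) {
        let floor = self.served.entry(session.to_string()).or_insert(0);
        *floor = (*floor).max(offset);
    }

    /// Clamp a freshly computed latest offset (possibly a fallback value)
    /// to the session's floor, then remember what was advertised.
    fn advertise(&mut self, session: &str, computed_latest: u64) -> u64 {
        let floor = self.served.get(session).copied().unwrap_or(0);
        let latest = computed_latest.max(floor);
        self.record_served(session, latest);
        latest
    }
}

fn main() {
    let mut hw = MonotonicHighWater::default();
    // The consumer read through the live spool.
    hw.record_served("spark-consumer", 169_994_417);
    // During the hand-off the computed latest is the persisted boundary.
    println!("{}", hw.advertise("spark-consumer", 169_328_047));
    // Once the new primary reports the true head, it passes straight through.
    println!("{}", hw.advertise("spark-consumer", 170_257_291));
}
```

The clamp only hides the regression safely if spool data already served to a client is eventually persisted, as happened in this incident; whether that always holds across hand-off is part of the open questions below. A real version would also need to evict entries for closed sessions, which this sketch omits.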
Open questions for the team:
- Should the advertised high-water-mark be monotonic per session across hand-off/suspend?
- Should the latest-offset path avoid falling back below recently-served reads?
- Is a 24h flushInterval advisable for Dekaf-served journals, given it widens this window?
7. References