
dekaf: advertised high-water-mark regresses below served reads during journal primary hand-off (aborts failOnDataLoss consumers) #3092

Description

@jwhartley

1. Priority

Medium. A production Spark consumer was fully aborted, but no data was lost (all records are present and the journal self-recovered within minutes), and clean workarounds exist (failOnDataLoss=false plus dedup, or auto-retry on the exception). The trigger, a broker primary hand-off during allocator convergence, is rare.

2. Scope and prevalence

Theoretical blast radius: any Kafka consumer that treats a backward-moving latest offset as data loss, most notably Spark Structured Streaming with failOnDataLoss=true (Spark's default). Worst on low-traffic journals with a long flushInterval: the unpersisted spool window is large, so the persisted-head fallback can lag live reads by up to a full flush interval's worth of data.

Observed prevalence: this is the first observed occurrence (one journal, one consumer, a single event). It triggers only when a consumer has read ahead into the live (unpersisted) spool AND the journal's primary hands off during that window.

3. Trigger (testable)

A Dekaf consumer reads recent data that is still in the broker's in-memory spool (not yet flushed to the fragment store), committing offset H_live. The journal's primary then hands off / the allocator reconverges. A ListOffsets/high-water-mark query during the churn returns the last persisted fragment boundary H_persisted < H_live. A failOnDataLoss=true consumer sees the latest offset move backward and aborts.

Observed values: committed 169994417, advertised-latest regressed to 169328047 (a delta of 666,370 bytes). Both are exact fragment boundaries of the journal.
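To make the trigger concrete, here is a minimal Rust model of the comparison a failOnDataLoss consumer makes when the advertised latest offset moves. It is a simplification for illustration, not Spark's implementation; `OffsetCheck`, `check_latest`, and `regression` are hypothetical names.

```rust
/// Result of comparing a freshly fetched latest offset against the offset
/// the consumer has already committed. Hypothetical model, not Spark code.
#[derive(Debug, PartialEq, Eq)]
pub enum OffsetCheck {
    /// Latest is at or beyond what was committed: keep streaming.
    Ok,
    /// Latest regressed and failOnDataLoss=true: the query aborts.
    Abort { from: i64, to: i64 },
    /// Latest regressed and failOnDataLoss=false: log and continue.
    Warn { from: i64, to: i64 },
}

/// `fail_on_data_loss` stands in for the Spark option of the same meaning.
pub fn check_latest(committed: i64, latest: i64, fail_on_data_loss: bool) -> OffsetCheck {
    if latest >= committed {
        OffsetCheck::Ok
    } else if fail_on_data_loss {
        OffsetCheck::Abort { from: committed, to: latest }
    } else {
        OffsetCheck::Warn { from: committed, to: latest }
    }
}

/// How far the advertised latest fell behind the committed offset (0 if it did not).
pub fn regression(committed: i64, latest: i64) -> i64 {
    (committed - latest).max(0)
}
```

With the observed values, `check_latest(169994417, 169328047, true)` yields `Abort { from: 169994417, to: 169328047 }`, matching the consumer error, and `regression(169994417, 169328047)` is 666370.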

4. Root cause + confidence

Confirmed (broker logs + fragment metadata + code). During a broker primary hand-off, the high-water-mark Dekaf advertises falls back to the last persisted fragment boundary, which lags the live spool the consumer had already read through. It is transient and self-heals once the new primary re-advertises the true head and the spool flushes.

5. Investigation steps / reproduction (testable)

Consumer error (scrubbed):

StreamingQueryException: [STREAM_FAILED]
Partition <topic>-0's offset was changed from 169994417 to 169328047, some data may have been missed.

Failure at 2026-06-29 12:50:54 UTC. Consumer: startingOffsets=earliest, failOnDataLoss=true, no maxOffsetsPerTrigger.

Fragment listing (flowctl collections list-fragments) on the journal .../<collection>/<gen>/pivot=00:

fragment [168909134, 169328047]  persisted 2026-06-29 00:00:59 UTC
fragment [169328047, 169994417]  persisted 2026-06-29 12:55:27 UTC   <- persisted AFTER the failure
fragment [169994417, 170158711]  persisted 2026-06-29 17:40:02 UTC
fragment [170158711, 170257291]  no backingStore  (open/unpersisted spool)
  • 169994417 (committed) = end of a fragment that did not persist until 12:55:27 UTC, ~5 min after the failure.
  • 169328047 (regressed latest) = end of the prior fragment (persisted 00:00:59 UTC).
  • So the consumer had read the [169328047, 169994417] range live from the spool (Dekaf served reads in that range continuously from 01:03 to 12:50 UTC) before it was persisted.
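The reasoning above can be checked mechanically: replaying the listing's persist times shows that at the failure time (12:50:54 UTC) the highest persisted boundary was 169328047, the regressed value. A minimal Rust sketch; `Fragment`, `hms`, `persisted_head_at`, and `listing` are hypothetical helpers (not flowctl or Dekaf types), and times are encoded as seconds after midnight UTC on 2026-06-29.

```rust
/// One row of the fragment listing. `persisted_at` is seconds after
/// 00:00:00 UTC on 2026-06-29; `None` means no backingStore (open spool).
pub struct Fragment {
    pub begin: i64,
    pub end: i64,
    pub persisted_at: Option<u32>,
}

/// Seconds after midnight UTC for an h:m:s wall-clock time.
pub fn hms(h: u32, m: u32, s: u32) -> u32 {
    h * 3600 + m * 60 + s
}

/// Highest fragment end offset that had been persisted by time `t`.
/// Assumes persisted fragments form a contiguous prefix, as they do here.
pub fn persisted_head_at(fragments: &[Fragment], t: u32) -> Option<i64> {
    fragments
        .iter()
        .filter(|f| matches!(f.persisted_at, Some(p) if p <= t))
        .map(|f| f.end)
        .max()
}

/// The fragment listing from this issue.
pub fn listing() -> Vec<Fragment> {
    vec![
        Fragment { begin: 168909134, end: 169328047, persisted_at: Some(hms(0, 0, 59)) },
        Fragment { begin: 169328047, end: 169994417, persisted_at: Some(hms(12, 55, 27)) },
        Fragment { begin: 169994417, end: 170158711, persisted_at: Some(hms(17, 40, 2)) },
        Fragment { begin: 170158711, end: 170257291, persisted_at: None },
    ]
}
```

`persisted_head_at(&listing(), hms(12, 50, 54))` returns `Some(169328047)`; one second after the hand-off (12:55:27) it returns `Some(169994417)`, the consumer's committed offset.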

Data-plane (Gazette) broker logs, 2026-06-29 11:30-13:30 UTC window:

  • The only events naming this journal: a primary hand-off at 12:55:26 UTC, logged as "stopping local journal replica" (primary:true) on broker-A and "starting local journal replica" (primary:false) on broker-B.
  • This was the tail of a sustained allocator-convergence wave: continuous "solved for maximum assignment" rounds with slot churn every ~30-90s from 12:43 through 12:55, ending with "converge iteration failed (will retry)" at 12:55:26. A cohort of other journals was reassigned in the same sweep.
  • The [169328047, 169994417] fragment persisted at 12:55:27, within a second of the hand-off, i.e. the departing primary flushed its spool as it relinquished the journal. For the whole convergence window, the only persisted boundary was 169328047.
  • Auto-suspension ruled out: 46 "updated journal suspension" events in the window, none for this journal/shard.
  • Storage ruled out: fragment-store health checks succeeded at 12:52:01 and 12:52:11; no persist errors.
  • The offset values themselves (read-, route-, and connector-layer values) are not emitted in broker logs, so they cannot be tied to a specific broker log line; the hand-off and the persist-on-hand-off are the corroborating broker-side evidence.

Journal is healthy now: flags:4 (O_RDWR), no suspend in spec, head advanced past 170257291. No reset/backfill/retention (retention: 0s, liveSpec unchanged for weeks).

Contributing factor: the collection's flushInterval is 86400s (24h). This keeps up to about a day of recent data unpersisted in the broker spool, maximizing how far the persisted-head fallback can lag live reads.

6. Possible fixes

  1. Customer-immediate: failOnDataLoss=false + idempotent sink (dedup on document key); or keep true and auto-restart the streaming query on this specific exception. No data loss either way.
  2. Data-preserving: shorten the collection flushInterval to shrink the unpersisted spool window (trade-off: more fragment-store objects).
  3. Platform / code fix: make Dekaf's advertised high-water-mark monotonic per session (never return a latest below a value already served to that client), and/or prefer the live ReadResponse.write_head over a persisted/suspend fallback that is below recently-served reads.
    • fetch_write_head returns the broker's write_head from a metadata-only read: crates/dekaf/src/topology.rs:487.
    • Suspended path returns suspend.offset: crates/dekaf/src/topology.rs:346-353.
    • Kafka offset = journal byte offset - 1: crates/dekaf/src/read.rs:441.
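A minimal sketch of the monotonic-per-session idea in fix 3, written as a standalone Rust type rather than a patch to crates/dekaf. `SessionWatermark`, `observe_served`, `advertise`, and `choose_latest` are hypothetical names; where this would hook in (for example where the fetch_write_head result or the suspend.offset fallback becomes a ListOffsets answer) is an assumption, and the clamp must be applied consistently in either Kafka offsets or journal offsets, never a mix.

```rust
/// Per-session guard: the advertised latest offset never falls below anything
/// this session has already been served or advertised. Hypothetical type.
#[derive(Debug, Default)]
pub struct SessionWatermark {
    floor: Option<i64>,
}

impl SessionWatermark {
    /// Raise the floor after serving a fetch that ended at `served_end`.
    pub fn observe_served(&mut self, served_end: i64) {
        self.floor = Some(self.floor.map_or(served_end, |f| f.max(served_end)));
    }

    /// Clamp a candidate latest offset (live write head or a persisted /
    /// suspend fallback) so it is monotonic for this session, and record it.
    pub fn advertise(&mut self, candidate: i64) -> i64 {
        let out = self.floor.map_or(candidate, |f| f.max(candidate));
        self.floor = Some(out);
        out
    }
}

/// Prefer the live write head when one is available; if both are known,
/// take the larger so a lagging fallback can never win.
pub fn choose_latest(live_write_head: Option<i64>, fallback: i64) -> i64 {
    match live_write_head {
        Some(head) => head.max(fallback),
        None => fallback,
    }
}
```

In this incident's terms: with no live head during the hand-off, `choose_latest(None, 169328047)` still returns the persisted boundary, but a session that was served through 169994417 advertises 169994417 instead. The trade-off is that the session may be told about offsets the new primary cannot serve yet, so fetches in that gap would likely return empty until it catches up; whether that is preferable to an abort is open question 1 below.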

Open questions for the team:

  1. Should the advertised high-water-mark be monotonic per session across hand-off/suspend?
  2. Should the latest-offset path avoid falling back below recently-served reads?
  3. Is a 24h flushInterval advisable for Dekaf-served journals given it widens this window?

7. References
