From cb629b788fd48372c20cacc2abcf509d1a3d9caf Mon Sep 17 00:00:00 2001 From: Gu Jiawei Date: Sun, 22 Mar 2026 16:19:17 +0800 Subject: [PATCH] 1. fix back pressure and try-later replies missing hasMore bit reset error in inbox fetching. --- .../inbox/server/InboxFetchPipeline.java | 44 ++++++++++++------- .../server/InboxFetchPipelineMappingTest.java | 34 ++++++++++++++ 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java index 7259742c9..2b67c1899 100644 --- a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java +++ b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java @@ -240,26 +240,36 @@ private void fetch(FetchState fetchState) { .setInboxId(inboxId) .setIncarnation(incarnation) .setFetched(fetched).build()); - if (fetched.getQos0MsgCount() > 0 || fetched.getSendBufferMsgCount() > 0) { - if (fetched.getQos0MsgCount() > 0) { - fetchState.lastFetchQoS0Seq.set( - fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq()); + Fetched.Result result = fetched.getResult(); + switch (result) { + case OK -> { + if (fetched.getQos0MsgCount() > 0 || fetched.getSendBufferMsgCount() > 0) { + if (fetched.getQos0MsgCount() > 0) { + fetchState.lastFetchQoS0Seq.set( + fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq()); + } + int fetchedCount = 0; + if (fetched.getSendBufferMsgCount() > 0) { + fetchedCount += fetched.getSendBufferMsgCount(); + fetchState.downStreamCapacity.accumulateAndGet( + fetched.getSendBufferMsgCount(), + (a, b) -> a == NOT_KNOWN_CAPACITY ? a : Math.max(a - b, 0)); + fetchState.lastFetchSendBufferSeq.set( + fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq()); + } + fetchState.hasMore.set(fetchedCount >= request.params().getMaxFetch() + || fetchState.signalFetchTS.get() > fetchTS); + } else { + fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS); + } } - int fetchedCount = 0; - if (fetched.getSendBufferMsgCount() > 0) { - fetchedCount += fetched.getSendBufferMsgCount(); - fetchState.downStreamCapacity.accumulateAndGet(fetched.getSendBufferMsgCount(), - (a, b) -> a == NOT_KNOWN_CAPACITY ? a : Math.max(a - b, 0)); - fetchState.lastFetchSendBufferSeq.set( - fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq()); - } - fetchState.hasMore.set(fetchedCount >= request.params().getMaxFetch() - || fetchState.signalFetchTS.get() > fetchTS); - } else { - fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS); + case BACK_PRESSURE_REJECTED, TRY_LATER -> fetchState.hasMore.set(true); + default -> fetchState.hasMore.set(false); } fetchState.fetching.set(false); - if (fetchState.downStreamCapacity.get() > 0 && fetchState.hasMore.get()) { + if (result == Fetched.Result.OK + && fetchState.downStreamCapacity.get() > 0 + && fetchState.hasMore.get()) { fetch(sessionId); } } catch (Throwable t) { diff --git a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java index cff671dfa..fa13c6bb7 100644 --- a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java +++ b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java @@ -212,6 +212,40 @@ public void shouldNotRewindStartAfterWhenHintIsStale() { pipeline.close(); } + @Test + public void shouldRetryAfterTryLaterOnHint() { + InboxFetcherRegistry registry = new InboxFetcherRegistry(); + TestFetcher fetcher = new TestFetcher(); + InboxFetchPipeline pipeline = new InboxFetchPipeline(responseObserver, fetcher, registry); + + long sessionId = 3503L; + pipeline.onNext(hint(sessionId, 2)); + + FetchRequest firstRequest = fetcher.awaitRequest(); + assertNotNull(firstRequest); + + fetcher.completeNext(Fetched.newBuilder() + .setResult(Fetched.Result.TRY_LATER) + .build()); + + await().until(() -> { + synchronized (received) { + return !received.isEmpty(); + } + }); + + pipeline.onNext(hint(sessionId, 2)); + + FetchRequest retryRequest = fetcher.awaitRequest(); + assertNotNull(retryRequest); + + fetcher.completeNext(Fetched.newBuilder() + .setResult(Fetched.Result.OK) + .build()); + + pipeline.close(); + } + @Test public void shouldCleanStaleSessionIdWhenFetchStateMissing() throws Exception { InboxFetcherRegistry registry = new InboxFetcherRegistry();