From 8b53e4b7c7040db9bff8e0e09e0549dc5a100063 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 19 Mar 2026 17:45:50 +0000 Subject: [PATCH 1/3] KAFKA-20330: Ack handling improvement on broker restart --- .../clients/consumer/ShareConsumerTest.java | 143 ++++++++ .../internals/ShareConsumeRequestManager.java | 317 ++++++++++-------- .../consumer/internals/ShareFetch.java | 5 +- .../common/requests/ShareRequestMetadata.java | 2 + .../ShareConsumeRequestManagerTest.java | 22 +- 5 files changed, 340 insertions(+), 149 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 89ea1b38c50c4..8d79c9fec341a 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -48,8 +48,10 @@ import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Header; @@ -1301,6 +1303,147 @@ public void testConsumerCloseOnBrokerShutdown() { "Consumer close should not wait for full timeout when broker is already shut down"); } + @ClusterTest + public void testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementSync() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer("group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { + + AtomicBoolean callbackCalled = new AtomicBoolean(false); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> { + System.out.println("EXCEPTION " + exception); + exception.printStackTrace(); + assertInstanceOf(NotLeaderOrFollowerException.class, exception); + callbackCalled.set(true); + }); + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + shareConsumer.subscribe(Set.of(tp.topic())); + + producer.send(record); + producer.flush(); + + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(20000)); + assertEquals(1, records.count()); + ConsumerRecord consumerRecord = records.iterator().next(); + + // Shutdown the broker + assertEquals(1, cluster.brokers().size()); + KafkaBroker broker = cluster.brokers().get(0); + cluster.shutdownBroker(0); + + broker.awaitShutdown(); + + // Restart the broker + cluster.startBroker(0); + + shareConsumer.acknowledge(consumerRecord); + Map> commitResult = shareConsumer.commitSync(Duration.ofMillis(30000)); + assertEquals(1, commitResult.size()); + TopicIdPartition tidp = commitResult.keySet().iterator().next(); + assertTrue(commitResult.get(tidp).isPresent()); + assertInstanceOf(NotLeaderOrFollowerException.class, commitResult.get(tidp).get()); + + assertTrue(callbackCalled.get()); + } + } + + @ClusterTest + public void testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementAsync() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer("group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { + + AtomicBoolean callbackCalled = new AtomicBoolean(false); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> { + assertInstanceOf(NotLeaderOrFollowerException.class, exception); + callbackCalled.set(true); + }); + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + shareConsumer.subscribe(Set.of(tp.topic())); + + producer.send(record); + producer.flush(); + + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(20000)); + assertEquals(1, records.count()); + ConsumerRecord consumerRecord = records.iterator().next(); + + // Shutdown the broker + assertEquals(1, cluster.brokers().size()); + KafkaBroker broker = cluster.brokers().get(0); + cluster.shutdownBroker(0); + + broker.awaitShutdown(); + + // Restart the broker + cluster.startBroker(0); + + shareConsumer.acknowledge(consumerRecord); + shareConsumer.commitAsync(); + + int maxRetries = 15; + int retries = 0; + while (retries < maxRetries) { + shareConsumer.poll(Duration.ofMillis(2000)); + if (callbackCalled.get()) { + break; + } + retries++; + } + + assertTrue(callbackCalled.get()); + } + } + + @ClusterTest + public void testLeaderRestartWithoutLeadershipChangeImplicitAcknowledgement() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer("group1")) { + + AtomicBoolean callbackCalled = new AtomicBoolean(false); + shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> { + assertInstanceOf(ShareSessionNotFoundException.class, exception); + callbackCalled.set(true); + }); + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + shareConsumer.subscribe(Set.of(tp.topic())); + + producer.send(record); + producer.flush(); + + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(20000)); + assertEquals(1, records.count()); + + // Shutdown the broker + assertEquals(1, cluster.brokers().size()); + KafkaBroker broker = cluster.brokers().get(0); + cluster.shutdownBroker(0); + + broker.awaitShutdown(); + + // Restart the broker + cluster.startBroker(0); + + int maxRetries = 15; + int retries = 0; + while (retries < maxRetries) { + shareConsumer.poll(Duration.ofMillis(2000)); + if (callbackCalled.get()) { + break; + } + retries++; + } + + assertTrue(callbackCalled.get()); + } + } + @ClusterTests({ @ClusterTest(serverProperties = { @ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0") diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index dd1b829bcb328..eb57df99e9626 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -255,7 +255,7 @@ public PollResult poll(long currentTimeMs) { // For record_limit mode, we only send a full ShareFetch to a single node at a time. // We prepare to build ShareFetch requests for all nodes with session handlers to permit - // piggy-backing of acknowledgements, and also to adjust the topic-partitions + // piggybacking of acknowledgements, and also to adjust the topic-partitions // in the share session, but if the request would contain neither of those, it can be skipped. boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get(); @@ -288,9 +288,11 @@ private boolean isShareAcquireModeRecordLimit() { /** * Add acknowledgements for a topic-partition to the node's in-flight acknowledgements. + * If we cannot add acknowledgements, they are completed with {@link Errors#NOT_LEADER_OR_FOLLOWER} exception. + * This probably indicates the connection to the leader broker was lost, but then re-established without a + * leadership change, in which case the acknowledgements fail. * * @return True if we can add acknowledgements to the share session. - * If we cannot add acknowledgements, they are completed with {@link Errors#INVALID_SHARE_SESSION_EPOCH} exception. */ private boolean maybeAddAcknowledgements(ShareSessionHandler handler, Node node, @@ -299,7 +301,7 @@ private boolean maybeAddAcknowledgements(ShareSessionHandler handler, if (handler.isNewSession()) { // Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest. log.debug("Cannot send acknowledgements on initial epoch for ShareSession for partition {}", tip); - acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception()); + acknowledgements.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), true, Optional.empty()); return false; } else { @@ -540,49 +542,75 @@ private boolean areRequestStatesInProgress(Queue acknow public CompletableFuture> commitSync( final Map acknowledgementsMap, final long deadlineMs) { + final Cluster cluster = metadata.fetch(); final AtomicInteger resultCount = new AtomicInteger(); final CompletableFuture> future = new CompletableFuture<>(); final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.of(future)); - final Cluster cluster = metadata.fetch(); + Map> acknowledgementsMapAllNodes = new HashMap<>(); + Map acknowledgementsMapCannotSend = new HashMap<>(); + acknowledgementsMap.forEach((tip, nodeAcks) -> { + if ((cluster.nodeById(nodeAcks.nodeId()) == null) || isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) { + Acknowledgements prevAcks = acknowledgementsMapCannotSend.putIfAbsent(tip, nodeAcks.acknowledgements()); + if (prevAcks != null) { + prevAcks.merge(nodeAcks.acknowledgements()); + } + } else { + Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements()); + if (prevAcks != null) { + prevAcks.merge(nodeAcks.acknowledgements()); + } + } + }); + + resultCount.addAndGet(acknowledgementsMapCannotSend.size()); sessionHandlers.forEach((nodeId, sessionHandler) -> { - Node node = cluster.nodeById(nodeId); - if (node != null) { - acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - - // Add the incoming commitSync() request to the queue. - Map acknowledgementsMapForNode = new HashMap<>(); - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); - if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { - if (!isLeaderKnownToHaveChanged(node.id(), tip)) { - acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements()); - - metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size()); - log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - resultCount.incrementAndGet(); - } else { - nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true, Optional.empty()); - } - } - } + Map nodeAcknowledgements = acknowledgementsMapAllNodes.get(nodeId); + if (nodeAcknowledgements == null) + return; - if (!acknowledgementsMapForNode.isEmpty()) { - acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":1", - deadlineMs, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.COMMIT_SYNC - )); + Map acknowledgementsMapToSend = new HashMap<>(); + + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); + + // Add the incoming commitSync() request to the queue. + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = nodeAcknowledgements.remove(tip); + if (acknowledgements != null) { + acknowledgementsMapToSend.put(tip, acknowledgements); + resultCount.incrementAndGet(); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), nodeId); } } + + resultCount.addAndGet(nodeAcknowledgements.size()); + nodeAcknowledgements.forEach((tip, acks) -> { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + resultHandler.complete(tip, acks, AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty()); + }); + + if (!acknowledgementsMapToSend.isEmpty()) { + acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":1", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapToSend, + resultHandler, + AcknowledgeRequestType.COMMIT_SYNC + )); + } + }); + + acknowledgementsMapCannotSend.forEach((tip, acks) -> { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + resultHandler.complete(tip, acks, AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty()); }); resultHandler.completeIfEmpty(); @@ -602,48 +630,63 @@ public void commitAsync( final Cluster cluster = metadata.fetch(); final ResultHandler resultHandler = new ResultHandler(Optional.empty()); + Map> acknowledgementsMapAllNodes = new HashMap<>(); + acknowledgementsMap.forEach((tip, nodeAcks) -> { + if ((cluster.nodeById(nodeAcks.nodeId()) == null) || isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) { + log.debug("Leader for the partition is down or has changed, failing acknowledgements for partition {}", tip); + nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcks.acknowledgements()), true, Optional.empty()); + } else { + Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements()); + if (prevAcks != null) { + prevAcks.merge(nodeAcks.acknowledgements()); + } + } + }); + sessionHandlers.forEach((nodeId, sessionHandler) -> { - Node node = cluster.nodeById(nodeId); - if (node != null) { - Map acknowledgementsMapForNode = new HashMap<>(); - - acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); - if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { - if (!isLeaderKnownToHaveChanged(node.id(), tip)) { - Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements(); - acknowledgementsMapForNode.put(tip, acknowledgements); - - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest(); - if (asyncRequestState == null) { - acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":2", - deadlineMs, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.COMMIT_ASYNC - )); - } else { - Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements); - if (prevAcks != null) { - asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements); - } - } - } else { - nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()), true, Optional.empty()); + Map nodeAcknowledgements = acknowledgementsMapAllNodes.get(nodeId); + if (nodeAcknowledgements == null) + return; + + Map acknowledgementsMapForNode = new HashMap<>(); + + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); + + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = nodeAcknowledgements.remove(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), nodeId); + AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest(); + if (asyncRequestState == null) { + acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":2", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + resultHandler, + AcknowledgeRequestType.COMMIT_ASYNC + )); + } else { + Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements); + if (prevAcks != null) { + asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements); } } } } + + nodeAcknowledgements.forEach((tip, acks) -> { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + }); }); resultHandler.completeIfEmpty(); @@ -659,82 +702,84 @@ public void commitAsync( * * @return The future which completes when the acknowledgements finished */ - public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMap, - final long deadlineMs) { + public CompletableFuture acknowledgeOnClose( + final Map acknowledgementsMap, + final long deadlineMs) { final Cluster cluster = metadata.fetch(); final AtomicInteger resultCount = new AtomicInteger(); final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.empty()); closing = true; - Map> acknowledgementsMapAllNodes = new HashMap<>(); + Map> acknowledgementsMapAllNodes = new HashMap<>(); acknowledgementsMap.forEach((tip, nodeAcks) -> { - if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) { + if ((cluster.nodeById(nodeAcks.nodeId()) == null) || isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) { + nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcks.acknowledgements()), true, Optional.empty()); + } else { Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>()); Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements()); if (prevAcks != null) { acksMap.get(tip).merge(nodeAcks.acknowledgements()); } - } else { - nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, nodeAcks.acknowledgements()), true, Optional.empty()); } }); sessionHandlers.forEach((nodeId, sessionHandler) -> { - Node node = cluster.nodeById(nodeId); - if (node != null) { - //Add any waiting piggyback acknowledgements for the node. - Map fetchAcks = fetchAcknowledgementsToSend.remove(nodeId); - if (fetchAcks != null) { - fetchAcks.forEach((tip, acks) -> { - if (!isLeaderKnownToHaveChanged(nodeId, tip)) { - Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>()); - Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks); - if (prevAcks != null) { - acksMap.get(tip).merge(acks); - } - } else { - acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + // Add any waiting piggyback acknowledgements for the node. + Map fetchAcks = fetchAcknowledgementsToSend.remove(nodeId); + if (fetchAcks != null) { + fetchAcks.forEach((tip, acks) -> { + if (!isLeaderKnownToHaveChanged(nodeId, tip)) { + Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks); + if (prevAcks != null) { + acksMap.get(tip).merge(acks); } - }); - } + } else { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + } + }); + } - Map acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId); - if (acknowledgementsMapForNode != null) { - acknowledgementsMapForNode.forEach((tip, acknowledgements) -> { - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - resultCount.incrementAndGet(); - }); - } else { - acknowledgementsMapForNode = new HashMap<>(); - } + Map acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId); + if (acknowledgementsMapForNode != null) { + acknowledgementsMapForNode.forEach((tip, acknowledgements) -> { + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), nodeId); + resultCount.incrementAndGet(); + }); + } else { + acknowledgementsMapForNode = new HashMap<>(); + } - acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - // Ensure there is no close() request already present as they are blocking calls - // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) { - log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); - closeFuture.completeExceptionally( - new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id())); - } else { - // There can only be one close() happening at a time. So per node, there will be one acknowledge request state. - acknowledgeRequestStates.get(nodeId).setCloseRequest( - new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":3", - deadlineMs, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.CLOSE - )); - } + // Ensure there is no close() request already present as they are blocking calls + // and only one request can be active at a time. + if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) { + log.error("Attempt to call close() when there is an existing close request for node {}-{}", nodeId, acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); + acknowledgementsMapForNode.forEach((tip, acks) -> { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + }); + closeFuture.completeExceptionally( + new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + nodeId)); + } else { + // There can only be one close() happening at a time. So per node, there will be one acknowledge request state. + acknowledgeRequestStates.get(nodeId).setCloseRequest( + new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":3", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + resultHandler, + AcknowledgeRequestType.CLOSE + )); } }); @@ -744,6 +789,7 @@ public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMapToClear = incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements; acknowledgementsMapToClear.forEach((tip, acks) -> { if (acks != null) { - acks.complete(Errors.SHARE_SESSION_NOT_FOUND.exception()); + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); } // We do not know whether this is a renew ack, but handling the error as if it were, will ensure // that we do not leave dangling acknowledgements diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java index a15231512fc53..1f81e3563432b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java @@ -249,7 +249,10 @@ public Map takeAcknowledgedRecords() { public int renew(Map acknowledgementsMap, Optional acquisitionLockTimeoutMs) { int recordsRenewed = 0; for (Map.Entry entry : acknowledgementsMap.entrySet()) { - recordsRenewed += batches.get(entry.getKey()).renew(entry.getValue()); + ShareInFlightBatch batch = batches.get(entry.getKey()); + if (batch != null) { + recordsRenewed += batch.renew(entry.getValue()); + } } acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs; return recordsRenewed; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java index 1af62e44681c9..8cda95de0ee96 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java @@ -33,7 +33,9 @@ public class ShareRequestMetadata { public static final int FINAL_EPOCH = -1; /** + * Whether this session is a new session. * + * @return Whether the session epoch is the initial epoch. */ public boolean isNewSession() { return epoch == INITIAL_EPOCH; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index f3b2063c136f3..8ab6f3aa21e01 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -40,7 +40,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidRecordStateException; -import org.apache.kafka.common.errors.ShareSessionNotFoundException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.header.Header; @@ -369,7 +369,7 @@ public void testServerDisconnectedOnShareAcknowledge() throws InterruptedExcepti assertNull(shareConsumeRequestManager.requestStates(0)); // The callback for these unsent acknowledgements will be invoked with an error code. assertEquals(Map.of(tip0, acknowledgements2), completedAcknowledgements.get(0)); - assertInstanceOf(ShareSessionNotFoundException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertInstanceOf(NotLeaderOrFollowerException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); // Attempt a normal fetch to check if nodesWithPendingRequests is empty. assertEquals(1, sendFetches()); @@ -1490,7 +1490,7 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionError() { assertEquals(0, builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size()); assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); - assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); } @Test @@ -1526,7 +1526,7 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionC // We should fail any waiting acknowledgements for tip-0 as it would have a share session epoch equal to 0. assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); - assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); } @Test @@ -2226,20 +2226,18 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. shareConsumeRequestManager.commitSync(commitAcks, calculateDeadlineMs(time.timer(100))); - // Verify if the callback was invoked with the failed acknowledgements. - assertEquals(1, completedAcknowledgements.get(0).size()); - assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); - // We only send acknowledgements for tip1 to node1. assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(1, completedAcknowledgements.get(1).size()); - assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1)); - assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + // Verify if the callback was invoked with the failed acknowledgements. The callback is called with the commitSync processing. + assertEquals(2, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1)); + assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); } @Test From 1f704d76d879787c4cd79bfd517e755ea3e00ac0 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 20 Mar 2026 10:49:54 +0000 Subject: [PATCH 2/3] Remove debug statements --- .../org/apache/kafka/clients/consumer/ShareConsumerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 8d79c9fec341a..34d072072ba4c 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -1312,8 +1312,6 @@ public void testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementSync( AtomicBoolean callbackCalled = new AtomicBoolean(false); shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) -> { - System.out.println("EXCEPTION " + exception); - exception.printStackTrace(); assertInstanceOf(NotLeaderOrFollowerException.class, exception); callbackCalled.set(true); }); From 2c8cfb361ca0ee897d223d81d3361f6aecfbe4e8 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 20 Mar 2026 21:47:09 +0000 Subject: [PATCH 3/3] Adjust ack processing on close and tests --- .../internals/ShareConsumeRequestManager.java | 51 +++--- .../ShareConsumeRequestManagerTest.java | 160 +++++++++++++++++- 2 files changed, 184 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index eb57df99e9626..104199a9d3b1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -720,38 +720,38 @@ public CompletableFuture acknowledgeOnClose( Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>()); Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements()); if (prevAcks != null) { - acksMap.get(tip).merge(nodeAcks.acknowledgements()); + prevAcks.merge(nodeAcks.acknowledgements()); } } }); - sessionHandlers.forEach((nodeId, sessionHandler) -> { - // Add any waiting piggyback acknowledgements for the node. - Map fetchAcks = fetchAcknowledgementsToSend.remove(nodeId); - if (fetchAcks != null) { - fetchAcks.forEach((tip, acks) -> { - if (!isLeaderKnownToHaveChanged(nodeId, tip)) { - Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>()); - Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks); - if (prevAcks != null) { - acksMap.get(tip).merge(acks); - } - } else { - acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + // Add any waiting piggyback acknowledgements. + fetchAcknowledgementsToSend.forEach((nodeId, nodeAcks) -> + nodeAcks.forEach((tip, acks) -> { + if ((cluster.nodeById(nodeId) == null) || isLeaderKnownToHaveChanged(nodeId, tip)) { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); + } else { + Map acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks); + if (prevAcks != null) { + prevAcks.merge(acks); } - }); - } + } + }) + ); + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Map nodeAcknowledgements = acknowledgementsMapAllNodes.get(nodeId); + if (nodeAcknowledgements != null) { + nodeAcknowledgements.forEach((tip, acknowledgements) -> { + resultCount.incrementAndGet(); - Map acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId); - if (acknowledgementsMapForNode != null) { - acknowledgementsMapForNode.forEach((tip, acknowledgements) -> { metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), nodeId); - resultCount.incrementAndGet(); }); } else { - acknowledgementsMapForNode = new HashMap<>(); + nodeAcknowledgements = new HashMap<>(); } acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); @@ -760,12 +760,11 @@ public CompletableFuture acknowledgeOnClose( // and only one request can be active at a time. if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) { log.error("Attempt to call close() when there is an existing close request for node {}-{}", nodeId, acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); - acknowledgementsMapForNode.forEach((tip, acks) -> { + nodeAcknowledgements.forEach((tip, acks) -> { acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty()); }); - closeFuture.completeExceptionally( - new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + nodeId)); + closeFuture.completeExceptionally(new IllegalStateException("Attempt to call close() when there is an existing close request for node " + nodeId)); } else { // There can only be one close() happening at a time. So per node, there will be one acknowledge request state. acknowledgeRequestStates.get(nodeId).setCloseRequest( @@ -776,7 +775,7 @@ public CompletableFuture acknowledgeOnClose( retryBackoffMaxMs, sessionHandler, nodeId, - acknowledgementsMapForNode, + nodeAcknowledgements, resultHandler, AcknowledgeRequestType.CLOSE )); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 8ab6f3aa21e01..bdad2512e18c5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -2241,7 +2241,7 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { } @Test - void testLeadershipChangeAfterFetchBeforeClose() { + void testLeadershipChangeAfterFetchBeforeCloseMove() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); @@ -2318,6 +2318,164 @@ void testLeadershipChangeAfterFetchBeforeClose() { assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); } + @Test + void testLeadershipChangeAfterFetchMoveBeforeClose() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Set.of(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = + buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); + partitionData = buildPartitionDataMap(tip1, records, ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(2, fetchedRecords.size()); + + Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); + acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); + + Acknowledgements acknowledgementsTp1 = getAcknowledgements(1, + AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT); + + shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1))); + + // Move the leadership of tp1 onto node 0 + metadata.updatePartitionLeadership(Map.of(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))), List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. + shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), + calculateDeadlineMs(time.timer(100))); + + // Verify if the callback was invoked with the failed acknowledgements. + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip1).getAcknowledgementsTypeMap()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + completedAcknowledgements.clear(); + + // As we are closing, we still send the request to both the nodes, but with empty acknowledgements to node1, as it is no longer the leader. + assertEquals(2, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponseFrom(fullAcknowledgeResponse(tip0, Errors.NONE), nodeId0); + networkClientDelegate.poll(time.timer(0)); + + client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); + assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + } + + @Test + void testLeadershipChangeAfterFetchMoveBeforeCloseMove() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Set.of(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = + buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); + partitionData = buildPartitionDataMap(tip1, records, ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(2, fetchedRecords.size()); + + Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); + acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); + + Acknowledgements acknowledgementsTp1 = getAcknowledgements(1, + AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT); + + shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1))); + + // Move the leadership of tp1 onto node 0, and tp0 onto node 1 + metadata.updatePartitionLeadership(Map.of(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))), List.of()); + metadata.updatePartitionLeadership(Map.of(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))), List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // We fail the acknowledgements for records which were received from node0 and node 1 with NOT_LEADER_OR_FOLLOWER exception. + shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), + calculateDeadlineMs(time.timer(100))); + + // Verify if the callback was invoked with the failed acknowledgements. + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(1, completedAcknowledgements.get(1).size()); + assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(), completedAcknowledgements.get(1).get(tip1).getAcknowledgementsTypeMap()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + completedAcknowledgements.clear(); + + // As we are closing, we still send the request to both the nodes, but with empty acknowledgements to node 0 and node1, as they are no longer the leader. + assertEquals(2, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0); + networkClientDelegate.poll(time.timer(0)); + + client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1); + networkClientDelegate.poll(time.timer(0)); + + assertTrue(completedAcknowledgements.isEmpty()); + } + @Test void testWhenLeadershipChangedAfterDisconnected() { buildRequestManager();