diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 37829111fcb74..e7b6224ef95fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1637,6 +1637,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe Throwable exception = firstException.get(); if (exception != null && !swallowException) { if (exception instanceof InterruptException) { + Thread.currentThread().interrupt(); throw (InterruptException) exception; } throw new KafkaException("Failed to close kafka consumer", exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index b8e181743a250..cdf385050ee22 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -1171,6 +1171,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe Throwable exception = firstException.get(); if (exception != null && !swallowException) { if (exception instanceof InterruptException) { + Thread.currentThread().interrupt(); throw (InterruptException) exception; } throw new KafkaException("Failed to close kafka consumer", exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 7504eecf5bd45..09bdfb187fcc5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -1055,6 +1055,7 @@ private void close(final Duration timeout, final boolean swallowException) { Throwable exception = firstException.get(); if (exception != null && !swallowException) { if (exception instanceof InterruptException) { + Thread.currentThread().interrupt(); throw (InterruptException) exception; } throw new KafkaException("Failed to close Kafka share consumer", exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 2702ad52921e8..7460f0d44c3fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1573,6 +1573,7 @@ private void close(Duration timeout, boolean swallowException) { Throwable exception = firstException.get(); if (exception != null && !swallowException) { if (exception instanceof InterruptException) { + Thread.currentThread().interrupt(); throw (InterruptException) exception; } throw new KafkaException("Failed to close kafka producer", exception); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index b3086caba567d..6f10dd27c5893 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -755,6 +755,29 @@ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) { verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class)); } + @Test + public void testCloseRestoresThreadInterruptedStatus() { + // KAFKA-19651: Verify that Thread.interrupted() returns true after close() throws InterruptException + Set partitions = singleton(new TopicPartition("topic1", 0)); + SubscriptionState subscriptions = mock(SubscriptionState.class); + when(subscriptions.assignedPartitions()).thenReturn(partitions); + when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class); + consumer = spy(newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions)); + + try { + assertThrows(InterruptException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO))); + assertTrue(Thread.interrupted(), + "Thread interrupted status should be restored when close() throws InterruptException"); + } finally { + // Clear interrupted status in case assertion failed + Thread.interrupted(); + } + } + @Test public void testCommitSyncAllConsumed() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);