Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
if (exception instanceof InterruptException) {
Thread.currentThread().interrupt();
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close kafka consumer", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
if (exception instanceof InterruptException) {
Thread.currentThread().interrupt();
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close kafka consumer", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ private void close(final Duration timeout, final boolean swallowException) {
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
if (exception instanceof InterruptException) {
Thread.currentThread().interrupt();
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close Kafka share consumer", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,7 @@ private void close(Duration timeout, boolean swallowException) {
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
if (exception instanceof InterruptException) {
Thread.currentThread().interrupt();
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close kafka producer", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,29 @@ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}

@Test
public void testCloseRestoresThreadInterruptedStatus() {
// KAFKA-19651: Verify that Thread.interrupted() returns true after close() throws InterruptException
Set<TopicPartition> partitions = singleton(new TopicPartition("topic1", 0));
SubscriptionState subscriptions = mock(SubscriptionState.class);
when(subscriptions.assignedPartitions()).thenReturn(partitions);
when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
consumer = spy(newConsumer(
mock(FetchBuffer.class),
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions));

try {
assertThrows(InterruptException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
assertTrue(Thread.interrupted(),
"Thread interrupted status should be restored when close() throws InterruptException");
} finally {
// Clear interrupted status in case assertion failed
Thread.interrupted();
}
}

@Test
public void testCommitSyncAllConsumed() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
Expand Down