Skip to content

KAFKA-19651: Restore thread interrupted status in consumer close()#21864

Open
daguimu wants to merge 1 commit intoapache:trunkfrom
daguimu:fix/async-consumer-interrupt-on-close-KAFKA-19651
Open

KAFKA-19651: Restore thread interrupted status in consumer close()#21864
daguimu wants to merge 1 commit intoapache:trunkfrom
daguimu:fix/async-consumer-interrupt-on-close-KAFKA-19651

Conversation

@daguimu
Copy link
Copy Markdown

@daguimu daguimu commented Mar 25, 2026

Problem

When AsyncKafkaConsumer.close() encounters an InterruptException during cleanup, the thread's interrupted status may be cleared by subsequent cleanup operations (metrics.close(), interceptors.close(), etc.) before the exception is re-thrown.

This causes Thread.interrupted() to return false after catching the InterruptException, which is inconsistent with the contract of InterruptException (whose constructor sets the interrupted flag via Thread.currentThread().interrupt()).

Root Cause

In the close() method, when an InterruptException is caught by swallow() and stored in firstException, the thread's interrupted flag is set by the InterruptException constructor. However, subsequent cleanup calls (closeQuietly for metrics, interceptors, deserializers, etc.) may internally clear this flag before the exception is re-thrown at the end of the method.

Fix

Explicitly call Thread.currentThread().interrupt() before re-throwing the InterruptException. This ensures the interrupted status is always restored regardless of what cleanup code executed between the initial interrupt and the re-throw.

The same vulnerable pattern exists in four close() implementations, all fixed in this PR:

  • AsyncKafkaConsumer.close() - primary fix per KAFKA-19651
  • ClassicKafkaConsumer.close() - same pattern
  • ShareConsumerImpl.close() - same pattern
  • KafkaProducer.close() - same pattern

Tests Added

  • testCloseRestoresThreadInterruptedStatus in AsyncKafkaConsumerTest: Verifies that Thread.interrupted() returns true after close() throws InterruptException.

Impact

  • Minimal change: one line added per close() implementation
  • No behavioral change for non-interrupt code paths
  • Restores consistency with InterruptException contract

Fixes KAFKA-19651

…uptException in close()

When consumer/producer close() encounters an InterruptException during
cleanup, the thread's interrupted status may be cleared by subsequent
cleanup operations (metrics.close(), interceptors.close(), etc.) before
the exception is re-thrown. This causes Thread.interrupted() to return
false after catching the InterruptException, which is inconsistent with
the contract of InterruptException.

Fix by explicitly calling Thread.currentThread().interrupt() before
re-throwing the InterruptException in the close() methods of
AsyncKafkaConsumer, ClassicKafkaConsumer, ShareConsumerImpl, and
KafkaProducer — all of which share the same vulnerable pattern.
@daguimu daguimu force-pushed the fix/async-consumer-interrupt-on-close-KAFKA-19651 branch from 792b049 to 927ae0b Compare March 25, 2026 04:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant