KAFKA-19651: Restore thread interrupted status in consumer close()#21864
Open
daguimu wants to merge 1 commit intoapache:trunkfrom
Open
KAFKA-19651: Restore thread interrupted status in consumer close()#21864daguimu wants to merge 1 commit intoapache:trunkfrom
daguimu wants to merge 1 commit intoapache:trunkfrom
Conversation
…uptException in close() When consumer/producer close() encounters an InterruptException during cleanup, the thread's interrupted status may be cleared by subsequent cleanup operations (metrics.close(), interceptors.close(), etc.) before the exception is re-thrown. This causes Thread.interrupted() to return false after catching the InterruptException, which is inconsistent with the contract of InterruptException. Fix by explicitly calling Thread.currentThread().interrupt() before re-throwing the InterruptException in the close() methods of AsyncKafkaConsumer, ClassicKafkaConsumer, ShareConsumerImpl, and KafkaProducer — all of which share the same vulnerable pattern.
792b049 to
927ae0b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When
AsyncKafkaConsumer.close()encounters anInterruptExceptionduring cleanup, the thread's interrupted status may be cleared by subsequent cleanup operations (metrics.close(),interceptors.close(), etc.) before the exception is re-thrown.This causes
Thread.interrupted()to returnfalseafter catching theInterruptException, which is inconsistent with the contract ofInterruptException(whose constructor sets the interrupted flag viaThread.currentThread().interrupt()).Root Cause
In the
close()method, when anInterruptExceptionis caught byswallow()and stored infirstException, the thread's interrupted flag is set by theInterruptExceptionconstructor. However, subsequent cleanup calls (closeQuietlyfor metrics, interceptors, deserializers, etc.) may internally clear this flag before the exception is re-thrown at the end of the method.Fix
Explicitly call
Thread.currentThread().interrupt()before re-throwing theInterruptException. This ensures the interrupted status is always restored regardless of what cleanup code executed between the initial interrupt and the re-throw.The same vulnerable pattern exists in four close() implementations, all fixed in this PR:
Tests Added
Impact
Fixes KAFKA-19651