From 901f1157b9b5afca6b7b79cd017ff485115fbb43 Mon Sep 17 00:00:00 2001 From: Ha Van Date: Tue, 12 May 2026 10:20:57 -0500 Subject: [PATCH] Skip JMX version fetch when broker status is already current Adds brokerNeedsVersionUpdate to guard the BrokersState write so that JMX fetches are skipped for brokers whose recorded image and version already match the desired image, reducing unnecessary reconcile work. Co-Authored-By: Claude Sonnet 4.6 --- pkg/resources/kafka/kafka.go | 12 +++- pkg/resources/kafka/kafka_test.go | 104 ++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 934d5b312..0b9be60c3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -450,7 +450,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - brokerStatus[broker.Id] = brokerConfig + if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) { + brokerStatus[broker.Id] = brokerConfig + } // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, @@ -921,6 +923,14 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } +// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent, +// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted. +func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool { + desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()) + state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))] + return !ok || state.Version == "" || state.Image != desiredImage +} + type brokerVersionResult struct { brokerID int32 kafkaVersion *banzaiv1beta1.KafkaVersion diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..42c17c9ab 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) { }) } } + +func TestBrokerNeedsVersionUpdate(t *testing.T) { + t.Parallel() + const clusterImage = "apache/kafka:3.4.0" + const updatedImage = "apache/kafka:3.5.0" + + testCases := []struct { + testName string + brokerID int32 + brokerConfig v1beta1.BrokerConfig + clusterImage string + brokersState map[string]v1beta1.BrokerState + expected bool + }{ + { + testName: "no existing status entry triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{}, + expected: true, + }, + { + testName: "status entry with empty version triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: ""}, + }, + expected: true, + }, + { + testName: "status entry with different image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: updatedImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + { + testName: "status up to date with cluster image skips update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: false, + }, + { + testName: "broker-level image override used instead of cluster image", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: false, + }, + { + testName: "broker-level image override differs from recorded image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: true, + }, + { + testName: "correct state for one broker does not suppress update for another", + brokerID: 1, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.testName, func(t *testing.T) { + t.Parallel() + r := Reconciler{ + Reconciler: resources.Reconciler{ + KafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ClusterImage: test.clusterImage, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: test.brokersState, + }, + }, + }, + } + assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig)) + }) + } +}