diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 934d5b312..0b9be60c3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -450,7 +450,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - brokerStatus[broker.Id] = brokerConfig + if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) { + brokerStatus[broker.Id] = brokerConfig + } // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, @@ -921,6 +923,14 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } +// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent, +// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted. +func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool { + desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()) + state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))] + return !ok || state.Version == "" || state.Image != desiredImage +} + type brokerVersionResult struct { brokerID int32 kafkaVersion *banzaiv1beta1.KafkaVersion diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..42c17c9ab 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) { }) } } + +func TestBrokerNeedsVersionUpdate(t *testing.T) { + t.Parallel() + const clusterImage = "apache/kafka:3.4.0" + const updatedImage = "apache/kafka:3.5.0" + + testCases := []struct { + testName string + brokerID int32 + brokerConfig v1beta1.BrokerConfig + clusterImage string + brokersState map[string]v1beta1.BrokerState + expected bool + }{ + { + testName: "no existing status entry triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{}, + expected: true, + }, + { + testName: "status entry with empty version triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: ""}, + }, + expected: true, + }, + { + testName: "status entry with different image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: updatedImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + { + testName: "status up to date with cluster image skips update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: false, + }, + { + testName: "broker-level image override used instead of cluster image", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: false, + }, + { + testName: "broker-level image override differs from recorded image triggers update", + brokerID: 0, + brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"}, + }, + expected: true, + }, + { + testName: "correct state for one broker does not suppress update for another", + brokerID: 1, + brokerConfig: v1beta1.BrokerConfig{}, + clusterImage: clusterImage, + brokersState: map[string]v1beta1.BrokerState{ + "0": {Image: clusterImage, Version: "3.4.0"}, + }, + expected: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.testName, func(t *testing.T) { + t.Parallel() + r := Reconciler{ + Reconciler: resources.Reconciler{ + KafkaCluster: &v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + ClusterImage: test.clusterImage, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: test.brokersState, + }, + }, + }, + } + assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig)) + }) + } +}