From 6493987614b8ac9618c952df972e662b23f453ce Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:33:35 +0300 Subject: [PATCH 1/6] feat/Broker version extraction improvements --- pkg/resources/kafka/kafka.go | 51 +++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..fbbc0eab0 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log) allBrokerDynamicConfigSucceeded := true + brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig) for _, broker := range reorderedBrokers { brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec) if err != nil { @@ -449,9 +450,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil { - return err - } + brokerStatus[broker.Id] = brokerConfig + // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, // but the loop exits too soon because dynamic configs can not be set. @@ -474,6 +474,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } + if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil { + return err + } + // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous // reconcile flow. The services must be deleted at the end of the reconcile flow after the new services // were created and broker configurations reflecting the new services otherwise the Kafka brokers @@ -917,21 +921,38 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } -func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig, - log logr.Logger) error { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) +type brokerVersionResult struct { + brokerID int32 + kafkaVersion *banzaiv1beta1.KafkaVersion + err error +} - kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) - if err != nil { - return err +func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error { + ch := make(chan brokerVersionResult, len(brokers)) + for brokerID, brokerConfig := range brokers { + go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), + r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, + r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + if err != nil { + ch <- brokerVersionResult{brokerID: id, err: err} + return + } + ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv} + }(brokerID, brokerConfig) } - err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster, - *kafkaVersion, log) - if err != nil { - return err + + for range brokers { + result := <-ch + if result.err != nil { + return result.err + } + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil { + return err + } } + return nil } From 8f25bec40179b16ed529067612d3794bf529aba0 Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:50:22 +0300 Subject: [PATCH 2/6] Added PodMonitor for Prometheus metrics --- .../operator-deployment-with-webhook.yaml | 6 ++-- .../kafka-operator/templates/podmonitor.yaml | 32 +++++++++++++++++++ charts/kafka-operator/values.yaml | 4 +++ 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 charts/kafka-operator/templates/podmonitor.yaml diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 0de92a3ed..e99612438 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -117,6 +117,8 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator + app: kafka-operator + component: operator replicas: {{ .Values.replicaCount }} template: metadata: @@ -133,8 +135,8 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: prometheus - component: alertmanager + app: kafka-operator + component: operator spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml new file mode 100644 index 000000000..164ba5577 --- /dev/null +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -0,0 +1,32 @@ +{{- if .Values.prometheusMetrics.podMonitor.enabled }} +kind: PodMonitor +apiVersion: monitoring.coreos.com/v1 +metadata: + name: {{ include "kafka-operator.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + helm.sh/chart: {{ include "kafka-operator.chart" . }} + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/component: operator + app: kafka-operator + component: operator + {{- with .Values.operator.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: + app: kafka-operator + component: operator + endpoints: + - interval: {{ .Values.prometheusMetrics.interval }} + port: metrics + path: /metrics +{{- end }} diff --git a/charts/kafka-operator/values.yaml b/charts/kafka-operator/values.yaml index 1073280f8..a7966a9e8 100644 --- a/charts/kafka-operator/values.yaml +++ b/charts/kafka-operator/values.yaml @@ -103,6 +103,10 @@ prometheusMetrics: create: true # -- ServiceAccount used by prometheus auth proxy name: kafka-operator-authproxy + podMonitor: + # -- If true, create a PodMonitor for Prometheus metrics + enabled: false + interval: 30s # -- Health probes configuration healthProbes: {} From 8b3ad25278d361086ce6ea64c5b2e93854b01fdc Mon Sep 17 00:00:00 2001 From: Adrian <1664229+azun@users.noreply.github.com> Date: Wed, 6 May 2026 16:20:49 +0300 Subject: [PATCH 3/6] Update charts/kafka-operator/templates/podmonitor.yaml Co-authored-by: Razvan Dobre --- charts/kafka-operator/templates/podmonitor.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 164ba5577..1faef3af2 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -26,7 +26,7 @@ spec: app: kafka-operator component: operator endpoints: - - interval: {{ .Values.prometheusMetrics.interval }} + - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} port: metrics path: /metrics {{- end }} From 32c6bd2c14337fac0d9852a70456284de0bc2acd Mon Sep 17 00:00:00 2001 From: Adrian <1664229+azun@users.noreply.github.com> Date: Wed, 6 May 2026 16:24:21 +0300 Subject: [PATCH 4/6] Update charts/kafka-operator/templates/podmonitor.yaml Co-authored-by: Razvan Dobre --- charts/kafka-operator/templates/podmonitor.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 1faef3af2..481265240 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -23,8 +23,9 @@ spec: - {{ .Release.Namespace }} selector: matchLabels: - app: kafka-operator - component: operator + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/component: operator endpoints: - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} port: metrics From 3317ccc37c92839eb64b65b69fa5b9e05db77a4e Mon Sep 17 00:00:00 2001 From: Adrian Coman Date: Wed, 6 May 2026 16:37:32 +0300 Subject: [PATCH 5/6] label cleanup --- .../templates/operator-deployment-with-webhook.yaml | 4 ---- charts/kafka-operator/templates/podmonitor.yaml | 2 -- 2 files changed, 6 deletions(-) diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index e99612438..c4801b225 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -117,8 +117,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator replicas: {{ .Values.replicaCount }} template: metadata: @@ -135,8 +133,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml index 481265240..06b8b87a7 100644 --- a/charts/kafka-operator/templates/podmonitor.yaml +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -11,8 +11,6 @@ metadata: app.kubernetes.io/managed-by: {{ .Release.Service }} app.kubernetes.io/version: {{ .Chart.AppVersion }} app.kubernetes.io/component: operator - app: kafka-operator - component: operator {{- with .Values.operator.annotations }} annotations: {{- toYaml . | nindent 4 }} From 7cac66ea4fd102f093aceb6766440e6a18c5c94f Mon Sep 17 00:00:00 2001 From: Adrian Coman Date: Wed, 6 May 2026 16:37:56 +0300 Subject: [PATCH 6/6] Added 30s timeout to JMX extractor --- pkg/jmxextractor/extractor.go | 4 +++- pkg/resources/kafka/kafka.go | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/jmxextractor/extractor.go b/pkg/jmxextractor/extractor.go index 583e06dd7..d06af1491 100644 --- a/pkg/jmxextractor/extractor.go +++ b/pkg/jmxextractor/extractor.go @@ -20,6 +20,7 @@ import ( "io" "net/http" "regexp" + "time" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId, exp.clusterNamespace, exp.kubernetesClusterDomain, 9020) } - rsp, err := http.Get(requestURL) + client := &http.Client{Timeout: 30 * time.Second} + rsp, err := client.Get(requestURL) if err != nil { exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId)) return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...") diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index fbbc0eab0..934d5b312 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -931,10 +931,8 @@ func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*ba ch := make(chan brokerVersionResult, len(brokers)) for brokerID, brokerConfig := range brokers { go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) - kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) if err != nil { ch <- brokerVersionResult{brokerID: id, err: err} return