From a39ee1586ce03485895472c07623b2956c2d055f Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Wed, 13 May 2026 13:31:59 +0300 Subject: [PATCH] feat: batch remove_broker operations into single CruiseControl task When removing multiple brokers from a KafkaCluster spec, the operator previously created one CruiseControlOperation per broker. CC can only run one operation at a time, causing data to be shuffled onto brokers that will themselves be decommissioned, far more partition movements than necessary. Collect all broker IDs in GracefulDownscaleRequired state and submit them as a single remove_broker CC operation, mirroring the existing addBrokers pattern. Because the KafkaCluster reconciler sets all removed brokers to Required via a single atomic status patch, all brokers from a single manifest apply are guaranteed to land in the same batch. Includes unit test (createCCOperation multi-broker params), integration test (exactly one CCOperation created for two simultaneous removals), e2e test scaffold, and 5-broker sample manifest. Co-Authored-By: Claude Sonnet 4.6 --- .../samples/simplekafkacluster_5broker.yaml | 278 ++++++++++++++++++ controllers/cruisecontroltask_controller.go | 25 +- .../cruisecontroltask_controller_test.go | 11 + .../cruisecontroltask_controller_test.go | 54 ++++ tests/e2e/koperator_suite_test.go | 3 + tests/e2e/test_broker_removal.go | 122 ++++++++ 6 files changed, 484 insertions(+), 9 deletions(-) create mode 100644 config/samples/simplekafkacluster_5broker.yaml create mode 100644 tests/e2e/test_broker_removal.go diff --git a/config/samples/simplekafkacluster_5broker.yaml b/config/samples/simplekafkacluster_5broker.yaml new file mode 100644 index 000000000..06d3dbf0f --- /dev/null +++ b/config/samples/simplekafkacluster_5broker.yaml @@ -0,0 +1,278 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + labels: + controller-tools.k8s.io: "1.0" + name: kafka +spec: + kRaft: false + monitoringConfig: + jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0" + headlessServiceEnabled: true + zkAddresses: + - "zookeeper-server-client.zookeeper:2181" + propagateLabels: false + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=2 + brokerConfigGroups: + default: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + brokerAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "9020" + # brokerLabels: + # kafka_broker_group: "default_group" + brokers: + - id: 0 + brokerConfigGroup: "default" + # brokerConfig: + # envs: + # - name: +CLASSPATH + # value: "/opt/kafka/libs/dev/*:" + # - name: CLASSPATH+ + # value: ":/opt/kafka/libs/extra-jars/*" + - id: 1 + brokerConfigGroup: "default" + - id: 2 + brokerConfigGroup: "default" + - id: 3 + brokerConfigGroup: "default" + - id: 4 + brokerConfigGroup: "default" + rollingUpgradeConfig: + failureThreshold: 1 + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true + cruiseControlConfig: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + cruiseControlTaskSpec: + RetryDurationMinutes: 5 + topicConfig: + partitions: 12 + replicationFactor: 3 +# resourceRequirements: +# requests: +# cpu: 500m +# memory: 1Gi +# limits: +# cpu: 500m +# memory: 1Gi + image: "adobe/cruise-control:3.0.3-adbe-20250804" + config: | + # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + # + # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details. + # Configuration for the metadata client. + # ======================================= + # The maximum interval in milliseconds between two metadata refreshes. + #metadata.max.age.ms=300000 + # Client id for the Cruise Control. It is used for the metadata client. + #client.id=kafka-cruise-control + # The size of TCP send buffer bytes for the metadata client. + #send.buffer.bytes=131072 + # The size of TCP receive buffer size for the metadata client. + #receive.buffer.bytes=131072 + # The time to wait before disconnect an idle TCP connection. + #connections.max.idle.ms=540000 + # The time to wait before reconnect to a given host. + #reconnect.backoff.ms=50 + # The time to wait for a response from a host after sending a request. + #request.timeout.ms=30000 + # Configurations for the load monitor + # ======================================= + # The number of metric fetcher thread to fetch metrics for the Kafka cluster + num.metric.fetchers=1 + # The metric sampler class + metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler + # Configurations for CruiseControlMetricsReporterSampler + metric.reporter.topic.pattern=__CruiseControlMetrics + # The sample store class name + sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore + # The config for the Kafka sample store to save the partition metric samples + partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples + # The config for the Kafka sample store to save the model training samples + broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples + # The replication factor of Kafka metric sample store topic + sample.store.topic.replication.factor=2 + # The config for the number of Kafka sample store consumer threads + num.sample.loading.threads=8 + # The partition assignor class for the metric samplers + metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor + # The metric sampling interval in milliseconds + metric.sampling.interval.ms=120000 + metric.anomaly.detection.interval.ms=180000 + # The partition metrics window size in milliseconds + partition.metrics.window.ms=300000 + # The number of partition metric windows to keep in memory + num.partition.metrics.windows=1 + # The minimum partition metric samples required for a partition in each window + min.samples.per.partition.metrics.window=1 + # The broker metrics window size in milliseconds + broker.metrics.window.ms=300000 + # The number of broker metric windows to keep in memory + num.broker.metrics.windows=20 + # The minimum broker metric samples required for a partition in each window + min.samples.per.broker.metrics.window=1 + # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities) + capacity.config.file=config/capacity.json + #capacity.config.file=config/capacityJBOD.json + # Configurations for the analyzer + # ======================================= + # The list of goals to optimize the Kafka cluster for with pre-computed proposals + default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal + # The list of supported goals + goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal + # The list of supported hard goals + hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The minimum percentage of well monitored partitions out of all the partitions + min.monitored.partition.percentage=0.95 + # The balance threshold for CPU + cpu.balance.threshold=1.1 + # The balance threshold for disk + disk.balance.threshold=1.1 + # The balance threshold for network inbound utilization + network.inbound.balance.threshold=1.1 + # The balance threshold for network outbound utilization + network.outbound.balance.threshold=1.1 + # The balance threshold for the replica count + replica.count.balance.threshold=1.1 + # The capacity threshold for CPU in percentage + cpu.capacity.threshold=0.8 + # The capacity threshold for disk in percentage + disk.capacity.threshold=0.8 + # The capacity threshold for network inbound utilization in percentage + network.inbound.capacity.threshold=0.8 + # The capacity threshold for network outbound utilization in percentage + network.outbound.capacity.threshold=0.8 + # The threshold to define the cluster to be in a low CPU utilization state + cpu.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + disk.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low network inbound utilization state + network.inbound.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + network.outbound.low.utilization.threshold=0.0 + # The metric anomaly percentile upper threshold + metric.anomaly.percentile.upper.threshold=90.0 + # The metric anomaly percentile lower threshold + metric.anomaly.percentile.lower.threshold=10.0 + # How often should the cached proposal be expired and recalculated if necessary + proposal.expiration.ms=60000 + # The maximum number of replicas that can reside on a broker at any given time. + max.replicas.per.broker=10000 + # The number of threads to use for proposal candidate precomputing. + num.proposal.precompute.threads=1 + # the topics that should be excluded from the partition movement. + #topics.excluded.from.partition.movement + # Configurations for the executor + # ======================================= + # The max number of partitions to move in/out on a given broker at a given time. + num.concurrent.partition.movements.per.broker=10 + # The interval between two execution progress checks. + execution.progress.check.interval.ms=10000 + # Configurations for anomaly detector + # ======================================= + # The goal violation notifier class + anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier + # The metric anomaly finder class + metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder + # The anomaly detection interval + anomaly.detection.interval.ms=10000 + # The goal violation to detect. + anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The interested metrics for metric anomaly analyzer. + metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN + ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics. + #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH + # The zk path to store failed broker information. + failed.brokers.zk.path=/CruiseControlBrokerList + # Topic config provider class + topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider + # The cluster configurations for the KafkaTopicConfigProvider + cluster.configs.file=config/clusterConfigs.json + # The maximum time in milliseconds to store the response and access details of a completed user task. + completed.user.task.retention.time.ms=21600000 + # The maximum time in milliseconds to retain the demotion history of brokers. + demotion.history.retention.time.ms=86400000 + # The maximum number of completed user tasks for which the response and access details will be cached. + max.cached.completed.user.tasks=500 + # The maximum number of user tasks for concurrently running in async endpoints across all users. + max.active.user.tasks=25 + # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled + self.healing.enabled=true + # Enable self healing for broker failure detector + #self.healing.broker.failure.enabled=true + # Enable self healing for goal violation detector + #self.healing.goal.violation.enabled=true + # Enable self healing for metric anomaly detector + #self.healing.metric.anomaly.enabled=true + # configurations for the webserver + # ================================ + # HTTP listen port + webserver.http.port=9090 + # HTTP listen address + webserver.http.address=0.0.0.0 + # Whether CORS support is enabled for API or not + webserver.http.cors.enabled=false + # Value for Access-Control-Allow-Origin + webserver.http.cors.origin=http://localhost:8080/ + # Value for Access-Control-Request-Method + webserver.http.cors.allowmethods=OPTIONS,GET,POST + # Headers that should be exposed to the Browser (Webapp) + # This is a special header that is used by the + # User Tasks subsystem and should be explicitly + # Enabled when CORS mode is used as part of the + # Admin Interface + webserver.http.cors.exposeheaders=User-Task-ID + # REST API default prefix + # (dont forget the ending *) + webserver.api.urlprefix=/kafkacruisecontrol/* + # Location where the Cruise Control frontend is deployed + webserver.ui.diskpath=./cruise-control-ui/dist/ + # URL path prefix for UI + # (dont forget the ending *) + webserver.ui.urlprefix=/* + # Time After which request is converted to Async + webserver.request.maxBlockTimeMs=10000 + # Default Session Expiry Period + webserver.session.maxExpiryTimeMs=60000 + # Session cookie path + webserver.session.path=/ + # Server Access Logs + webserver.accesslog.enabled=true + # Location of HTTP Request Logs + webserver.accesslog.path=access.log + # HTTP Request Log retention days + webserver.accesslog.retention.days=14 + clusterConfig: | + { + "min.insync.replicas": 2 + } diff --git a/controllers/cruisecontroltask_controller.go b/controllers/cruisecontroltask_controller.go index 0a4cb15c2..4c0efdc64 100644 --- a/controllers/cruisecontroltask_controller.go +++ b/controllers/cruisecontroltask_controller.go @@ -171,19 +171,26 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr } } case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) > 0: - var removeTask *CruiseControlTask + brokerIDs := make([]string, 0) for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) { - removeTask = task - break + if task == nil { + continue + } + brokerIDs = append(brokerIDs, task.BrokerID) } - cruiseControlOpRef, err := r.removeBroker(ctx, instance, operationTTLSecondsAfterFinished, removeTask.BrokerID) + cruiseControlOpRef, err := r.removeBrokers(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs) if err != nil { - return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerID: %s", removeTask.BrokerID), err) + return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerIDs: %s", brokerIDs), err) } - removeTask.SetCruiseControlOperationRef(cruiseControlOpRef) - removeTask.SetStateScheduled() + for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) { + if task == nil { + continue + } + task.SetCruiseControlOperationRef(cruiseControlOpRef) + task.SetStateScheduled() + } case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) > 0: brokerLogDirsToRemove := make(map[string][]string) @@ -367,8 +374,8 @@ func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaClust return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false, nil) } -func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) { - return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false, nil) +func (r *CruiseControlTaskReconciler) removeBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIDs []string) (corev1.LocalObjectReference, error) { + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, brokerIDs, false, nil) } func (r *CruiseControlTaskReconciler) removeDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIdsToRemovedLogDirs map[string][]string) (corev1.LocalObjectReference, error) { diff --git a/controllers/cruisecontroltask_controller_test.go b/controllers/cruisecontroltask_controller_test.go index 600537b75..1b467fdfe 100644 --- a/controllers/cruisecontroltask_controller_test.go +++ b/controllers/cruisecontroltask_controller_test.go @@ -343,6 +343,17 @@ func TestCreateCCOperation(t *testing.T) { assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) }, }, + { + operationType: banzaiv1alpha1.OperationRemoveBroker, + brokerIDs: []string{"1", "2", "3"}, + isJBOD: false, + brokerIdsToLogDirs: nil, + parameterCheck: func(t *testing.T, params map[string]string) { + assert.Equal(t, "1,2,3", params[scale.ParamBrokerID]) + assert.Equal(t, "true", params[scale.ParamExcludeDemoted]) + assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) + }, + }, { operationType: banzaiv1alpha1.OperationRemoveDisks, brokerIDs: []string{"1", "2"}, diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index 3160c990a..a0e5accb4 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -456,6 +456,60 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue()) }) }) + When("multiple brokers are removed", Serial, func() { + JustBeforeEach(func(ctx SpecContext) { + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) + err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, kafkaCluster); err != nil { + return err + } + + for _, id := range []string{"1", "2"} { + brokerState := kafkaCluster.Status.BrokersState[id] + brokerState.GracefulActionState.CruiseControlState = v1beta1.GracefulDownscaleRequired + kafkaCluster.Status.BrokersState[id] = brokerState + } + return k8sClient.Status().Update(ctx, kafkaCluster) + }) + Expect(err).NotTo(HaveOccurred()) + }) + It("should create exactly one remove_broker CruiseControlOperation for all brokers", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, kafkaCluster) + Expect(err).NotTo(HaveOccurred()) + + brokerState1, ok1 := kafkaCluster.Status.BrokersState["1"] + brokerState2, ok2 := kafkaCluster.Status.BrokersState["2"] + if !ok1 || !ok2 { + return false + } + if brokerState1.GracefulActionState.CruiseControlOperationReference == nil || + brokerState2.GracefulActionState.CruiseControlOperationReference == nil { + return false + } + + operationList := &v1alpha1.CruiseControlOperationList{} + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + Expect(err).NotTo(HaveOccurred()) + + if len(operationList.Items) != 1 { + return false + } + operation = &operationList.Items[0] + return operation.CurrentTaskOperation() == v1alpha1.OperationRemoveBroker && + brokerState1.GracefulActionState.CruiseControlOperationReference.Name == operation.Name && + brokerState2.GracefulActionState.CruiseControlOperationReference.Name == operation.Name && + brokerState1.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled && + brokerState2.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled + }, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue()) + }) + }) }) func getScaleMockCCTask1() *mocks.MockCruiseControlScaler { diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 3e2c35c3a..8e03fef04 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -72,6 +72,9 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() { testInstallKafkaCluster("../../config/samples/simplekafkacluster_4disk.yaml") testMultiDiskRemoval() testUninstallKafkaCluster() + testInstallKafkaCluster("../../config/samples/simplekafkacluster_5broker.yaml") + testBatchedBrokerRemoval() + testUninstallKafkaCluster() testUninstallZookeeperCluster() // kraft tests testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml") diff --git a/tests/e2e/test_broker_removal.go b/tests/e2e/test_broker_removal.go new file mode 100644 index 000000000..5a97a1129 --- /dev/null +++ b/tests/e2e/test_broker_removal.go @@ -0,0 +1,122 @@ +// Copyright © 2025 Cisco Systems, Inc. and/or its affiliates +// Copyright 2025 Adobe. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "context" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + ginkgo "github.com/onsi/ginkgo/v2" + gomega "github.com/onsi/gomega" + + "github.com/banzaicloud/koperator/api/v1beta1" +) + +const ( + batchedBrokerRemovalTimeout = 1200 * time.Second + batchedBrokerRemovalPollInterval = 15 * time.Second +) + +// testBatchedBrokerRemoval applies the 3-broker manifest over the running 5-broker cluster, +// waits for CruiseControl to complete removal, then asserts exactly one remove_broker +// CruiseControlOperation was created and only 3 broker pods remain Ready. +func testBatchedBrokerRemoval() bool { + return ginkgo.When("Batched broker removal: remove two brokers and assert single CC operation", func() { + var kubectlOptions k8s.KubectlOptions + var err error + + ginkgo.It("Acquiring K8s config and context", func() { + kubectlOptions, err = kubectlOptionsForCurrentContext() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + }) + + ginkgo.It("Applying 3-broker manifest to trigger removal of brokers 3 and 4", func() { + ginkgo.By("Patching KafkaCluster to remove brokers 3 and 4") + applyK8sResourceManifest(kubectlOptions, "../../config/samples/simplekafkacluster.yaml") + }) + + ginkgo.It("Waiting for exactly one remove_broker CruiseControlOperation to be created", func() { + ginkgo.By("Polling until exactly one remove_broker CruiseControlOperation exists") + gomega.Eventually(context.Background(), func() (bool, error) { + return hasExactlyOneRemoveBrokerOperation(kubectlOptions) + }, batchedBrokerRemovalTimeout, batchedBrokerRemovalPollInterval).Should(gomega.BeTrue()) + }) + + ginkgo.It("Asserting exactly one remove_broker CruiseControlOperation was created", func() { + ok, err := hasExactlyOneRemoveBrokerOperation(kubectlOptions) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(ok).To(gomega.BeTrue(), "expected exactly one remove_broker CruiseControlOperation") + }) + + ginkgo.It("Waiting for brokers 3 and 4 to be removed (only 3 pods remain)", func() { + ginkgo.By("Waiting until only 3 kafka broker pods are Ready") + gomega.Eventually(context.Background(), func() (bool, error) { + return hasExactlyNBrokerPods(kubectlOptions, 3) + }, batchedBrokerRemovalTimeout, batchedBrokerRemovalPollInterval).Should(gomega.BeTrue()) + }) + + ginkgo.It("Asserting remaining Kafka brokers are healthy", func() { + err := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, + v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+","+kafkaLabelSelectorBrokers, "") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + }) +} + +// hasExactlyOneRemoveBrokerOperation returns true if there is exactly one CruiseControlOperation +// of type remove_broker in the namespace. +func hasExactlyOneRemoveBrokerOperation(kubectlOptions k8s.KubectlOptions) (bool, error) { + ops, err := getK8sResources(kubectlOptions, + []string{"cruisecontroloperation"}, + "", + "", + "-o", "jsonpath={range .items[*]}{.status.currentTask.operation}{'\\n'}{end}", + ) + if err != nil { + return false, err + } + + count := 0 + for _, op := range ops { + if op == "removeBroker" { + count++ + } + } + return count == 1, nil +} + +// hasExactlyNBrokerPods returns true when exactly n broker pods exist in the namespace. +func hasExactlyNBrokerPods(kubectlOptions k8s.KubectlOptions, n int) (bool, error) { + pods, err := getK8sResources(kubectlOptions, + []string{"pod"}, + v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+","+kafkaLabelSelectorBrokers, + "", + "--field-selector=status.phase=Running", + ) + if err != nil { + return false, err + } + // subtract 1 for the header line + actual := len(pods) - 1 + if actual < 0 { + actual = 0 + } + return actual == n, nil +}