diff --git a/pkg/util/util.go b/pkg/util/util.go index 6b3007201..a6a396eb1 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -281,6 +281,17 @@ func ShouldIncludeBroker(brokerConfig *v1beta1.BrokerConfig, status v1beta1.Kafk } } } + + // Broker removed from spec but still draining — keep in external listener config until CC finishes + if brokerConfig == nil { + if brokerState, ok := status.BrokersState[strconv.Itoa(brokerID)]; ok { + ccState := brokerState.GracefulActionState.CruiseControlState + if ccState.IsDownscale() && !ccState.IsSucceeded() && + apiutil.StringSliceContains(brokerState.ExternalListenerConfigNames, ingressConfigName) { + return true + } + } + } return false } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index db84482ed..22533352a 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -930,3 +930,72 @@ func TestGetMD5Hash(t *testing.T) { } } } + +func TestShouldIncludeBroker(t *testing.T) { + t.Parallel() + + const ingressConfig = "default" + brokerID := 5 + + makeStatus := func(state v1beta1.CruiseControlState) v1beta1.KafkaClusterStatus { + return v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "5": { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: state, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{ingressConfig}, + }, + }, + } + } + + testCases := []struct { + name string + state v1beta1.CruiseControlState + expected bool + }{ + {"GracefulDownscaleRequired", v1beta1.GracefulDownscaleRequired, true}, + {"GracefulDownscaleScheduled", v1beta1.GracefulDownscaleScheduled, true}, + {"GracefulDownscaleRunning", v1beta1.GracefulDownscaleRunning, true}, + {"GracefulDownscaleCompletedWithError", v1beta1.GracefulDownscaleCompletedWithError, true}, + {"GracefulDownscalePaused", v1beta1.GracefulDownscalePaused, true}, + {"GracefulDownscaleSucceeded", v1beta1.GracefulDownscaleSucceeded, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := ShouldIncludeBroker(nil, makeStatus(tc.state), brokerID, ingressConfig, ingressConfig) + if result != tc.expected { + t.Errorf("state %s: expected %v, got %v", tc.name, tc.expected, result) + } + }) + } + + t.Run("no broker state entry", func(t *testing.T) { + emptyStatus := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{}, + } + result := ShouldIncludeBroker(nil, emptyStatus, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false for broker with no status entry, got true") + } + }) + + t.Run("ingress config not in ExternalListenerConfigNames", func(t *testing.T) { + status := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "5": { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: v1beta1.GracefulDownscaleRunning, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{"other-ingress"}, + }, + }, + } + result := ShouldIncludeBroker(nil, status, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false when ingressConfig not in ExternalListenerConfigNames, got true") + } + }) +}