From 8481219c20a8f1322025cd0bb0957ab6b6572d94 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Mon, 18 May 2026 18:52:41 +0300 Subject: [PATCH 1/3] Fix disk removal deadlock during rolling upgrade When a broker pod is deleted during rolling upgrade and a disk removal is pending (GracefulDiskRemovalScheduled), the operator enters a deadlock: reconcileKafkaPvc blocks the entire reconcile with "Disk removal pending", preventing reconcileKafkaPod from recreating the missing pod. Meanwhile, Cruise Control cannot complete the disk removal because the broker isn't running. Fix: move runningBrokers map building before reconcileKafkaPvc and pass it in. Before returning the blocking error, check if any broker with pending disk removal has a missing pod. If so, allow the reconcile to proceed so the pod can be recreated. The disk removal check is re-evaluated on the next cycle once the broker is back up. Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/resources/kafka/kafka.go | 33 +++++++++++++++++++------ pkg/resources/kafka/kafka_test.go | 41 ++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..87140ea44 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -322,13 +322,6 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { brokersVolumes[strconv.Itoa(int(broker.Id))] = brokerVolumes } } - if len(brokersVolumes) > 0 { - err := r.reconcileKafkaPvc(ctx, log, brokersVolumes) - if err != nil { - return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resources", "PersistentVolumeClaim") - } - } - var brokerPods corev1.PodList matchingLabels := client.MatchingLabels(apiutil.LabelsForKafka(r.KafkaCluster.Name)) err = r.List(ctx, &brokerPods, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels)) @@ -342,6 +335,13 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { runningBrokers[brokerID] = struct{}{} } + if len(brokersVolumes) > 0 { + err := r.reconcileKafkaPvc(ctx, log, brokersVolumes, runningBrokers) + if err != nil { + return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resources", "PersistentVolumeClaim") + } + } + var pvcList corev1.PersistentVolumeClaimList err = r.List(ctx, &pvcList, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels)) if err != nil { @@ -1137,7 +1137,7 @@ func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool { } //nolint:funlen -func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { +func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim, runningBrokers map[string]struct{}) error { brokersVolumesState := make(map[string]map[string]banzaiv1beta1.VolumeState) var brokerIds []string waitForDiskRemovalToFinish := false @@ -1276,6 +1276,23 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro } if waitForDiskRemovalToFinish { + // Don't block if any broker with pending disk removal has a missing pod. + // Blocking prevents pod recreation, creating a deadlock where CC can't + // complete the disk removal because the broker isn't running. + for brokerId := range brokersDesiredPvcs { + if _, podExists := runningBrokers[brokerId]; !podExists { + if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok { + for _, volumeState := range brokerState.GracefulActionState.VolumeStates { + if volumeState.CruiseControlVolumeState.IsDiskRemoval() { + log.Info("Disk removal pending but broker pod is missing, "+ + "allowing reconcile to proceed for pod recreation", + "brokerId", brokerId) + return nil + } + } + } + } + } return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending") } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..4a180c3a2 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -47,6 +47,7 @@ type PvcTestCase struct { testName string brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim existingPvcs []*corev1.PersistentVolumeClaim + runningBrokers map[string]struct{} kafkaClusterSpec v1beta1.KafkaClusterSpec kafkaClusterStatus v1beta1.KafkaClusterStatus expectedError bool @@ -1002,6 +1003,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { createPvc("test-pvc-1", "0", "/path/to/mount1"), createPvc("test-pvc-2", "0", "/path/to/mount2"), }, + runningBrokers: map[string]struct{}{"0": {}}, kafkaClusterStatus: v1beta1.KafkaClusterStatus{ BrokersState: map[string]v1beta1.BrokerState{ "0": { @@ -1032,6 +1034,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { createPvc("test-pvc-1", "0", "/path/to/mount1"), createPvc("test-pvc-2", "0", "/path/to/mount2"), }, + runningBrokers: map[string]struct{}{"0": {}}, kafkaClusterStatus: v1beta1.KafkaClusterStatus{ BrokersState: map[string]v1beta1.BrokerState{ "0": { @@ -1062,6 +1065,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { createPvc("test-pvc-1", "0", "/path/to/mount1"), createPvc("test-pvc-2", "0", "/path/to/mount2"), }, + runningBrokers: map[string]struct{}{"0": {}}, kafkaClusterStatus: v1beta1.KafkaClusterStatus{ BrokersState: map[string]v1beta1.BrokerState{ "0": { @@ -1081,6 +1085,37 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { "/path/to/mount2": v1beta1.GracefulDiskRemovalRunning, }, }, + { + testName: "Disk removal pending but broker pod missing - allow reconcile to proceed", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + runningBrokers: map[string]struct{}{}, // broker pod is missing + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRemovalScheduled, + }, + }, + }, + }, + }, + }, + expectedError: false, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRemovalScheduled, + }, + }, { testName: "If disk removal successful, do not return error and delete pvc and volume state", brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ @@ -1291,8 +1326,12 @@ func execPvcTest(t *testing.T, testCases []PvcTestCase) { // Set up the r.KafkaCluster.Status with the provided test.kafkaClusterStatus r.KafkaCluster.Status = test.kafkaClusterStatus + runningBrokers := test.runningBrokers + if runningBrokers == nil { + runningBrokers = make(map[string]struct{}) + } // Call the reconcileKafkaPvc function with the provided test.brokersDesiredPvcs - err := r.reconcileKafkaPvc(context.TODO(), logf.Log, test.brokersDesiredPvcs) + err := r.reconcileKafkaPvc(context.TODO(), logf.Log, test.brokersDesiredPvcs, runningBrokers) // Test that the expected error is returned if test.expectedError { From cb8cd148013139a9fff86e9464b132d33674d3b1 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Mon, 18 May 2026 19:07:31 +0300 Subject: [PATCH 2/3] Address review findings: cover IsDiskRebalance, add mountPath to log, add tests - Fix #1 (HIGH): Override now checks IsDiskRebalance() in addition to IsDiskRemoval(), closing the same deadlock vector for rebalance states - Fix #2 (LOW): Include mountPath in the bypass log message for consistency with other disk-removal log messages - Fix #3 (LOW): Add tests for rebalance-state deadlock bypass and for newly-marked-for-removal with missing pod Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/resources/kafka/kafka.go | 10 ++--- pkg/resources/kafka/kafka_test.go | 65 +++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 87140ea44..0695cde1f 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1276,17 +1276,17 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro } if waitForDiskRemovalToFinish { - // Don't block if any broker with pending disk removal has a missing pod. + // Don't block if any broker with pending disk removal/rebalance has a missing pod. // Blocking prevents pod recreation, creating a deadlock where CC can't - // complete the disk removal because the broker isn't running. + // complete the disk removal or rebalance because the broker isn't running. for brokerId := range brokersDesiredPvcs { if _, podExists := runningBrokers[brokerId]; !podExists { if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok { - for _, volumeState := range brokerState.GracefulActionState.VolumeStates { - if volumeState.CruiseControlVolumeState.IsDiskRemoval() { + for mountPath, volumeState := range brokerState.GracefulActionState.VolumeStates { + if volumeState.CruiseControlVolumeState.IsDiskRemoval() || volumeState.CruiseControlVolumeState.IsDiskRebalance() { log.Info("Disk removal pending but broker pod is missing, "+ "allowing reconcile to proceed for pod recreation", - "brokerId", brokerId) + "brokerId", brokerId, "mountPath", mountPath) return nil } } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4a180c3a2..c268d7564 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1116,6 +1116,71 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { "/path/to/mount2": v1beta1.GracefulDiskRemovalScheduled, }, }, + { + testName: "Disk rebalance pending but broker pod missing - allow reconcile to proceed", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + runningBrokers: map[string]struct{}{}, // broker pod is missing + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceScheduled, + }, + }, + }, + }, + }, + }, + expectedError: false, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRebalanceScheduled, + }, + }, + { + testName: "Disk newly marked for removal with pod missing - allow reconcile to proceed", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + runningBrokers: map[string]struct{}{}, // broker pod is missing + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + // GracefulDiskRebalanceSucceeded triggers the default case in + // handleDiskRemoval which marks GracefulDiskRemovalRequired. + // With pod missing, the override should still allow proceeding. + CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceSucceeded, + }, + }, + }, + }, + }, + }, + expectedError: false, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRemovalRequired, + }, + }, { testName: "If disk removal successful, do not return error and delete pvc and volume state", brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ From c0c20d0571890dc145859d4e0119be01a8c40219 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Mon, 18 May 2026 19:11:56 +0300 Subject: [PATCH 3/3] Add OpenSpec artifacts for disk removal deadlock fix Proposal, design, and task tracking for the fix. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../fix-disk-removal-deadlock/design.md | 98 +++++++++++++++++++ .../fix-disk-removal-deadlock/proposal.md | 63 ++++++++++++ .../fix-disk-removal-deadlock/tasks.md | 26 +++++ 3 files changed, 187 insertions(+) create mode 100644 openspec/changes/fix-disk-removal-deadlock/design.md create mode 100644 openspec/changes/fix-disk-removal-deadlock/proposal.md create mode 100644 openspec/changes/fix-disk-removal-deadlock/tasks.md diff --git a/openspec/changes/fix-disk-removal-deadlock/design.md b/openspec/changes/fix-disk-removal-deadlock/design.md new file mode 100644 index 000000000..ba6f221d2 --- /dev/null +++ b/openspec/changes/fix-disk-removal-deadlock/design.md @@ -0,0 +1,98 @@ +# Design: Fix Disk Removal Deadlock + +## Architecture Context + +The koperator reconcile loop in `pkg/resources/kafka/kafka.go` (main `Reconcile` function) runs these steps sequentially: + +``` +reconcileKafkaPodDelete() // Line 265 - delete pods removed from spec + ↓ +reconcileKafkaPvc() // Line 326 - PVC lifecycle (create, resize, disk removal) + ↓ ← BLOCKS HERE when disk removal pending +build runningBrokers map // Line 332 - query pod list + ↓ +reconcileKafkaPod() // Line 448 - per-broker pod create/update/rolling upgrade +``` + +When `reconcileKafkaPvc` returns an error, all subsequent steps are skipped. + +Inside `reconcileKafkaPvc`: +- Iterates ALL brokers' PVCs +- Calls `handleDiskRemoval()` when existing PVCs > desired PVCs +- `handleDiskRemoval` sets `waitForDiskRemovalToFinish = true` for any non-succeeded removal state +- At the end, if `waitForDiskRemovalToFinish` → returns `CruiseControlTaskRunning` error + +Inside `reconcileKafkaPod`: +- When `len(podList.Items) == 0` → creates pod (line 831-836) +- When `len(podList.Items) == 1` → handles rolling upgrade via `handleRollingUpgrade()` + +## Design Decision + +### Where to put the check + +**Option A**: Inside `handleDiskRemoval` — skip `waitForDiskRemovalToFinish = true` per-broker if pod missing. +**Option B**: At the end of `reconcileKafkaPvc` — override the error if any broker with removal has missing pod. +**Option C**: In the main reconcile — catch the error and decide whether to proceed. + +**Chosen: Option B.** Reasons: +- Minimal change surface (only the blocking decision at line 1278) +- `handleDiskRemoval` still correctly tracks state and logs — no behavior change inside it +- The main reconcile doesn't need to understand PVC internals +- Easy to test: one function, one new parameter + +### What data is needed + +`reconcileKafkaPvc` needs to know which broker pods exist. The `runningBrokers` map (currently built at line 332) provides this. Move it earlier and pass it in. + +### Behavioral change + +| Scenario | Current | After Fix | +|---|---|---| +| Disk removal pending, all pods running | Block (error) | Block (error) — unchanged | +| Disk removal pending, broker pod missing | Block (error) — DEADLOCK | Allow (nil) — pod gets created | +| No disk removal pending | Allow (nil) | Allow (nil) — unchanged | + +## Key Code Paths + +### `handleDiskRemoval` (line 1285-1339) + +``` +for each existing PVC not in desired: + if volumeState not found → continue (removal done) + if IsDiskRemovalSucceeded → delete PVC, delete status + if IsDiskRemoval → waitForDiskRemovalToFinish = true ← these are the blocking states + if IsDiskRebalance → waitForDiskRemovalToFinish = true ← (rebalance before removal) + default → mark GracefulDiskRemovalRequired, wait = true ← initial marking +return waitForDiskRemovalToFinish +``` + +### `reconcileKafkaPvc` blocking (line 1278-1280) + +```go +if waitForDiskRemovalToFinish { + return errorfactory.New(CruiseControlTaskRunning{}, "Disk removal pending", ...) +} +``` + +The fix adds a check before this return: +```go +if waitForDiskRemovalToFinish { + // Check if any broker with pending removal has a missing pod + for brokerId := range brokersDesiredPvcs { + if _, podExists := runningBrokers[brokerId]; !podExists { + if state has IsDiskRemoval volume → return nil + } + } + return error // all relevant pods exist, block normally +} +``` + +## Edge Cases + +1. **Multiple brokers with missing pods**: Still returns nil. All missing pods will be created on this reconcile cycle. + +2. **Broker pod missing but NO disk removal for that broker**: `runningBrokers` missing + no IsDiskRemoval volume state → doesn't trigger the override. The error is still returned. This is intentional — we only bypass when the deadlock condition is present. + +3. **Pod deleted between runningBrokers check and reconcileKafkaPod**: Possible but harmless — `reconcileKafkaPod` re-queries the pod list per broker (line 826). + +4. **Disk removal completes while pod is being created**: The next reconcile cycle will see `IsDiskRemovalSucceeded` and clean up the PVC. No conflict. diff --git a/openspec/changes/fix-disk-removal-deadlock/proposal.md b/openspec/changes/fix-disk-removal-deadlock/proposal.md new file mode 100644 index 000000000..c144fdb31 --- /dev/null +++ b/openspec/changes/fix-disk-removal-deadlock/proposal.md @@ -0,0 +1,63 @@ +# Fix: Disk Removal Deadlock During Rolling Upgrade + +**Status**: proposed +**Created**: 2026-05-18 + +## Problem + +When a broker's pod is deleted during a rolling upgrade AND a disk removal is pending (`GracefulDiskRemovalScheduled`), the operator enters a deadlock: + +1. `reconcileKafkaPvc` blocks the entire reconcile with "Disk removal pending" error +2. `reconcileKafkaPod` is never reached, so the pod is never recreated +3. Cruise Control cannot complete the disk removal because the broker isn't running +4. The cluster is stuck in `ClusterRollingUpgrading` indefinitely + +This was observed in production: broker 103 of a 9-broker cluster (`pipeline-kafka`) had its pod deleted at 14:27:33 and was never recreated. The operator looped every ~20s for 10+ minutes with "Disk removal pending". + +## Root Cause + +In `pkg/resources/kafka/kafka.go`, the main reconcile function processes steps sequentially: + +``` +Line 326: reconcileKafkaPvc() ← blocks here with "Disk removal pending" +Line 332: build runningBrokers ← never reached +Line 448: reconcileKafkaPod() ← never reached (this creates missing pods) +``` + +`reconcileKafkaPvc` checks disk removal for ALL brokers. If ANY broker has pending removal, it returns a `CruiseControlTaskRunning` error that aborts the ENTIRE reconcile — including pod creation for brokers whose pods are missing. + +The deadlock emerges across reconcile cycles: +- **Cycle N**: PVC check passes (states not set yet) → rolling upgrade deletes pod → returns +- **Cycle N+1**: PVC check sets `GracefulDiskRemovalRequired` → blocks → pod never recreated +- **Cycle N+2...∞**: Same. Deadlock. + +## Proposed Fix + +**Don't block on disk removal when a broker's pod doesn't exist.** + +CC disk removal REQUIRES the broker to be running (it moves partition replicas off the disk). Blocking pod creation while waiting for CC is counterproductive. The disk removal check is re-evaluated every reconcile cycle, so once the pod is back up, the check will correctly block again if still needed. + +### Changes + +**`pkg/resources/kafka/kafka.go`**: + +1. Move the `runningBrokers` map building (lines 332-343) to BEFORE `reconcileKafkaPvc` (line 326) +2. Pass `runningBrokers` to `reconcileKafkaPvc` +3. In `reconcileKafkaPvc` (line 1278), before returning "Disk removal pending" error: check if any broker with pending disk removal has a missing pod. If yes, return `nil` instead. + +**`pkg/resources/kafka/kafka_test.go`**: + +- Update existing `reconcileKafkaPvc` tests for new signature +- Add test: disk removal pending + broker pod missing → returns `nil` +- Add test: disk removal pending + all pods running → returns error (unchanged behavior) + +## Scope + +- This fix is purely in the reconcile ordering/blocking logic +- No changes to Cruise Control integration, disk removal flow, or rolling upgrade semantics +- Existing behavior is preserved when all broker pods are running +- Only changes behavior when a broker pod is missing AND disk removal is pending + +## Risk + +**Low.** The fix only relaxes a blocking condition in a specific scenario (missing pod + pending disk removal) where the current behavior is provably wrong (deadlock). The disk removal check continues to work normally once the pod is recreated. diff --git a/openspec/changes/fix-disk-removal-deadlock/tasks.md b/openspec/changes/fix-disk-removal-deadlock/tasks.md new file mode 100644 index 000000000..f7e51ea13 --- /dev/null +++ b/openspec/changes/fix-disk-removal-deadlock/tasks.md @@ -0,0 +1,26 @@ +# Tasks: Fix Disk Removal Deadlock + +## Task 1: Move `runningBrokers` before `reconcileKafkaPvc` [x] +- **File**: `pkg/resources/kafka/kafka.go` +- **What**: Move the broker pod list query (lines 332-343) to before `reconcileKafkaPvc` call (line 326). Remove the duplicate query at its original location. +- **Details**: The `var brokerPods` / `runningBrokers` block currently runs AFTER `reconcileKafkaPvc`. Move it before. Pass `runningBrokers` to `reconcileKafkaPvc`. + +## Task 2: Update `reconcileKafkaPvc` to accept and use `runningBrokers` [x] +- **File**: `pkg/resources/kafka/kafka.go` +- **What**: + 1. Add `runningBrokers map[string]struct{}` parameter to `reconcileKafkaPvc` + 2. At line 1278, before returning "Disk removal pending" error: check if any broker in `brokersDesiredPvcs` has a missing pod AND has a `IsDiskRemoval()` volume state. If so, return nil. +- **Details**: This is the core fix. The check iterates `brokersDesiredPvcs` keys, looks up `runningBrokers`, and if a pod is missing checks the broker's volume states for active disk removal. + +## Task 3: Update tests [x] +- **File**: `pkg/resources/kafka/kafka_test.go` +- **What**: + 1. Update all existing callers of `reconcileKafkaPvc` to pass the new `runningBrokers` parameter + 2. Add test case: disk removal pending + broker pod missing → returns nil + 3. Add test case: disk removal pending + all pods present → returns CruiseControlTaskRunning error +- **Details**: The new test cases should set up a KafkaCluster with a broker whose volume state is `GracefulDiskRemovalScheduled`, then call `reconcileKafkaPvc` with/without the broker in `runningBrokers`. + +## Task 4: Verify [x] +- Run `go test ./pkg/resources/kafka/...` +- Run `make test` for full suite +- Run `go vet ./...` and `go build ./...`