98 changes: 98 additions & 0 deletions openspec/changes/fix-disk-removal-deadlock/design.md
@@ -0,0 +1,98 @@
# Design: Fix Disk Removal Deadlock

## Architecture Context

The koperator reconcile loop in `pkg/resources/kafka/kafka.go` (main `Reconcile` function) runs these steps sequentially:

```
reconcileKafkaPodDelete()   // Line 265 - delete pods removed from spec
reconcileKafkaPvc()         // Line 326 - PVC lifecycle (create, resize, disk removal)
    ↓  ← BLOCKS HERE when disk removal pending
build runningBrokers map    // Line 332 - query pod list
reconcileKafkaPod()         // Line 448 - per-broker pod create/update/rolling upgrade
```

When `reconcileKafkaPvc` returns an error, all subsequent steps are skipped.

Inside `reconcileKafkaPvc`:
- Iterates ALL brokers' PVCs
- Calls `handleDiskRemoval()` when existing PVCs > desired PVCs
- `handleDiskRemoval` sets `waitForDiskRemovalToFinish = true` for any non-succeeded removal state
- At the end, if `waitForDiskRemovalToFinish` → returns `CruiseControlTaskRunning` error

Inside `reconcileKafkaPod`:
- When `len(podList.Items) == 0` → creates pod (lines 831-836)
- When `len(podList.Items) == 1` → handles rolling upgrade via `handleRollingUpgrade()`
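
The short-circuit behaviour described above can be sketched as follows. This is a minimal, self-contained model rather than the operator's code: `step`, `runSteps`, and `reconcileOrder` are hypothetical names, and the step bodies are placeholders for the real reconcile functions.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for one phase of the reconcile loop.
type step struct {
	name string
	run  func() error
}

// runSteps executes steps in order and stops at the first error.
// It returns the names of the steps that were actually entered.
func runSteps(steps []step) ([]string, error) {
	var ran []string
	for _, s := range steps {
		ran = append(ran, s.name)
		if err := s.run(); err != nil {
			return ran, fmt.Errorf("%s: %w", s.name, err)
		}
	}
	return ran, nil
}

// errDiskRemovalPending models the CruiseControlTaskRunning error.
var errDiskRemovalPending = errors.New("Disk removal pending")

func ok() error { return nil }

// reconcileOrder models the step sequence; pvcErr is what the PVC step returns.
func reconcileOrder(pvcErr error) []step {
	return []step{
		{"reconcileKafkaPodDelete", ok},
		{"reconcileKafkaPvc", func() error { return pvcErr }},
		{"reconcileKafkaPod", ok},
	}
}

func main() {
	ran, err := runSteps(reconcileOrder(errDiskRemovalPending))
	fmt.Println("entered:", ran, "error:", err)
}
```

In this model, an error from the PVC step means `reconcileKafkaPod` is never entered, which is the precondition for the deadlock.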

## Design Decision

### Where to put the check

- **Option A**: Inside `handleDiskRemoval`: skip `waitForDiskRemovalToFinish = true` per broker if the pod is missing.
- **Option B**: At the end of `reconcileKafkaPvc`: override the error if any broker with a pending removal has a missing pod.
- **Option C**: In the main reconcile: catch the error and decide whether to proceed.

**Chosen: Option B.** Reasons:
- Minimal change surface (only the blocking decision at line 1278)
- `handleDiskRemoval` still correctly tracks state and logs — no behavior change inside it
- The main reconcile doesn't need to understand PVC internals
- Easy to test: one function, one new parameter

### What data is needed

`reconcileKafkaPvc` needs to know which broker pods exist. The `runningBrokers` map (currently built at line 332) provides this. Move it earlier and pass it in.
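
As a rough illustration of the data being passed in, the set of running brokers can be modelled as a map keyed by broker ID. The sketch below is not the operator's code: `pod` is a simplified stand-in for `corev1.Pod`, and the `brokerIDLabel` key is an assumption made for the example.

```go
package main

import "fmt"

// pod is a simplified, hypothetical stand-in for corev1.Pod.
type pod struct {
	Name   string
	Labels map[string]string
}

// brokerIDLabel is an assumed label key used only in this sketch.
const brokerIDLabel = "brokerId"

// buildRunningBrokers returns the set of broker IDs that currently have a pod.
// Pods without the broker ID label are ignored.
func buildRunningBrokers(pods []pod) map[string]struct{} {
	running := make(map[string]struct{}, len(pods))
	for _, p := range pods {
		if id, ok := p.Labels[brokerIDLabel]; ok {
			running[id] = struct{}{}
		}
	}
	return running
}

func main() {
	pods := []pod{
		{Name: "kafka-101", Labels: map[string]string{brokerIDLabel: "101"}},
		{Name: "kafka-102", Labels: map[string]string{brokerIDLabel: "102"}},
	}
	running := buildRunningBrokers(pods)
	_, has103 := running["103"]
	fmt.Println("running brokers:", len(running), "broker 103 present:", has103)
}
```

Because the set only needs membership checks, `map[string]struct{}` is enough, and a missing key directly signals a missing pod.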

### Behavioral change

| Scenario | Current | After Fix |
|---|---|---|
| Disk removal pending, all pods running | Block (error) | Block (error) — unchanged |
| Disk removal pending, broker pod missing | Block (error) — DEADLOCK | Allow (nil) — pod gets created |
| No disk removal pending | Allow (nil) | Allow (nil) — unchanged |
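
The table can be read as a single boolean rule. The sketch below encodes it in a hypothetical helper, `shouldBlock`, which is not part of the change itself and ignores the per-broker detail of the real check.

```go
package main

import "fmt"

// shouldBlock models the "After Fix" column: the reconcile is blocked only
// when a disk removal is pending and no affected broker pod is missing.
func shouldBlock(removalPending, brokerPodMissing bool) bool {
	return removalPending && !brokerPodMissing
}

func main() {
	rows := []struct {
		scenario         string
		removalPending   bool
		brokerPodMissing bool
	}{
		{"disk removal pending, all pods running", true, false},
		{"disk removal pending, broker pod missing", true, true},
		{"no disk removal pending", false, false},
	}
	for _, r := range rows {
		fmt.Printf("%-45s block=%v\n", r.scenario, shouldBlock(r.removalPending, r.brokerPodMissing))
	}
}
```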

## Key Code Paths

### `handleDiskRemoval` (lines 1285-1339)

```
for each existing PVC not in desired:
    if volumeState not found   → continue (removal done)
    if IsDiskRemovalSucceeded  → delete PVC, delete status
    if IsDiskRemoval           → waitForDiskRemovalToFinish = true          ← these are the blocking states
    if IsDiskRebalance         → waitForDiskRemovalToFinish = true          ← (rebalance before removal)
    default                    → mark GracefulDiskRemovalRequired, wait = true   ← initial marking
return waitForDiskRemovalToFinish
```
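
The branching above might look roughly like this in Go. This is a simplified sketch under stated assumptions: `phase` is a hypothetical, coarse classification that stands in for the operator's volume-state predicates, and `result` is an invented return type used only to make the outcome observable.

```go
package main

import "fmt"

// phase is a hypothetical classification of a volume's Cruise Control state.
type phase int

const (
	phaseOther phase = iota // not yet part of a removal or rebalance
	phaseRemovalSucceeded
	phaseRemovalInProgress
	phaseRebalanceInProgress
)

// result collects what the sketch decided for each surplus volume.
type result struct {
	wait    bool     // true if the caller should block
	deleted []string // mount paths whose PVC would be deleted
	marked  []string // mount paths newly marked as requiring removal
}

// handleDiskRemoval walks the surplus mount paths (existing but not desired)
// and applies the branching from the pseudocode. It mutates states in place.
func handleDiskRemoval(surplus []string, states map[string]phase) result {
	var res result
	for _, mountPath := range surplus {
		p, found := states[mountPath]
		switch {
		case !found:
			continue // no state recorded: removal already done
		case p == phaseRemovalSucceeded:
			res.deleted = append(res.deleted, mountPath)
			delete(states, mountPath)
		case p == phaseRemovalInProgress, p == phaseRebalanceInProgress:
			res.wait = true
		default:
			states[mountPath] = phaseRemovalInProgress
			res.marked = append(res.marked, mountPath)
			res.wait = true
		}
	}
	return res
}

func main() {
	states := map[string]phase{"/a": phaseRemovalSucceeded, "/b": phaseOther}
	fmt.Printf("%+v\n", handleDiskRemoval([]string{"/a", "/b"}, states))
}
```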

### `reconcileKafkaPvc` blocking (lines 1278-1280)

```go
if waitForDiskRemovalToFinish {
	return errorfactory.New(CruiseControlTaskRunning{}, "Disk removal pending", ...)
}
```

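The fix adds a check before this return. It matches both removal and rebalance states, in line with the implementation in `kafka.go`:
```go
if waitForDiskRemovalToFinish {
	// Do not block when a broker with a pending removal or rebalance has no pod.
	for brokerId := range brokersDesiredPvcs {
		if _, podExists := runningBrokers[brokerId]; podExists {
			continue
		}
		// Indexing a missing broker yields a zero BrokerState with no volume states.
		for _, vs := range r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates {
			if vs.CruiseControlVolumeState.IsDiskRemoval() || vs.CruiseControlVolumeState.IsDiskRebalance() {
				return nil // the pod must be recreated before Cruise Control can finish
			}
		}
	}
	// All relevant pods exist, so block as before.
	return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending")
}
```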

## Edge Cases

1. **Multiple brokers with missing pods**: Still returns nil. All missing pods will be created on this reconcile cycle.

2. **Broker pod missing but NO disk removal for that broker**: If a broker is absent from `runningBrokers` but has no volume in an `IsDiskRemoval()` or `IsDiskRebalance()` state, the override does not trigger and the error is still returned. This is intentional: the bypass applies only when the deadlock condition is present.

3. **Pod deleted between runningBrokers check and reconcileKafkaPod**: Possible but harmless — `reconcileKafkaPod` re-queries the pod list per broker (line 826).

4. **Disk removal completes while pod is being created**: The next reconcile cycle will see `IsDiskRemovalSucceeded` and clean up the PVC. No conflict.
63 changes: 63 additions & 0 deletions openspec/changes/fix-disk-removal-deadlock/proposal.md
@@ -0,0 +1,63 @@
# Fix: Disk Removal Deadlock During Rolling Upgrade

**Status**: proposed
**Created**: 2026-05-18

## Problem

When a broker's pod is deleted during a rolling upgrade AND a disk removal is pending (`GracefulDiskRemovalScheduled`), the operator enters a deadlock:

1. `reconcileKafkaPvc` blocks the entire reconcile with "Disk removal pending" error
2. `reconcileKafkaPod` is never reached, so the pod is never recreated
3. Cruise Control cannot complete the disk removal because the broker isn't running
4. The cluster is stuck in `ClusterRollingUpgrading` indefinitely

This was observed in production: broker 103 of a 9-broker cluster (`pipeline-kafka`) had its pod deleted at 14:27:33 and was never recreated. The operator looped every ~20s for 10+ minutes with "Disk removal pending".

## Root Cause

In `pkg/resources/kafka/kafka.go`, the main reconcile function processes steps sequentially:

```
Line 326: reconcileKafkaPvc() ← blocks here with "Disk removal pending"
Line 332: build runningBrokers ← never reached
Line 448: reconcileKafkaPod() ← never reached (this creates missing pods)
```

`reconcileKafkaPvc` checks disk removal for ALL brokers. If ANY broker has pending removal, it returns a `CruiseControlTaskRunning` error that aborts the ENTIRE reconcile — including pod creation for brokers whose pods are missing.

The deadlock emerges across reconcile cycles:
- **Cycle N**: PVC check passes (states not set yet) → rolling upgrade deletes pod → returns
- **Cycle N+1**: PVC check sets `GracefulDiskRemovalRequired` → blocks → pod never recreated
- **Cycle N+2...∞**: Same. Deadlock.
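
The cycle-by-cycle behaviour can be reproduced with a toy simulation. Everything here is a modelling assumption rather than operator code: `cluster` tracks a single broker, `reconcile` collapses the PVC and pod steps into one function, and Cruise Control is assumed to finish the removal during the first cycle in which the broker pod exists.

```go
package main

import "fmt"

// cluster is a toy model of one broker with a pending disk removal.
type cluster struct {
	podExists      bool
	removalPending bool
}

// reconcile models one cycle and reports whether it was blocked.
// fixed=false mirrors the current behaviour, fixed=true the proposed one.
func reconcile(c *cluster, fixed bool) (blocked bool) {
	if c.removalPending {
		switch {
		case c.podExists:
			// Broker is running: block, and let the toy Cruise Control finish.
			c.removalPending = false
			return true
		case !fixed:
			// Current behaviour: block even though the pod is missing.
			return true
		}
		// Fixed behaviour: fall through so that the pod step can run.
	}
	if !c.podExists {
		c.podExists = true // pod step recreates the missing pod
	}
	return false
}

// simulate starts from the deadlock precondition and runs the given cycles.
func simulate(fixed bool, cycles int) cluster {
	c := cluster{podExists: false, removalPending: true}
	for i := 0; i < cycles; i++ {
		reconcile(&c, fixed)
	}
	return c
}

func main() {
	fmt.Printf("current:  %+v\n", simulate(false, 100))
	fmt.Printf("proposed: %+v\n", simulate(true, 3))
}
```

Under these assumptions the unfixed model never leaves its starting state however many cycles run, while the fixed model recreates the pod in the first cycle and lets the removal complete afterwards.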

## Proposed Fix

**Don't block on disk removal when a broker's pod doesn't exist.**

Cruise Control (CC) disk removal requires the broker to be running, because it moves partition replicas off the disk. Blocking pod creation while waiting for CC is counterproductive. The disk removal check is re-evaluated every reconcile cycle, so once the pod is back up, the check will correctly block again if still needed.

### Changes

**`pkg/resources/kafka/kafka.go`**:

1. Move the `runningBrokers` map building (lines 332-343) to BEFORE `reconcileKafkaPvc` (line 326)
2. Pass `runningBrokers` to `reconcileKafkaPvc`
3. In `reconcileKafkaPvc` (line 1278), before returning the "Disk removal pending" error: check whether any broker with a pending disk removal or rebalance has a missing pod. If yes, return `nil` instead.

**`pkg/resources/kafka/kafka_test.go`**:

- Update existing `reconcileKafkaPvc` tests for new signature
- Add test: disk removal pending + broker pod missing → returns `nil`
- Add test: disk removal pending + all pods running → returns error (unchanged behavior)

## Scope

- This fix is purely in the reconcile ordering/blocking logic
- No changes to Cruise Control integration, disk removal flow, or rolling upgrade semantics
- Existing behavior is preserved when all broker pods are running
- Only changes behavior when a broker pod is missing AND disk removal is pending

## Risk

**Low.** The fix only relaxes a blocking condition in a specific scenario (missing pod + pending disk removal) where the current behavior is provably wrong (deadlock). The disk removal check continues to work normally once the pod is recreated.
26 changes: 26 additions & 0 deletions openspec/changes/fix-disk-removal-deadlock/tasks.md
@@ -0,0 +1,26 @@
# Tasks: Fix Disk Removal Deadlock

## Task 1: Move `runningBrokers` before `reconcileKafkaPvc` [x]
- **File**: `pkg/resources/kafka/kafka.go`
- **What**: Move the broker pod list query (lines 332-343) to before `reconcileKafkaPvc` call (line 326). Remove the duplicate query at its original location.
- **Details**: The `var brokerPods` / `runningBrokers` block currently runs AFTER `reconcileKafkaPvc`. Move it before. Pass `runningBrokers` to `reconcileKafkaPvc`.

## Task 2: Update `reconcileKafkaPvc` to accept and use `runningBrokers` [x]
- **File**: `pkg/resources/kafka/kafka.go`
- **What**:
  1. Add `runningBrokers map[string]struct{}` parameter to `reconcileKafkaPvc`
  2. At line 1278, before returning the "Disk removal pending" error: check whether any broker in `brokersDesiredPvcs` has a missing pod AND a volume state for which `IsDiskRemoval()` or `IsDiskRebalance()` is true. If so, return `nil`.
- **Details**: This is the core fix. The check iterates the `brokersDesiredPvcs` keys, looks each one up in `runningBrokers`, and, if the pod is missing, checks the broker's volume states for an active disk removal or rebalance.

## Task 3: Update tests [x]
- **File**: `pkg/resources/kafka/kafka_test.go`
- **What**:
  1. Update all existing callers of `reconcileKafkaPvc` to pass the new `runningBrokers` parameter
  2. Add test case: disk removal pending + broker pod missing → returns nil
  3. Add test case: disk removal pending + all pods present → returns CruiseControlTaskRunning error
- **Details**: The new test cases should set up a KafkaCluster with a broker whose volume state is `GracefulDiskRemovalScheduled`, then call `reconcileKafkaPvc` with/without the broker in `runningBrokers`.

## Task 4: Verify [x]
- Run `go test ./pkg/resources/kafka/...`
- Run `make test` for full suite
- Run `go vet ./...` and `go build ./...`
33 changes: 25 additions & 8 deletions pkg/resources/kafka/kafka.go
@@ -322,13 +322,6 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
 			brokersVolumes[strconv.Itoa(int(broker.Id))] = brokerVolumes
 		}
 	}
-	if len(brokersVolumes) > 0 {
-		err := r.reconcileKafkaPvc(ctx, log, brokersVolumes)
-		if err != nil {
-			return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resources", "PersistentVolumeClaim")
-		}
-	}
-
 	var brokerPods corev1.PodList
 	matchingLabels := client.MatchingLabels(apiutil.LabelsForKafka(r.KafkaCluster.Name))
 	err = r.List(ctx, &brokerPods, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
@@ -342,6 +335,13 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
 		runningBrokers[brokerID] = struct{}{}
 	}

+	if len(brokersVolumes) > 0 {
+		err := r.reconcileKafkaPvc(ctx, log, brokersVolumes, runningBrokers)
+		if err != nil {
+			return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resources", "PersistentVolumeClaim")
+		}
+	}
+
 	var pvcList corev1.PersistentVolumeClaimList
 	err = r.List(ctx, &pvcList, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
 	if err != nil {
@@ -1137,7 +1137,7 @@ func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool {
 }

 //nolint:funlen
-func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error {
+func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim, runningBrokers map[string]struct{}) error {
 	brokersVolumesState := make(map[string]map[string]banzaiv1beta1.VolumeState)
 	var brokerIds []string
 	waitForDiskRemovalToFinish := false
@@ -1276,6 +1276,23 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro
 	}

 	if waitForDiskRemovalToFinish {
+		// Don't block if any broker with pending disk removal/rebalance has a missing pod.
+		// Blocking prevents pod recreation, creating a deadlock where CC can't
+		// complete the disk removal or rebalance because the broker isn't running.
+		for brokerId := range brokersDesiredPvcs {
+			if _, podExists := runningBrokers[brokerId]; !podExists {
+				if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok {
+					for mountPath, volumeState := range brokerState.GracefulActionState.VolumeStates {
+						if volumeState.CruiseControlVolumeState.IsDiskRemoval() || volumeState.CruiseControlVolumeState.IsDiskRebalance() {
+							log.Info("Disk removal pending but broker pod is missing, "+
+								"allowing reconcile to proceed for pod recreation",
+								"brokerId", brokerId, "mountPath", mountPath)
+							return nil
+						}
+					}
+				}
+			}
+		}
 		return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending")
 	}

106 changes: 105 additions & 1 deletion pkg/resources/kafka/kafka_test.go
@@ -47,6 +47,7 @@ type PvcTestCase struct {
 	testName string
 	brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim
 	existingPvcs []*corev1.PersistentVolumeClaim
+	runningBrokers map[string]struct{}
 	kafkaClusterSpec v1beta1.KafkaClusterSpec
 	kafkaClusterStatus v1beta1.KafkaClusterStatus
 	expectedError bool
@@ -1002,6 +1003,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
 				createPvc("test-pvc-1", "0", "/path/to/mount1"),
 				createPvc("test-pvc-2", "0", "/path/to/mount2"),
 			},
+			runningBrokers: map[string]struct{}{"0": {}},
 			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
 				BrokersState: map[string]v1beta1.BrokerState{
 					"0": {
@@ -1032,6 +1034,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
 				createPvc("test-pvc-1", "0", "/path/to/mount1"),
 				createPvc("test-pvc-2", "0", "/path/to/mount2"),
 			},
+			runningBrokers: map[string]struct{}{"0": {}},
 			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
 				BrokersState: map[string]v1beta1.BrokerState{
 					"0": {
@@ -1062,6 +1065,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
 				createPvc("test-pvc-1", "0", "/path/to/mount1"),
 				createPvc("test-pvc-2", "0", "/path/to/mount2"),
 			},
+			runningBrokers: map[string]struct{}{"0": {}},
 			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
 				BrokersState: map[string]v1beta1.BrokerState{
 					"0": {
@@ -1081,6 +1085,102 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
 				"/path/to/mount2": v1beta1.GracefulDiskRemovalRunning,
 			},
 		},
+		{
+			testName: "Disk removal pending but broker pod missing - allow reconcile to proceed",
+			brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
+				"0": {
+					createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				},
+			},
+			existingPvcs: []*corev1.PersistentVolumeClaim{
+				createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				createPvc("test-pvc-2", "0", "/path/to/mount2"),
+			},
+			runningBrokers: map[string]struct{}{}, // broker pod is missing
+			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
+				BrokersState: map[string]v1beta1.BrokerState{
+					"0": {
+						GracefulActionState: v1beta1.GracefulActionState{
+							VolumeStates: map[string]v1beta1.VolumeState{
+								"/path/to/mount2": {
+									CruiseControlVolumeState: v1beta1.GracefulDiskRemovalScheduled,
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedError: false,
+			expectedDeletePvc: false,
+			expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{
+				"/path/to/mount2": v1beta1.GracefulDiskRemovalScheduled,
+			},
+		},
+		{
+			testName: "Disk rebalance pending but broker pod missing - allow reconcile to proceed",
+			brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
+				"0": {
+					createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				},
+			},
+			existingPvcs: []*corev1.PersistentVolumeClaim{
+				createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				createPvc("test-pvc-2", "0", "/path/to/mount2"),
+			},
+			runningBrokers: map[string]struct{}{}, // broker pod is missing
+			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
+				BrokersState: map[string]v1beta1.BrokerState{
+					"0": {
+						GracefulActionState: v1beta1.GracefulActionState{
+							VolumeStates: map[string]v1beta1.VolumeState{
+								"/path/to/mount2": {
+									CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceScheduled,
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedError: false,
+			expectedDeletePvc: false,
+			expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{
+				"/path/to/mount2": v1beta1.GracefulDiskRebalanceScheduled,
+			},
+		},
+		{
+			testName: "Disk newly marked for removal with pod missing - allow reconcile to proceed",
+			brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
+				"0": {
+					createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				},
+			},
+			existingPvcs: []*corev1.PersistentVolumeClaim{
+				createPvc("test-pvc-1", "0", "/path/to/mount1"),
+				createPvc("test-pvc-2", "0", "/path/to/mount2"),
+			},
+			runningBrokers: map[string]struct{}{}, // broker pod is missing
+			kafkaClusterStatus: v1beta1.KafkaClusterStatus{
+				BrokersState: map[string]v1beta1.BrokerState{
+					"0": {
+						GracefulActionState: v1beta1.GracefulActionState{
+							VolumeStates: map[string]v1beta1.VolumeState{
+								"/path/to/mount2": {
+									// GracefulDiskRebalanceSucceeded triggers the default case in
+									// handleDiskRemoval which marks GracefulDiskRemovalRequired.
+									// With pod missing, the override should still allow proceeding.
+									CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceSucceeded,
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedError: false,
+			expectedDeletePvc: false,
+			expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{
+				"/path/to/mount2": v1beta1.GracefulDiskRemovalRequired,
+			},
+		},
 		{
 			testName: "If disk removal successful, do not return error and delete pvc and volume state",
 			brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
@@ -1291,8 +1391,12 @@ func execPvcTest(t *testing.T, testCases []PvcTestCase) {
 			// Set up the r.KafkaCluster.Status with the provided test.kafkaClusterStatus
 			r.KafkaCluster.Status = test.kafkaClusterStatus

+			runningBrokers := test.runningBrokers
+			if runningBrokers == nil {
+				runningBrokers = make(map[string]struct{})
+			}
 			// Call the reconcileKafkaPvc function with the provided test.brokersDesiredPvcs
-			err := r.reconcileKafkaPvc(context.TODO(), logf.Log, test.brokersDesiredPvcs)
+			err := r.reconcileKafkaPvc(context.TODO(), logf.Log, test.brokersDesiredPvcs, runningBrokers)

 			// Test that the expected error is returned
 			if test.expectedError {