diff --git a/Makefile b/Makefile index 01f5ba493..3126de365 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ GOLANGCI_VERSION = 2.11.4 # renovate: datasource=github-releases depName=golangc LICENSEI_VERSION = 0.9.0 # renovate: datasource=github-releases depName=goph/licensei CONTROLLER_GEN_VERSION = v0.20.1 # renovate: datasource=github-releases depName=kubernetes-sigs/controller-tools ENVTEST_K8S_VERSION = 1.35.0 # renovate: datasource=github-releases depName=kubernetes-sigs/controller-tools extractVersion=^envtest-v(?.+)$ -SETUP_ENVTEST_VERSION := latest +SETUP_ENVTEST_VERSION := v0.0.0-20260311120938-7f576c06d187 # last commit with go 1.25 requirement; 598e330b (2026-03-31) bumped to go 1.26 ADDLICENSE_VERSION := 1.2.0 # renovate: datasource=github-releases depName=google/addlicense GOTEMPLATE_VERSION := 3.12.0 # renovate: datasource=github-releases depName=cznic/gotemplate MOCKGEN_VERSION := 0.6.0 # renovate: datasource=github-releases depName=uber-go/mock diff --git a/api/v1beta1/common_types.go b/api/v1beta1/common_types.go index 902967438..5e4e4f17d 100644 --- a/api/v1beta1/common_types.go +++ b/api/v1beta1/common_types.go @@ -222,6 +222,19 @@ type VolumeState struct { CruiseControlOperationReference *corev1.LocalObjectReference `json:"cruiseControlOperationReference,omitempty"` } +// TieredCacheVolumeState tracks the lifecycle state of a tiered storage cache PVC for a given mount path. +type TieredCacheVolumeState string + +const ( + // TieredCacheVolumeActive indicates the mount path is an active tiered storage cache volume + // with no resize operation in progress. + TieredCacheVolumeActive TieredCacheVolumeState = "active" + // TieredCacheVolumePendingDeletion indicates that the old cache PVC at this mount path is waiting + // to be deleted once the broker pod stops. A replacement PVC with the new desired size has + // already been created at the same mount path. 
+ TieredCacheVolumePendingDeletion TieredCacheVolumeState = "pending-deletion" +) + // BrokerState holds information about broker state type BrokerState struct { // RackAwarenessState holds info about rack awareness status @@ -240,6 +253,12 @@ type BrokerState struct { Image string `json:"image,omitempty"` // Compressed data from broker configuration to restore broker pod in specific cases ConfigurationBackup string `json:"configurationBackup,omitempty"` + // TieredCacheVolumes tracks tiered storage cache PVC state for this broker, keyed by mount path. + // "active" means the PVC is a cache volume with no resize in progress. + // "pending-deletion" means a resize is in flight (old PVC waiting for pod to stop). + // Absent entry means the mount path is not a cache volume (or the PVC was removed). + // Written at PVC creation and updated through the resize lifecycle. + TieredCacheVolumes map[string]TieredCacheVolumeState `json:"tieredCacheVolumes,omitempty"` } const ( diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 0e03045fe..681ba483e 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -52,6 +52,12 @@ const ( // IsControllerNodeKey is used to identify if the kafka pod is a controller or broker_controller IsControllerNodeKey = "isControllerNode" + // TieredStorageCacheAnnotationKey marks a PVC as backing a Kafka tiered storage cache. + // Cache PVCs are excluded from log.dirs and Cruise Control capacity, and follow a + // delete-and-recreate path on shrink. Read sites must use this constant rather than the + // literal so a typo cannot silently misclassify a cache PVC as a regular log-dir volume. 
+ TieredStorageCacheAnnotationKey = "tieredStorageCache" + // DefaultCruiseControlImage is the default CC image used when users don't specify it in CruiseControlConfig.Image DefaultCruiseControlImage = "adobe/cruise-control:3.0.3-adbe-20250804" // renovate: datasource=docker depName=adobe/cruise-control @@ -572,6 +578,12 @@ type StorageConfig struct { // the `pvcSpec` is used by default. // +optional EmptyDir *corev1.EmptyDirVolumeSource `json:"emptyDir,omitempty"` + + // TieredStorageCache indicates this storage is used for Kafka tiered storage cache + // (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be + // excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir. + // +optional + TieredStorageCache bool `json:"tieredStorageCache,omitempty"` } // ListenersConfig defines the Kafka listener types @@ -1031,16 +1043,17 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 { } // GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses +// for Kafka log.dirs. Tiered storage cache volumes are excluded. 
func (bConfig *BrokerConfig) GetStorageMountPaths() string { - var mountPaths string - for i, sc := range bConfig.StorageConfigs { - if i != len(bConfig.StorageConfigs)-1 { - mountPaths += sc.MountPath + "," - } else { - mountPaths += sc.MountPath + var mountPaths []string + for _, sc := range bConfig.StorageConfigs { + // Skip tiered storage cache volumes - they should not be in log.dirs + if sc.TieredStorageCache { + continue } + mountPaths = append(mountPaths, sc.MountPath) } - return mountPaths + return strings.Join(mountPaths, ",") } // GetNodeSelector returns the node selector for cruise control diff --git a/api/v1beta1/kafkacluster_types_test.go b/api/v1beta1/kafkacluster_types_test.go index 38a7be38e..10c3c5362 100644 --- a/api/v1beta1/kafkacluster_types_test.go +++ b/api/v1beta1/kafkacluster_types_test.go @@ -622,6 +622,24 @@ func TestGetStorageMountPaths(t *testing.T) { }, expectedMountPaths: "test-log-1,test-log-2,test-log-3,test-log-4,test-log-5", }, + { + testName: "BrokerConfig with tiered storage cache should exclude it from mount paths", + brokerConfig: &BrokerConfig{ + StorageConfigs: []StorageConfig{ + { + MountPath: "test-log-1", + }, + { + MountPath: "test-log-2", + }, + { + MountPath: "/tiered-storage-cache", + TieredStorageCache: true, + }, + }, + }, + expectedMountPaths: "test-log-1,test-log-2", + }, } for _, test := range testCases { diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index cef0ed5b3..5622b1775 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -210,6 +210,13 @@ func (in *BrokerState) DeepCopyInto(out *BrokerState) { *out = make(ExternalListenerConfigNames, len(*in)) copy(*out, *in) } + if in.TieredCacheVolumes != nil { + in, out := &in.TieredCacheVolumes, &out.TieredCacheVolumes + *out = make(map[string]TieredCacheVolumeState, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy 
function, copying the receiver, creating a new BrokerState. diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index ff4636057..040925fc2 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -5100,6 +5100,12 @@ spec: the PersistentVolume backing this claim. type: string type: object + tieredStorageCache: + description: |- + TieredStorageCache indicates this storage is used for Kafka tiered storage cache + (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be + excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir. + type: boolean required: - mountPath type: object @@ -12124,6 +12130,12 @@ spec: to the PersistentVolume backing this claim. type: string type: object + tieredStorageCache: + description: |- + TieredStorageCache indicates this storage is used for Kafka tiered storage cache + (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be + excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir. + type: boolean required: - mountPath type: object @@ -23893,6 +23905,18 @@ spec: description: RackAwarenessState holds info about rack awareness status type: string + tieredCacheVolumes: + additionalProperties: + description: TieredCacheVolumeState tracks the lifecycle state + of a tiered storage cache PVC for a given mount path. + type: string + description: |- + TieredCacheVolumes tracks tiered storage cache PVC state for this broker, keyed by mount path. + "active" means the PVC is a cache volume with no resize in progress. + "pending-deletion" means a resize is in flight (old PVC waiting for pod to stop). + Absent entry means the mount path is not a cache volume (or the PVC was removed). + Written at PVC creation and updated through the resize lifecycle. 
+ type: object version: description: Version holds the current version of the broker in semver format diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index ff4636057..040925fc2 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -5100,6 +5100,12 @@ spec: the PersistentVolume backing this claim. type: string type: object + tieredStorageCache: + description: |- + TieredStorageCache indicates this storage is used for Kafka tiered storage cache + (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be + excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir. + type: boolean required: - mountPath type: object @@ -12124,6 +12130,12 @@ spec: to the PersistentVolume backing this claim. type: string type: object + tieredStorageCache: + description: |- + TieredStorageCache indicates this storage is used for Kafka tiered storage cache + (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be + excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir. + type: boolean required: - mountPath type: object @@ -23893,6 +23905,18 @@ spec: description: RackAwarenessState holds info about rack awareness status type: string + tieredCacheVolumes: + additionalProperties: + description: TieredCacheVolumeState tracks the lifecycle state + of a tiered storage cache PVC for a given mount path. + type: string + description: |- + TieredCacheVolumes tracks tiered storage cache PVC state for this broker, keyed by mount path. + "active" means the PVC is a cache volume with no resize in progress. + "pending-deletion" means a resize is in flight (old PVC waiting for pod to stop). + Absent entry means the mount path is not a cache volume (or the PVC was removed). 
+ Written at PVC creation and updated through the resize lifecycle. + type: object version: description: Version holds the current version of the broker in semver format diff --git a/config/samples/kafkacluster_tiered_storage_cache_resize_initial.yaml b/config/samples/kafkacluster_tiered_storage_cache_resize_initial.yaml new file mode 100644 index 000000000..39ec42a3f --- /dev/null +++ b/config/samples/kafkacluster_tiered_storage_cache_resize_initial.yaml @@ -0,0 +1,104 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + name: kafka-ts-resize + namespace: kafka +spec: + kRaft: true + headlessServiceEnabled: true + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=1 + rollingUpgradeConfig: + failureThreshold: 1 + cruiseControlConfig: + topicConfig: + partitions: 1 + replicationFactor: 2 + brokers: + - id: 0 + brokerConfig: + processRoles: + - broker + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + - mountPath: "/tiered-storage-cache" + tieredStorageCache: true + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + resourceRequirements: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: "200m" + memory: 1Gi + # Second broker: present only to satisfy the CRD's minimum replicationFactor=2 constraint + # for the Cruise Control coordination topic. Carries no tiered storage cache PVC and is + # not the broker under test (tsResizeBrokerID=0). 
+ - id: 1 + brokerConfig: + processRoles: + - broker + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + resourceRequirements: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: "200m" + memory: 1Gi + - id: 2 + brokerConfig: + processRoles: + - controller + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + resourceRequirements: + limits: + cpu: "500m" + memory: 1Gi + requests: + cpu: "100m" + memory: 512Mi + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true diff --git a/config/samples/kafkacluster_tiered_storage_cache_resize_shrunk.yaml b/config/samples/kafkacluster_tiered_storage_cache_resize_shrunk.yaml new file mode 100644 index 000000000..16b966a3d --- /dev/null +++ b/config/samples/kafkacluster_tiered_storage_cache_resize_shrunk.yaml @@ -0,0 +1,104 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + name: kafka-ts-resize + namespace: kafka +spec: + kRaft: true + headlessServiceEnabled: true + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=1 + rollingUpgradeConfig: + failureThreshold: 1 + cruiseControlConfig: + topicConfig: + partitions: 1 + replicationFactor: 2 + brokers: + - id: 0 + brokerConfig: + processRoles: + - broker + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - 
ReadWriteOnce + resources: + requests: + storage: 2Gi + - mountPath: "/tiered-storage-cache" + tieredStorageCache: true + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi # Reduced from 2Gi to trigger delete-and-recreate + resourceRequirements: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: "200m" + memory: 1Gi + # Second broker: present only to satisfy the CRD's minimum replicationFactor=2 constraint + # for the Cruise Control coordination topic. Carries no tiered storage cache PVC and is + # not the broker under test (tsResizeBrokerID=0). + - id: 1 + brokerConfig: + processRoles: + - broker + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + resourceRequirements: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: "200m" + memory: 1Gi + - id: 2 + brokerConfig: + processRoles: + - controller + terminationGracePeriodSeconds: 10 + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + resourceRequirements: + limits: + cpu: "500m" + memory: 1Gi + requests: + cpu: "100m" + memory: 512Mi + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true diff --git a/docs/tiered-storage-pvc-resize.md b/docs/tiered-storage-pvc-resize.md new file mode 100644 index 000000000..59d5e50b2 --- /dev/null +++ b/docs/tiered-storage-pvc-resize.md @@ -0,0 +1,118 @@ +# Tiered Storage Cache PVC Resize + +Kubernetes does not support shrinking a PVC in-place. 
Because tiered storage cache +data is ephemeral (repopulated from remote storage on broker restart), koperator +implements a **delete-and-recreate** strategy for cache PVC shrinks, coordinated +with the rolling upgrade machinery so only one broker is affected at a time. + +--- + +## State tracking + +Resize state is stored in the `KafkaCluster` CR status under +`status.brokersState[<brokerID>].tieredCacheVolumes`, keyed by mount path. +This keeps the KafkaCluster CR the single source of truth for all in-flight +broker operations and avoids a second, parallel state store on PVC objects. + +| Field | Value | Meaning | +|-------|-------|---------| +| `status.brokersState[N].tieredCacheVolumes[<mountPath>]` | `active` | The mount path is a tiered storage cache volume. No resize is in progress. | +| `status.brokersState[N].tieredCacheVolumes[<mountPath>]` | `pending-deletion` | A resize is in flight for this mount path. The old PVC (larger size) is waiting to be deleted once the broker pod stops; the replacement PVC (desired smaller size) has already been created. | +| *(absent)* | — | The mount path is not a tiered storage cache volume (or the PVC has been removed). | + +The entry transitions from `pending-deletion` back to `active` once the old PVC +has been deleted and the resize is complete. An absent entry means no cache PVC +exists at that path. + +Two PVC annotations that describe what a PVC **is** (not operational state) are +always present on cache PVCs: + +| Annotation | Value | Purpose | +|------------|-------|---------| +| `mountPath` | `<mount path>` | Used throughout reconcile logic to match PVCs to storage configs | +| `tieredStorageCache` | `"true"` | Identifies cache PVCs for special handling: skipped from `log.dirs` and CC capacity config | + +--- + +## Resize flow + +### Cycle N — resize detected, pod running + +1. `status.brokersState[N].tieredCacheVolumes[<mountPath>]` is set to `pending-deletion` + in the KafkaCluster CR status. This is the durable record that a resize is in flight. +2. 
A replacement PVC with the new (smaller) size is created. Provisioning starts immediately. +3. The broker's `ConfigurationState` is set to `ConfigOutOfSync` to trigger a rolling restart + via `handleRollingUpgrade`. +4. `handleRollingUpgrade` evaluates health gates (replica health, concurrent restart limit, + rack awareness). If all pass, the broker pod is deleted and the cycle requeues. If any gate + fails, the state persists in the CR and is retried next cycle. + +### Cycle N+1 — pod is absent + +A pod is considered absent when it either does not exist or has a non-nil +`DeletionTimestamp` (Terminating). Treating a Terminating pod as absent allows +cleanup to start during the pod's Terminating window rather than waiting for it +to fully disappear from etcd. + +1. The old PVC (the one whose size differs from the desired size at that mount path) + is deleted. +2. The `tieredCacheVolumes` entry for that mount path is set to `active` in the CR status. +3. A new broker pod is created referencing the replacement PVC. Because provisioning + started in cycle N the PVC is likely already `Bound`, minimising startup latency. + +### Cycle N+2 — pod is present again + +1. No `pending-deletion` entry remains for the mount path → resize is complete. +2. The replacement PVC is now an ordinary cache PVC with `tieredCacheVolumes[<mountPath>] = active`. + +--- + +## Grow vs shrink + +A cache PVC **grow** takes the normal Kubernetes in-place expansion path: the PVC +spec is updated with the larger size and Kubernetes expands the volume without a +pod restart (requires `allowVolumeExpansion: true` on the StorageClass). No +`tieredCacheVolumes` state change is made and no rolling restart is triggered. + +A cache PVC **shrink** uses the delete-and-recreate flow described above. +Shrinking is only supported for tiered storage cache volumes — regular Kafka log +volumes reject any size decrease with an error. 
+ +--- + +## Properties of this design + +| Property | Value | +|----------|-------| +| State survives reconciler crash | Yes — `tieredCacheVolumes` is written to the KafkaCluster CR (etcd) before the replacement PVC is created; every step is re-entrant | +| Single source of truth | Yes — all broker state (configuration, graceful actions, cache resize) lives in `status.brokersState` | +| Atomicity gap | Eliminated — replacement PVC is created before old is deleted | +| Provisioning overlaps gate evaluation | Yes — replacement PVC created in cycle N, not N+1 | +| Observable via kubectl | Yes — `kubectl get kafkacluster <name> -o jsonpath='{.status.brokersState}'` shows resize state; `pending-deletion` entries indicate an in-flight resize | +| CC disk rebalance for cache PVCs | Excluded — tiered cache PVCs are explicitly skipped from `GracefulDiskRebalanceRequired` and CC capacity config | +| `log.dirs` for cache PVCs | Excluded — `generateStorageConfig` skips volumes with `TieredStorageCache: true` | + +--- + +## Sequence diagram + +``` +Cycle N (pod UP, resize detected) + ├─ set tieredCacheVolumes[mountPath] = pending-deletion in CR status + ├─ create replacement PVC (provisioning starts) + ├─ set ConfigOutOfSync + └─ handleRollingUpgrade + ├─ [gates fail] → requeue 15s, retry next cycle + └─ [gates pass] → delete pod → requeue 15s + +Cycle N+k (pod UP, gates failing — any number of cycles) + └─ ensure ConfigOutOfSync, requeue + +Cycle N+k+1 (pod ABSENT — gone or Terminating) + ├─ delete old PVC (identified as the PVC at mountPath whose size ≠ desired) + ├─ set tieredCacheVolumes[mountPath] = active in CR status + └─ create new pod bound to replacement PVC + +Cycle N+k+2 (pod PRESENT — non-Terminating, not necessarily Running) + └─ tieredCacheVolumes[mountPath] = active → resize complete, no further action +``` diff --git a/pkg/k8sutil/status.go b/pkg/k8sutil/status.go index 4836cb3ab..f287727ed 100644 --- a/pkg/k8sutil/status.go +++ b/pkg/k8sutil/status.go @@ -174,6
+174,17 @@ func generateBrokerState(brokerIDs []string, cluster *banzaicloudv1beta1.KafkaCl case banzaicloudv1beta1.KafkaVersion: brokerState.Image = s.Image brokerState.Version = s.Version + case map[string]banzaicloudv1beta1.TieredCacheVolumeState: + if brokerState.TieredCacheVolumes == nil { + brokerState.TieredCacheVolumes = make(map[string]banzaicloudv1beta1.TieredCacheVolumeState) + } + for mountPath, state := range s { + if state == "" { + delete(brokerState.TieredCacheVolumes, mountPath) + } else { + brokerState.TieredCacheVolumes[mountPath] = state + } + } } brokersState[brokerID] = brokerState } diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 20735de89..10d079d2f 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -346,6 +346,13 @@ func generateBrokerDisks(brokerState v1beta1.Broker, kafkaClusterSpec v1beta1.Ka // Generate log dir configuration logDirs := make(map[string]string, len(storageConfigs)) for path, conf := range storageConfigs { + // Skip tiered storage cache volumes - they should not be included in Cruise Control capacity + if conf.TieredStorageCache { + log.V(1).Info("skipping tiered storage cache volume from Cruise Control capacity", + v1beta1.BrokerIdLabelKey, brokerState.Id, "mountPath", path) + continue + } + size := parseMountPathWithSize(conf) log.V(1).Info(fmt.Sprintf("broker log.dir %s size in MB: %d", path, size), v1beta1.BrokerIdLabelKey, brokerState.Id) diff --git a/pkg/resources/cruisecontrol/configmap_test.go b/pkg/resources/cruisecontrol/configmap_test.go index 89ec08f27..5e2420ed5 100644 --- a/pkg/resources/cruisecontrol/configmap_test.go +++ b/pkg/resources/cruisecontrol/configmap_test.go @@ -1017,4 +1017,105 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { } }) } + + // Test tiered storage cache exclusion from capacity + t.Run("tiered storage cache should be excluded from capacity", func(t 
*testing.T) { + quantity, _ := resource.ParseQuantity("100Gi") + tieredCacheQuantity, _ := resource.ParseQuantity("50Gi") + cpuQuantity, _ := resource.ParseQuantity("2000m") + + kafkaCluster := v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + { + Id: 0, + BrokerConfig: &v1beta1.BrokerConfig{ + Resources: &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": cpuQuantity, + }, + }, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/kafka-logs", + PvcSpec: &v1.PersistentVolumeClaimSpec{ + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: quantity, + }, + }, + }, + }, + { + MountPath: "/tiered-storage-cache", + TieredStorageCache: true, + PvcSpec: &v1.PersistentVolumeClaimSpec{ + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: tieredCacheQuantity, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + }, + }, + } + + expectedConfiguration := `{ + "brokerCapacities": [ + { + "brokerId": "0", + "capacity": { + "DISK": { + "/kafka-logs/kafka": "107374" + }, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
+ } + ] +}` + + var actual JBODInvariantCapacityConfig + rawStringActual, err := GenerateCapacityConfig(&kafkaCluster, logr.Discard(), nil) + if err != nil { + t.Error(err, "could not generate capacity config") + } + err = json.Unmarshal([]byte(rawStringActual), &actual) + if err != nil { + t.Error(err, "could not unmarshal actual json") + } + + var expected JBODInvariantCapacityConfig + err = json.Unmarshal([]byte(expectedConfiguration), &expected) + if err != nil { + t.Error(err, "could not unmarshal expected json") + } + + if !reflect.DeepEqual(actual, expected) { + t.Errorf("Expected: %+v, got: %+v", expected, actual) + } + + // Verify that tiered storage cache is NOT in the capacity + actualCapacity := actual.Capacities[0].(map[string]interface{}) + diskCapacity := actualCapacity["capacity"].(map[string]interface{})["DISK"].(map[string]interface{}) + + if _, exists := diskCapacity["/tiered-storage-cache/kafka"]; exists { + t.Error("Tiered storage cache should not be in Cruise Control capacity") + } + + if _, exists := diskCapacity["/kafka-logs/kafka"]; !exists { + t.Error("Regular storage should be in Cruise Control capacity") + } + }) } diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 6e1874a61..00282f7d2 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -245,14 +245,14 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf // this is to support the zk to kRaft migration func shouldUseKRaftModeForBroker(brokerReadOnlyConfig *properties.Properties) bool { migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode) - return !found || migrationBrokerKRaftMode.Value() == "true" + return !found || migrationBrokerKRaftMode.Value() == annotationTrue } // Returns true by default (not in migration) OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'. 
// this is to support the zk to kRaft migration func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.Properties) bool { migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled) - return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" + return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == annotationTrue } func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, extListenerStatuses, intListenerStatuses, @@ -411,6 +411,10 @@ func getMountPathsFromBrokerConfigMap(configMap *corev1.ConfigMap) ([]string, er func generateStorageConfig(sConfig []v1beta1.StorageConfig) []string { mountPaths := make([]string, 0, len(sConfig)) for _, storage := range sConfig { + // Tiered storage cache volumes are not Kafka log dirs — exclude them from log.dirs. + if storage.TieredStorageCache { + continue + } mountPaths = append(mountPaths, util.StorageConfigKafkaMountPath(storage.MountPath)) } return mountPaths diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..00301705d 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -68,6 +68,9 @@ const ( brokerConfigTemplate = "%s-config" brokerStorageTemplate = "%s-%d-storage-%d-" + // annotationTrue is the string value used for boolean-true annotations and config comparisons. 
+ annotationTrue = "true" + brokerConfigMapVolumeMount = "broker-config" kafkaDataVolumeMount = "kafka-data" @@ -140,7 +143,9 @@ func getCreatedPvcForBroker( c client.Reader, brokerID int32, storageConfigs []banzaiv1beta1.StorageConfig, - namespace, crName string) ([]corev1.PersistentVolumeClaim, error) { + namespace, crName string, + pendingDeletionMountPaths map[string]bool, +) ([]corev1.PersistentVolumeClaim, error) { foundPvcList := &corev1.PersistentVolumeClaimList{} matchingLabels := client.MatchingLabels( apiutil.MergeLabels( @@ -153,6 +158,30 @@ func getCreatedPvcForBroker( return nil, err } + // Filter the PVC list: + // 1. Always exclude terminating PVCs (DeletionTimestamp set) — they are released from pods + // and should not be mounted by the new broker pod. + // 2. For mount paths with an in-flight cache resize (pendingDeletionMountPaths), two PVCs + // temporarily coexist (old + replacement). Keep only one so the pod spec has a unique + // mount path (the reconciler will use the correctly-sized one during PVC reconciliation). + seenMountPaths := make(map[string]bool) + n := 0 + for _, pvc := range foundPvcList.Items { + if pvc.DeletionTimestamp != nil { + continue + } + mp := pvc.Annotations["mountPath"] + if pendingDeletionMountPaths[mp] { + if seenMountPaths[mp] { + continue + } + seenMountPaths[mp] = true + } + foundPvcList.Items[n] = pvc + n++ + } + foundPvcList.Items = foundPvcList.Items[:n] + var missing []string for i := range storageConfigs { if storageConfigs[i].PvcSpec == nil { @@ -312,6 +341,12 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if storage.PvcSpec == nil { continue } + // Tiered storage cache PVCs are only meaningful on broker nodes. + // Controller-only nodes do not serve Kafka client traffic or hold tiered-storage + // segment data, so skip cache PVCs for them. 
+ if storage.TieredStorageCache && brokerConfig.IsControllerNode() && !brokerConfig.IsBrokerNode() { + continue + } o, err := r.pvc(broker.Id, index, storage) if err != nil { return errors.WrapIfWithDetails(err, "failed to generate resource", "resources", "PersistentVolumeClaim") @@ -432,7 +467,14 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { } } - pvcs, err := getCreatedPvcForBroker(ctx, r.Client, broker.Id, brokerConfig.StorageConfigs, r.KafkaCluster.Namespace, r.KafkaCluster.Name) + pendingDeletionMountPaths := make(map[string]bool) + brokerIdStr := strconv.Itoa(int(broker.Id)) + for mp, state := range r.KafkaCluster.Status.BrokersState[brokerIdStr].TieredCacheVolumes { + if state == banzaiv1beta1.TieredCacheVolumePendingDeletion { + pendingDeletionMountPaths[mp] = true + } + } + pvcs, err := getCreatedPvcForBroker(ctx, r.Client, broker.Id, brokerConfig.StorageConfigs, r.KafkaCluster.Namespace, r.KafkaCluster.Name, pendingDeletionMountPaths) if err != nil { return errors.WrapIfWithDetails(err, "failed to list PVC's") } @@ -1136,7 +1178,6 @@ func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool { return selector.Matches(labels.Set(pod.Labels)) } -//nolint:funlen func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { brokersVolumesState := make(map[string]map[string]banzaiv1beta1.VolumeState) var brokerIds []string @@ -1145,21 +1186,16 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro for brokerId, desiredPvcs := range brokersDesiredPvcs { desiredType := reflect.TypeOf(&corev1.PersistentVolumeClaim{}) brokerVolumesState := make(map[string]banzaiv1beta1.VolumeState) - pvcList := &corev1.PersistentVolumeClaimList{} - matchingLabels := client.MatchingLabels( apiutil.MergeLabels( apiutil.LabelsForKafka(r.KafkaCluster.Name), map[string]string{banzaiv1beta1.BrokerIdLabelKey: brokerId}, ), ) - log = 
log.WithValues("kind", desiredType) - err := r.List(ctx, pvcList, - client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels) - if err != nil { + if err := r.List(ctx, pvcList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType) } @@ -1167,7 +1203,6 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro if err != nil { return errors.WrapIfWithDetails(err, "could not determine if broker is controller", "brokerId", brokerId) } - if isController { if len(desiredPvcs) != 1 { return errors.New("controller broker can have only one volume") @@ -1179,106 +1214,591 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro } } - // Handle disk removal - if len(pvcList.Items) > len(desiredPvcs) { + // Resolve pod existence once per broker — used by both cleanup and resize logic below. + // A pod with a non-nil DeletionTimestamp is Terminating: its containers have stopped (or + // are stopping) and any PVCs it mounted will be released once the pod is fully gone. + // Treating a Terminating pod as "not existing" lets the pending-deletion PVC be cleaned up + // during the Terminating window rather than waiting for the narrow gap between the pod being + // fully removed from etcd and the new pod being created in the same reconcile cycle. 
+ brokerPodExists, err := r.getBrokerPodExists(ctx, brokerId) + if err != nil { + return err + } + + desiredMountPaths := make(map[string]bool, len(desiredPvcs)) + for _, dp := range desiredPvcs { + desiredMountPaths[dp.Annotations["mountPath"]] = true + } + + skipBroker, err := r.handleBrokerCacheResizeCleanup(ctx, log, brokerId, pvcList, desiredPvcs, desiredMountPaths, brokerPodExists, matchingLabels) + if err != nil { + return err + } + if skipBroker { + continue + } + + // Only delete cache PVCs when the broker pod is gone — issuing Delete against a mounted + // PVC sets the DeletionTimestamp and is held by the pvc-protection finalizer, but the + // intent is wrong and the resulting PVC state confuses subsequent reconciles. + if !brokerPodExists { + if err := r.deleteRemovedCachePVCs(ctx, log, brokerId, pvcList, desiredMountPaths); err != nil { + return err + } + } + + // Re-list so the disk removal count below reflects the deletions above. + if err := r.List(ctx, pvcList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType) + } + + if r.effectivePvcCount(brokerId, pvcList) > len(desiredPvcs) { waitForDiskRemovalToFinish, err = handleDiskRemoval(ctx, pvcList, desiredPvcs, r, brokerId, log, desiredType, brokerVolumesState) if err != nil { return err } } - for _, desiredPvc := range desiredPvcs { - currentPvc := desiredPvc.DeepCopy() - log.V(1).Info("searching with label because name is empty") + if err := r.reconcileDesiredPvcsForBroker(ctx, log, brokerId, desiredPvcs, pvcList, matchingLabels, desiredType, brokerVolumesState); err != nil { + return err + } - err := r.List(ctx, pvcList, - client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels) - if err != nil { - return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType) + if len(brokerVolumesState) > 0 { + brokerIds = 
append(brokerIds, brokerId) + brokersVolumesState[brokerId] = brokerVolumesState + } + } + + if len(brokersVolumesState) > 0 { + if err := k8sutil.UpdateBrokerStatus(r.Client, brokerIds, r.KafkaCluster, brokersVolumesState, log); err != nil { + return err + } + } + + if waitForDiskRemovalToFinish { + return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending") + } + + return nil +} + +// isBrokerPVCTieredCache reports whether the given PVC is a tiered storage cache volume, +// using status.BrokersState as the authoritative source of truth (written at PVC creation). +// Falls back to the PVC annotation for PVCs created before TieredCacheVolumes was introduced. +func (r *Reconciler) isBrokerPVCTieredCache(pvc *corev1.PersistentVolumeClaim, brokerId string) bool { + mountPath := pvc.Annotations["mountPath"] + if state := r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath]; state != "" { + return true + } + return pvc.Annotations[banzaiv1beta1.TieredStorageCacheAnnotationKey] == annotationTrue +} + +// deleteRemovedCachePVCs deletes tiered storage cache PVCs whose mount path is no longer desired +// and clears their status entries. These volumes are not Kafka log dirs and must never go through +// CC disk removal. +func (r *Reconciler) deleteRemovedCachePVCs( + ctx context.Context, + log logr.Logger, + brokerId string, + pvcList *corev1.PersistentVolumeClaimList, + desiredMountPaths map[string]bool, +) error { + // pathsToUntrack accumulates mount paths whose status entry must be cleared. + // Collected in one pass so a single UpdateBrokerStatus call covers all of them. 
+ pathsToUntrack := make(map[string]banzaiv1beta1.TieredCacheVolumeState) + + for i := range pvcList.Items { + pvc := &pvcList.Items[i] + if !r.isBrokerPVCTieredCache(pvc, brokerId) { + continue + } + mountPath := pvc.Annotations["mountPath"] + if desiredMountPaths[mountPath] { + continue + } + // Issue Delete only if the PVC is not already terminating. + if pvc.DeletionTimestamp == nil { + log.Info("Deleting removed tiered storage cache PVC", + "brokerId", brokerId, "mountPath", mountPath, "pvc", pvc.Name) + if err := r.Delete(ctx, pvc); err != nil && !apierrors.IsNotFound(err) { + return errorfactory.New(errorfactory.APIFailure{}, err, + "deleting removed tiered storage cache PVC failed", "pvc", pvc.Name) } + } + // Clear status regardless of PVC termination state: once deletion has been + // requested (DeletionTimestamp set), the status entry is no longer needed. + pathsToUntrack[mountPath] = "" + } - mountPath := currentPvc.Annotations["mountPath"] - // Creating the first PersistentVolume For Pod - if len(pvcList.Items) == 0 { - if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { - return errors.WrapIf(err, "could not apply last state to annotation") - } - if err := r.Create(ctx, desiredPvc); err != nil { - return errorfactory.New(errorfactory.APIFailure{}, err, "creating resource failed", "kind", desiredType) - } - log.Info("resource created") - continue + // Second pass: clear status entries for mount paths that are no longer desired + // but whose PVC is already fully finalized and absent from the list. This handles + // the retry path where a prior cycle deleted the PVC but failed to clear status. 
+ for mountPath, state := range r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes { + if state == "" || desiredMountPaths[mountPath] { + continue + } + if _, alreadyHandled := pathsToUntrack[mountPath]; !alreadyHandled { + log.Info("Clearing stale tiered cache volume status for already-deleted mount path", + "brokerId", brokerId, "mountPath", mountPath) + pathsToUntrack[mountPath] = "" + } + } + + if len(pathsToUntrack) == 0 { + return nil + } + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + pathsToUntrack, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not clear tiered cache volume state after PVC deletion", "brokerId", brokerId) + } + return nil +} + +// effectivePvcCount returns the number of PVCs for a broker, counting each pending-deletion cache +// mount path only once (old + replacement coexist during a shrink but represent one logical disk). +func (r *Reconciler) effectivePvcCount(brokerId string, pvcList *corev1.PersistentVolumeClaimList) int { + tieredCacheVolumes := r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes + counted := make(map[string]bool) + total := 0 + for _, pvc := range pvcList.Items { + mp := pvc.Annotations["mountPath"] + if tieredCacheVolumes[mp] == banzaiv1beta1.TieredCacheVolumePendingDeletion { + if !counted[mp] { + counted[mp] = true + total++ } + } else { + total++ + } + } + return total +} + +// getBrokerPodExists returns true if a non-terminating pod exists for the given broker. 
+func (r *Reconciler) getBrokerPodExists(ctx context.Context, brokerId string) (bool, error) { + brokerPodList := &corev1.PodList{} + matchingLabels := client.MatchingLabels( + apiutil.MergeLabels( + apiutil.LabelsForKafka(r.KafkaCluster.Name), + map[string]string{banzaiv1beta1.BrokerIdLabelKey: brokerId}, + ), + ) + if err := r.List(ctx, brokerPodList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { + return false, errorfactory.New(errorfactory.APIFailure{}, err, "getting broker pods failed", "brokerId", brokerId) + } + for i := range brokerPodList.Items { + if brokerPodList.Items[i].DeletionTimestamp == nil { + return true, nil + } + } + return false, nil +} + +// handleBrokerCacheResizeCleanup manages tiered storage cache resize PVC cleanup based on broker pod state. +// It returns true when the caller should skip the rest of the reconcile loop for this broker (orphaned resize +// state detected while the pod is running — a rolling restart must complete first). +// +//nolint:funlen,gocyclo +func (r *Reconciler) handleBrokerCacheResizeCleanup( + ctx context.Context, + log logr.Logger, + brokerId string, + pvcList *corev1.PersistentVolumeClaimList, + desiredPvcs []*corev1.PersistentVolumeClaim, + desiredMountPaths map[string]bool, + brokerPodExists bool, + matchingLabels client.MatchingLabels, +) (bool, error) { + tieredCacheVolumes := r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes + hasPendingDeletion := false + for _, state := range tieredCacheVolumes { + if state == banzaiv1beta1.TieredCacheVolumePendingDeletion { + hasPendingDeletion = true + break + } + } + if !hasPendingDeletion { + return false, nil + } - alreadyCreated := false + desiredType := reflect.TypeOf(&corev1.PersistentVolumeClaim{}) + + // Build desired size per mount path for identifying the old PVC during cleanup. 
+ desiredSizeByMountPath := make(map[string]int64, len(desiredPvcs)) + for _, dp := range desiredPvcs { + if s := dp.Spec.Resources.Requests.Storage(); s != nil { + desiredSizeByMountPath[dp.Annotations["mountPath"]] = s.Value() + } + } + + // stateUpdates accumulates TieredCacheVolumes changes to apply in a single status update. + // An empty TieredCacheVolumeState value means "delete this entry from the map". + stateUpdates := make(map[string]banzaiv1beta1.TieredCacheVolumeState) + + if brokerPodExists { + // Resize complete when the replacement PVC (size == desired) already exists at the mount + // path and no old PVC (size != desired) remains. Using a size-based check instead of a + // count-based one makes the logic crash-safe: if the reconciler crashes between writing + // pending-deletion state and creating the replacement PVC, the next cycle sees only the + // old PVC (size != desired) and correctly does NOT clear state, letting + // reconcileDesiredPvcsForBroker re-create the replacement. + for mp, state := range tieredCacheVolumes { + if state != banzaiv1beta1.TieredCacheVolumePendingDeletion { + continue + } + desiredSize := desiredSizeByMountPath[mp] + replacementExists := false + oldPvcExists := false for _, pvc := range pvcList.Items { - if mountPath == pvc.Annotations["mountPath"] { - currentPvc = pvc.DeepCopy() - alreadyCreated = true - // Checking pvc state, if bounded, so the broker has already restarted and the CC GracefulDiskRebalance has not happened yet, - // then we make it happening with status update. 
- // If disk removal was set, and the disk was added back, we also need to mark the volume for rebalance - volumeState, found := r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates[mountPath] - if currentPvc.Status.Phase == corev1.ClaimBound && - (!found || volumeState.CruiseControlVolumeState.IsDiskRemoval()) { - brokerVolumesState[mountPath] = banzaiv1beta1.VolumeState{CruiseControlVolumeState: banzaiv1beta1.GracefulDiskRebalanceRequired} + if pvc.Annotations["mountPath"] != mp { + continue + } + if pvcSize := pvc.Spec.Resources.Requests.Storage(); pvcSize != nil { + if pvcSize.Value() == desiredSize { + replacementExists = true + } else { + oldPvcExists = true } - break } } - - if !alreadyCreated { - // Creating the 2+ PersistentVolumes for Pod - if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { - return errors.WrapIf(err, "could not apply last state to annotation") + if replacementExists && !oldPvcExists { + stateUpdates[mp] = banzaiv1beta1.TieredCacheVolumeActive + log.Info("Tiered storage cache PVC resize complete — clearing resize state", + "brokerId", brokerId, "mountPath", mp) + } + } + } else { + // Broker pod is down — safe to delete old PVCs and clean up orphaned state. + for mp, state := range tieredCacheVolumes { + if state != banzaiv1beta1.TieredCacheVolumePendingDeletion { + continue + } + if !desiredMountPaths[mp] { + // Mount path is no longer desired: delete all PVCs at this path (old + replacement). 
+ for i := range pvcList.Items { + pvc := &pvcList.Items[i] + if pvc.Annotations["mountPath"] != mp { + continue + } + log.Info("Broker pod is down — deleting orphaned cache resize PVC", + "brokerId", brokerId, "mountPath", mp, "pvc", pvc.Name) + if err := r.Delete(ctx, pvc); err != nil && !apierrors.IsNotFound(err) { + return false, errorfactory.New(errorfactory.APIFailure{}, err, + "deleting orphaned cache resize PVC failed", "pvc", pvc.Name) + } } - if err := r.Create(ctx, desiredPvc); err != nil { - return errorfactory.New(errorfactory.APIFailure{}, err, "creating resource failed", "kind", desiredType) + stateUpdates[mp] = "" // entry removed: volume gone, no longer a cache PVC + } else { + // Mount path still desired: delete the old PVC (the one whose size differs from desired). + desiredSize := desiredSizeByMountPath[mp] + for i := range pvcList.Items { + pvc := &pvcList.Items[i] + if pvc.Annotations["mountPath"] != mp { + continue + } + if pvc.Spec.Resources.Requests.Storage().Value() != desiredSize { + log.Info("Broker pod is down — deleting pending-deletion tiered storage cache PVC", + "brokerId", brokerId, "mountPath", mp, "pvc", pvc.Name) + if err := r.Delete(ctx, pvc); err != nil && !apierrors.IsNotFound(err) { + return false, errorfactory.New(errorfactory.APIFailure{}, err, + "deleting pending-deletion tiered storage cache PVC failed", "pvc", pvc.Name) + } + break + } } + stateUpdates[mp] = banzaiv1beta1.TieredCacheVolumeActive // resize done, still a cache PVC + } + } + + if len(stateUpdates) > 0 { + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + stateUpdates, log); err != nil { + return false, errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not clear cache resize state after PVC cleanup", "brokerId", brokerId) + } + } + // Re-list so the rest of this iteration sees the current state. 
+ if err := r.List(ctx, pvcList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { + return false, errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType) + } + } + + if brokerPodExists && len(stateUpdates) > 0 { + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + stateUpdates, log); err != nil { + return false, errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not clear cache resize state after resize complete", "brokerId", brokerId) + } + } + + // If the pod is still running and any resize state entry refers to a mount path that is no + // longer desired (storage config removed mid-resize), trigger a rolling restart so the pod + // stops and the orphaned PVCs can be deleted on the next cycle. + if brokerPodExists { + for mp, state := range r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes { + if state != banzaiv1beta1.TieredCacheVolumePendingDeletion { continue } - if err == nil { - if k8sutil.CheckIfObjectUpdated(log, desiredType, currentPvc, desiredPvc) { - if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { - return errors.WrapIf(err, "could not apply last state to annotation") + if !desiredMountPaths[mp] { + log.Info("Orphaned cache resize state detected with broker pod running — triggering rolling restart", + "brokerId", brokerId, "mountPath", mp) + if r.KafkaCluster.Status.BrokersState[brokerId].ConfigurationState != banzaiv1beta1.ConfigOutOfSync { + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + banzaiv1beta1.ConfigOutOfSync, log); err != nil { + return false, errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not mark broker ConfigOutOfSync for orphaned cache PVC cleanup", "brokerId", brokerId) } + } + return true, nil + } + } + } - if isDesiredStorageValueInvalid(desiredPvc, currentPvc) { - return 
errorfactory.New(errorfactory.InternalError{}, errors.New("could not modify pvc size"), - "one can not reduce the size of a PVC", "kind", desiredType) - } + return false, nil +} - resReq := desiredPvc.Spec.Resources.Requests - labels := desiredPvc.Labels - desiredPvc = currentPvc.DeepCopy() - desiredPvc.Spec.Resources.Requests = resReq - desiredPvc.Labels = labels +// reconcileDesiredPvcsForBroker reconciles the desired PVCs for a single broker. +// +//nolint:funlen,gocyclo +func (r *Reconciler) reconcileDesiredPvcsForBroker( + ctx context.Context, + log logr.Logger, + brokerId string, + desiredPvcs []*corev1.PersistentVolumeClaim, + pvcList *corev1.PersistentVolumeClaimList, + matchingLabels client.MatchingLabels, + desiredType reflect.Type, + brokerVolumesState map[string]banzaiv1beta1.VolumeState, +) error { + for _, desiredPvc := range desiredPvcs { + currentPvc := desiredPvc.DeepCopy() + log.V(1).Info("searching with label because name is empty") + + if err := r.List(ctx, pvcList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType) + } - if err := r.Update(ctx, desiredPvc); err != nil { - return errorfactory.New(errorfactory.APIFailure{}, err, "updating resource failed", "kind", desiredType) + mountPath := currentPvc.Annotations["mountPath"] + // Creating the first PersistentVolume For Pod + if len(pvcList.Items) == 0 { + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { + return errors.WrapIf(err, "could not apply last state to annotation") + } + if err := r.Create(ctx, desiredPvc); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "creating resource failed", "kind", desiredType) + } + log.Info("resource created") + if desiredPvc.Annotations[banzaiv1beta1.TieredStorageCacheAnnotationKey] == annotationTrue { + if err := k8sutil.UpdateBrokerStatus(r.Client, 
[]string{brokerId}, r.KafkaCluster, + map[string]banzaiv1beta1.TieredCacheVolumeState{mountPath: banzaiv1beta1.TieredCacheVolumeActive}, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not record tiered cache volume state after PVC creation", "brokerId", brokerId) + } + } + continue + } + + alreadyCreated := false + for _, pvc := range pvcList.Items { + if mountPath != pvc.Annotations["mountPath"] { + continue + } + // Skip PVCs that are being deleted — the API server may still return them briefly + // after a Delete call while the pvc-protection finalizer is being removed. + // Matching a terminating PVC as the current PVC would cause a spurious re-staging. + if pvc.DeletionTimestamp != nil { + continue + } + // Skip the old PVC that is waiting to be deleted once the broker pod stops. + // We identify it as the PVC whose size differs from the desired size while a + // pending-deletion resize is in flight for this mount path. + if r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath] == banzaiv1beta1.TieredCacheVolumePendingDeletion { + if desiredPvc.Spec.Resources.Requests.Storage().Value() != pvc.Spec.Resources.Requests.Storage().Value() { + continue + } + } + currentPvc = pvc.DeepCopy() + alreadyCreated = true + + // Trigger a CC disk rebalance only for regular data volumes. + // Tiered storage cache PVCs are ephemeral — CC must not account for them. + if !r.isBrokerPVCTieredCache(currentPvc, brokerId) { + // Checking pvc state, if bounded, so the broker has already restarted and the CC GracefulDiskRebalance has not happened yet, + // then we make it happening with status update. 
+ // If disk removal was set, and the disk was added back, we also need to mark the volume for rebalance + volumeState, found := r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates[mountPath] + if currentPvc.Status.Phase == corev1.ClaimBound && + (!found || volumeState.CruiseControlVolumeState.IsDiskRemoval()) { + brokerVolumesState[mountPath] = banzaiv1beta1.VolumeState{CruiseControlVolumeState: banzaiv1beta1.GracefulDiskRebalanceRequired} + } + } + break + } + + if !alreadyCreated { + // During an in-flight cache resize, a prior reconcile cycle may have already created + // the replacement PVC server-side even if the client got a timeout, or the cached + // client may not yet have observed an in-flight Create. The size filter above + // excludes the old PVC, so alreadyCreated is false even when a replacement exists. + // A strongly-consistent (uncached) read picks up that replacement so we don't issue a + // second Create with a fresh GenerateName. + if r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath] == banzaiv1beta1.TieredCacheVolumePendingDeletion { + liveList := &corev1.PersistentVolumeClaimList{} + if err := r.DirectClient.List(ctx, liveList, client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "uncached list of PVCs failed", "kind", desiredType) + } + desiredSize := desiredPvc.Spec.Resources.Requests.Storage().Value() + for i := range liveList.Items { + p := &liveList.Items[i] + if p.DeletionTimestamp != nil || p.Annotations["mountPath"] != mountPath { + continue + } + if p.Spec.Resources.Requests.Storage().Value() == desiredSize { + log.Info("Replacement cache PVC already exists from a prior partial attempt; skipping Create", + "brokerId", brokerId, "mountPath", mountPath, "pvc", p.Name) + alreadyCreated = true + break } - log.Info("resource updated") + } + if alreadyCreated { + continue + } + } + // Creating the 2+ 
PersistentVolumes for Pod + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { + return errors.WrapIf(err, "could not apply last state to annotation") + } + if err := r.Create(ctx, desiredPvc); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "creating resource failed", "kind", desiredType) + } + // Only write Active when this is a genuinely new PVC. If pending-deletion is already + // set (crash-recovery: prior Create timed out), preserve the in-flight state so + // handleBrokerCacheResizeCleanup can still delete the old PVC on the next cycle. + if desiredPvc.Annotations[banzaiv1beta1.TieredStorageCacheAnnotationKey] == annotationTrue && + r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath] != banzaiv1beta1.TieredCacheVolumePendingDeletion { + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + map[string]banzaiv1beta1.TieredCacheVolumeState{mountPath: banzaiv1beta1.TieredCacheVolumeActive}, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not record tiered cache volume state after PVC creation", "brokerId", brokerId) } } + continue } - if len(brokerVolumesState) > 0 { - brokerIds = append(brokerIds, brokerId) - brokersVolumesState[brokerId] = brokerVolumesState + // A resize is in flight for this mount path: the matched PVC is the replacement. + // Keep triggering ConfigOutOfSync so the rolling upgrade completes and the old PVC + // can be deleted on the next reconcile cycle. 
+ if r.KafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath] == banzaiv1beta1.TieredCacheVolumePendingDeletion { + if r.KafkaCluster.Status.BrokersState[brokerId].ConfigurationState != banzaiv1beta1.ConfigOutOfSync { + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + banzaiv1beta1.ConfigOutOfSync, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not mark broker ConfigOutOfSync for pending tiered storage cache PVC resize", "brokerId", brokerId) + } + } + continue } - } - if len(brokersVolumesState) > 0 { - err := k8sutil.UpdateBrokerStatus(r.Client, brokerIds, r.KafkaCluster, brokersVolumesState, log) - if err != nil { - return err + // Use the committed brokerState (written at PVC creation) as the authoritative source of + // truth — never the desired spec. Reading from the desired side would let a CR edit + // (flipping tieredStorageCache: false → true on an existing data volume) route a real + // log-dir PVC through the cache-shrink delete-and-recreate path, bypassing graceful drain. + isTieredCache := r.isBrokerPVCTieredCache(currentPvc, brokerId) + desiredSize := desiredPvc.Spec.Resources.Requests.Storage().Value() + currentSize := currentPvc.Spec.Resources.Requests.Storage().Value() + + // Tiered storage cache PVC shrink: stage the replacement PVC immediately so + // provisioning runs in parallel with rolling-upgrade gate evaluation. + // The old PVC is annotated pending-deletion and removed once the broker pod stops. 
+ if isTieredCache && desiredSize < currentSize { + if err := r.stageTieredCachePVCShrink(ctx, log, brokerId, mountPath, currentPvc, desiredPvc, currentSize, desiredSize); err != nil { + return err + } + continue + } + + if !k8sutil.CheckIfObjectUpdated(log, desiredType, currentPvc, desiredPvc) { + continue + } + + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { + return errors.WrapIf(err, "could not apply last state to annotation") + } + + // Regular validation: size decreases are forbidden for non-cache volumes. + if isDesiredStorageValueInvalid(desiredPvc, currentPvc) { + return errorfactory.New(errorfactory.InternalError{}, errors.New("could not modify pvc size"), + "one can not reduce the size of a PVC", "kind", desiredType) + } + + resReq := desiredPvc.Spec.Resources.Requests + labels := desiredPvc.Labels + desiredPvc = currentPvc.DeepCopy() + desiredPvc.Spec.Resources.Requests = resReq + desiredPvc.Labels = labels + + if err := r.Update(ctx, desiredPvc); err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "updating resource failed", "kind", desiredType) } + log.Info("resource updated") } + return nil +} - if waitForDiskRemovalToFinish { - return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending") +// stageTieredCachePVCShrink handles shrinking a tiered storage cache PVC by staging a replacement +// and triggering a rolling restart so the old PVC can be deleted once the broker pod stops. 
+func (r *Reconciler) stageTieredCachePVCShrink( + ctx context.Context, + log logr.Logger, + brokerId string, + mountPath string, + currentPvc *corev1.PersistentVolumeClaim, + desiredPvc *corev1.PersistentVolumeClaim, + currentSize int64, + desiredSize int64, +) error { + log.Info("Tiered storage cache size decrease detected — staging replacement PVC", + "brokerId", brokerId, + "mountPath", mountPath, + "currentSize", currentSize, + "desiredSize", desiredSize) + + // Record the resize in brokerState so the reconciler knows to exclude the old PVC and + // keep the rolling upgrade running until the old PVC is cleaned up. + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + map[string]banzaiv1beta1.TieredCacheVolumeState{mountPath: banzaiv1beta1.TieredCacheVolumePendingDeletion}, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not record cache resize state for tiered storage cache PVC resize", "brokerId", brokerId) + } + log.Info("Recorded pending-deletion resize state for tiered storage cache PVC", + "brokerId", brokerId, "mountPath", mountPath, "pvc", currentPvc.Name) + + // Create the replacement PVC immediately so provisioning starts now. + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPvc); err != nil { + return errors.WrapIf(err, "could not apply last state to annotation") } + if err := r.Create(ctx, desiredPvc); err != nil { + // Resize state is already recorded; the next reconcile cycle will re-create the replacement. 
+ log.Error(err, "creating replacement tiered storage cache PVC failed after recording resize state; will retry", + "brokerId", brokerId, "mountPath", mountPath) + return errorfactory.New(errorfactory.APIFailure{}, err, + "creating replacement tiered storage cache PVC failed", "brokerId", brokerId) + } + log.Info("Created replacement tiered storage cache PVC", + "brokerId", brokerId, "pvc", desiredPvc.Name, "desiredSize", desiredSize) + // Trigger rolling upgrade so the broker pod is restarted and the old PVC + // can be deleted on the next reconcile cycle. + if r.KafkaCluster.Status.BrokersState[brokerId].ConfigurationState != banzaiv1beta1.ConfigOutOfSync { + log.Info("Marking broker ConfigOutOfSync to trigger rolling upgrade for tiered storage cache PVC resize", + "brokerId", brokerId) + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{brokerId}, r.KafkaCluster, + banzaiv1beta1.ConfigOutOfSync, log); err != nil { + return errorfactory.New(errorfactory.StatusUpdateError{}, err, + "could not mark broker ConfigOutOfSync for tiered storage cache PVC resize", "brokerId", brokerId) + } + } return nil } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 4ca517e6f..292d1cdbc 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -31,9 +31,12 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/banzaicloud/k8s-objectmatcher/patch" + "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" controllerMocks "github.com/banzaicloud/koperator/controllers/tests/mocks" @@ -1257,6 +1260,15 @@ func execPvcTest(t *testing.T, testCases []PvcTestCase) { }, } + // Mock pod list — the new code always checks pod existence before disk removal. + // For these tests the broker pod is always absent. 
+ mockClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PodList{}), + client.InNamespace("kafka"), + gomock.Any(), + ).Return(nil).AnyTimes() + // Set up the mockClient to return the provided test.existingPvcs mockClient.EXPECT().List( context.TODO(), @@ -1336,6 +1348,317 @@ func createPvc(name, brokerId, mountPath string) *corev1.PersistentVolumeClaim { } } +func TestReconcileKafkaPvcTieredCacheResize(t *testing.T) { + t.Parallel() + + const ( + clusterName = "kafka" + namespace = "kafka" + brokerId = "0" + mountPath = "/tiered-storage-cache" + ) + + makeTieredCachePvc := func(name, size string) *corev1.PersistentVolumeClaim { + qty := resource.MustParse(size) + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + v1beta1.BrokerIdLabelKey: brokerId, + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: clusterName, + }, + Annotations: map[string]string{ + "mountPath": mountPath, + v1beta1.TieredStorageCacheAnnotationKey: "true", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: qty, + }, + }, + }, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + } + // Set last-applied annotation so CheckIfObjectUpdated detects size differences + _ = patch.DefaultAnnotator.SetLastAppliedAnnotation(pvc) + return pvc + } + + runningPod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-0", + Namespace: namespace, + }, + } + terminatingPod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-0", + Namespace: namespace, + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + }, + } + + testCases := []struct { + testName string + existingPvc *corev1.PersistentVolumeClaim + additionalExistingPvc *corev1.PersistentVolumeClaim // optional second PVC at the same mountPath (e.g., pre-staged replacement) + // 
directClientOnlyPvc: when set, this PVC is returned only by DirectClient (uncached) reads, + // not by the cached Client. Simulates the cache-lag window after a Create where the watch + // event hasn't propagated yet — the scenario the HIGH-3 idempotency fix protects against. + directClientOnlyPvc *corev1.PersistentVolumeClaim + desiredPvc *corev1.PersistentVolumeClaim + existingPods []corev1.Pod + initialTieredCacheState v1beta1.TieredCacheVolumeState // pre-existing brokerState for mountPath, if any + expectedUpdatePvc bool + expectedCreatePvc bool + expectedDeletePvc bool + expectedError bool + // expectedTieredCacheState, when non-empty, asserts the final TieredCacheVolumes[mountPath] + // value after reconcileKafkaPvc returns. + expectedTieredCacheState v1beta1.TieredCacheVolumeState + }{ + { + // Pod is up, no prior resize state: record TieredCacheVolumePendingDeletion in brokerState, + // create replacement PVC, set ConfigOutOfSync to trigger rolling upgrade. + testName: "size decrease with running pod — resize state recorded, replacement PVC created", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{runningPod}, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: false, + expectedError: false, + }, + { + // Terminating pod is treated as having no running pod: staging starts immediately + // so the replacement PVC is provisioned during the drain window. 
+ testName: "size decrease with terminating pod — resize state recorded, replacement PVC created", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{terminatingPod}, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: false, + expectedError: false, + }, + { + // Resize state already recorded and pod is terminating (treated as gone): + // cleanup fires — old PVC (larger size) is deleted, state is cleared. + testName: "pending-deletion state with terminating pod — old PVC deleted", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{terminatingPod}, + initialTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: true, + expectedError: false, + }, + { + // Pod is already gone, no prior resize state: record state and create replacement PVC. + // Cleanup of old PVC happens on next cycle when reconciler re-observes the state. + testName: "size decrease with pod already gone — resize state recorded, replacement PVC created", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{}, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: false, + expectedError: false, + }, + { + // Resize state already recorded and pod is gone: + // cleanup fires — old PVC (larger size) is deleted, state is cleared. 
+ testName: "pending-deletion state with pod gone — old PVC deleted", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{}, + initialTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: true, + expectedError: false, + }, + { + // size increase — no special handling, regular PVC update path. + testName: "size increase — regular PVC update path", + existingPvc: makeTieredCachePvc("cache-pvc-1", "50Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + existingPods: []corev1.Pod{}, + expectedUpdatePvc: true, + expectedCreatePvc: false, + expectedDeletePvc: false, + expectedError: false, + }, + { + // Crash-recovery: state was written in a prior cycle but the replacement Create + // failed before completing. Pod is still running, so the old PVC must NOT be + // deleted and state must NOT be cleared. The reconciler must re-create the + // replacement. + testName: "pending-deletion with running pod and no replacement — replacement re-staged", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-new", "50Gi"), + existingPods: []corev1.Pod{runningPod}, + initialTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + expectedUpdatePvc: false, + expectedCreatePvc: true, + expectedDeletePvc: false, + expectedError: false, + expectedTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + }, + { + // Idempotency (symmetric): pending-deletion state and a replacement at desired + // size already exists in both cached and uncached views. The inner-match loop + // finds the replacement; no second Create. 
+ testName: "pending-deletion with running pod and replacement present — no duplicate Create", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + additionalExistingPvc: makeTieredCachePvc("cache-pvc-replacement", "50Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-replacement", "50Gi"), + existingPods: []corev1.Pod{runningPod}, + initialTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + expectedUpdatePvc: false, + expectedCreatePvc: false, + expectedDeletePvc: false, + expectedError: false, + }, + { + // Idempotency (asymmetric — the actual HIGH-3 cache-lag scenario): the prior + // Create succeeded server-side, so the replacement exists in etcd, but the cached + // Client hasn't yet observed the watch event. The inner match loop sees only the + // old PVC (size-filtered out) and would fall through to a second Create — except + // the idempotency check uses DirectClient (uncached) and detects the replacement. + // If a future refactor removes the DirectClient lookup, this test fails. 
+ testName: "pending-deletion with replacement visible only via DirectClient — no duplicate Create", + existingPvc: makeTieredCachePvc("cache-pvc-1", "100Gi"), + directClientOnlyPvc: makeTieredCachePvc("cache-pvc-replacement", "50Gi"), + desiredPvc: makeTieredCachePvc("cache-pvc-replacement", "50Gi"), + existingPods: []corev1.Pod{runningPod}, + initialTieredCacheState: v1beta1.TieredCacheVolumePendingDeletion, + expectedUpdatePvc: false, + expectedCreatePvc: false, + expectedDeletePvc: false, + expectedError: false, + }, + } + + mockCtrl := gomock.NewController(t) + + for _, test := range testCases { + test := test + t.Run(test.testName, func(t *testing.T) { + mockClient := mocks.NewMockClient(mockCtrl) + mockDirectClient := mocks.NewMockClient(mockCtrl) // separate so tests can simulate cache vs apiserver asymmetry + mockSubResourceClient := mocks.NewMockSubResourceClient(mockCtrl) + + kafkaCluster := &v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{Name: clusterName, Namespace: namespace}, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{ + Id: 0, + BrokerConfig: &v1beta1.BrokerConfig{Roles: []string{"broker"}}, + }}, + }, + } + if test.initialTieredCacheState != "" { + kafkaCluster.Status.BrokersState = map[string]v1beta1.BrokerState{ + brokerId: { + TieredCacheVolumes: map[string]v1beta1.TieredCacheVolumeState{ + mountPath: test.initialTieredCacheState, + }, + }, + } + } + r := Reconciler{ + Reconciler: resources.Reconciler{ + Client: mockClient, + DirectClient: mockDirectClient, + KafkaCluster: kafkaCluster, + }, + } + + // Cached Client PVC list: returns the existing PVC plus any additional. Does NOT + // include directClientOnlyPvc — that simulates the cache-lag window where a Create + // has succeeded server-side but the watch event hasn't propagated. 
+ mockClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaimList{}), + client.InNamespace(namespace), + gomock.Any(), + ).Do(func(ctx context.Context, list *corev1.PersistentVolumeClaimList, opts ...client.ListOption) { + list.Items = []corev1.PersistentVolumeClaim{*test.existingPvc} + if test.additionalExistingPvc != nil { + list.Items = append(list.Items, *test.additionalExistingPvc) + } + }).Return(nil).AnyTimes() + + // Uncached DirectClient PVC list: same as cached, plus directClientOnlyPvc when set. + mockDirectClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaimList{}), + client.InNamespace(namespace), + gomock.Any(), + ).Do(func(ctx context.Context, list *corev1.PersistentVolumeClaimList, opts ...client.ListOption) { + list.Items = []corev1.PersistentVolumeClaim{*test.existingPvc} + if test.additionalExistingPvc != nil { + list.Items = append(list.Items, *test.additionalExistingPvc) + } + if test.directClientOnlyPvc != nil { + list.Items = append(list.Items, *test.directClientOnlyPvc) + } + }).Return(nil).AnyTimes() + + // Mock Pod list — returns the configured pods + mockClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PodList{}), + client.InNamespace(namespace), + gomock.Any(), + ).Do(func(ctx context.Context, list *corev1.PodList, opts ...client.ListOption) { + list.Items = test.existingPods + }).Return(nil).AnyTimes() + + // Mock PVC update (annotating pending-deletion or stripping replacement annotation) + if test.expectedUpdatePvc { + mockClient.EXPECT().Update(context.TODO(), gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaim{})).Return(nil).AnyTimes() + } + + // Mock PVC deletion + if test.expectedDeletePvc { + mockClient.EXPECT().Delete(context.TODO(), gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaim{})).Return(nil).Times(1) + } + + // Mock PVC creation + if test.expectedCreatePvc { + 
mockClient.EXPECT().Create(context.TODO(), gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaim{})).Return(nil).Times(1) + } + + mockClient.EXPECT().Status().Return(mockSubResourceClient).AnyTimes() + mockSubResourceClient.EXPECT().Update(context.Background(), gomock.AssignableToTypeOf(&v1beta1.KafkaCluster{})).Return(nil).AnyTimes() + + brokersDesiredPvcs := map[string][]*corev1.PersistentVolumeClaim{ + brokerId: {test.desiredPvc}, + } + + err := r.reconcileKafkaPvc(context.TODO(), logf.Log, brokersDesiredPvcs) + + if test.expectedError { + assert.NotNil(t, err, "expected an error but got nil") + } else { + assert.Nil(t, err, "expected no error but got: %v", err) + } + if test.expectedTieredCacheState != "" { + finalState := kafkaCluster.Status.BrokersState[brokerId].TieredCacheVolumes[mountPath] + assert.Equal(t, test.expectedTieredCacheState, finalState, + "TieredCacheVolumes[%s] after reconcile", mountPath) + } + }) + } +} + // nolint funlen func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { t.Parallel() diff --git a/pkg/resources/kafka/pvc.go b/pkg/resources/kafka/pvc.go index 29516f397..803885604 100644 --- a/pkg/resources/kafka/pvc.go +++ b/pkg/resources/kafka/pvc.go @@ -62,6 +62,13 @@ func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.Stora return nil, errors.WrapIfWithDetails(err, "couldn't unmarshal Pvc spec", errCtx...) 
} + annotations := map[string]string{"mountPath": storage.MountPath} + + // Mark tiered storage cache PVCs with annotation for special handling + if storage.TieredStorageCache { + annotations[v1beta1.TieredStorageCacheAnnotationKey] = annotationTrue + } + return &corev1.PersistentVolumeClaim{ ObjectMeta: templates.ObjectMetaWithGeneratedNameAndAnnotations( fmt.Sprintf(brokerStorageTemplate, r.KafkaCluster.Name, brokerId, storageIndex), @@ -69,7 +76,7 @@ func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.Stora apiutil.LabelsForKafka(r.KafkaCluster.Name), map[string]string{v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)}, ), - map[string]string{"mountPath": storage.MountPath}, r.KafkaCluster), + annotations, r.KafkaCluster), Spec: pvcSpec, }, nil } diff --git a/pkg/webhooks/errors.go b/pkg/webhooks/errors.go index 58f3d9ef1..0938847e5 100644 --- a/pkg/webhooks/errors.go +++ b/pkg/webhooks/errors.go @@ -29,6 +29,7 @@ const ( unsupportedRemovingStorageMsg = "removing storage from a broker is not supported" invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number" invalidContainerPortForIngressControllerErrMsg = "invalid trarget port number for ingress controller deployment" + immutableTieredStorageCacheErrMsg = "tieredStorageCache is immutable on existing storageConfigs entries; remove the entry and re-add to change" // errorDuringValidationMsg is added to infrastructure errors (e.g. 
failed to connect), but not to field validation errors errorDuringValidationMsg = "error during validation" @@ -62,6 +63,10 @@ func IsAdmissionInvalidExternalListenerPort(err error) bool { return apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidExternalListenerStartingPortErrMsg) } +func IsAdmissionImmutableTieredStorageCache(err error) bool { + return apierrors.IsInvalid(err) && strings.Contains(err.Error(), immutableTieredStorageCacheErrMsg) +} + func IsAdmissionErrorDuringValidation(err error) bool { return apierrors.IsInternalError(err) && strings.Contains(err.Error(), errorDuringValidationMsg) } diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index d9f7b8932..a2941cd41 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -18,6 +18,7 @@ package webhooks import ( "context" "fmt" + "strconv" corev1 "k8s.io/api/core/v1" @@ -39,6 +40,7 @@ type KafkaClusterValidator struct { func (s KafkaClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) { var allErrs field.ErrorList + kafkaClusterOld := oldObj.(*banzaicloudv1beta1.KafkaCluster) kafkaClusterNew := newObj.(*banzaicloudv1beta1.KafkaCluster) log := s.Log.WithValues("name", kafkaClusterNew.GetName(), "namespace", kafkaClusterNew.GetNamespace()) @@ -47,6 +49,8 @@ func (s KafkaClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newOb allErrs = append(allErrs, listenerErrs...) } + allErrs = append(allErrs, checkTieredStorageCacheImmutability(kafkaClusterOld, kafkaClusterNew)...) + if len(allErrs) == 0 { return nil, nil } @@ -81,6 +85,68 @@ func (s KafkaClusterValidator) ValidateDelete(ctx context.Context, obj runtime.O return nil, nil } +// checkTieredStorageCacheImmutability rejects updates that change the tieredStorageCache +// classification of an existing PVC. 
It uses status.BrokersState[brokerID].TieredCacheVolumes +// as the authoritative source of truth — the reconciler writes this map when it creates each PVC. +// Comparing against committed status catches all bypass paths (in-place flip, group-switch, +// inline-shadow override) without requiring spec-level merging or effective-config resolution. +// Removing a mountPath from the spec is allowed (delete-and-re-add path). +func checkTieredStorageCacheImmutability(oldCluster, newCluster *banzaicloudv1beta1.KafkaCluster) field.ErrorList { + var allErrs field.ErrorList + + for brokerIDStr, brokerState := range oldCluster.Status.BrokersState { + for mountPath, state := range brokerState.TieredCacheVolumes { + if state == "" { + continue // not a cache PVC + } + newValue, fieldPath, found := findTieredStorageCacheInSpec(&newCluster.Spec, brokerIDStr, mountPath) + if !found { + continue // mountPath removed from spec — remove-and-re-add is allowed + } + if !newValue { + allErrs = append(allErrs, field.Forbidden(fieldPath, immutableTieredStorageCacheErrMsg)) + } + } + } + + return allErrs +} + +// findTieredStorageCacheInSpec resolves the intended tieredStorageCache value for a given +// broker (by string ID, matching reconciler convention) and mountPath from the raw spec. +// Inline storageConfigs take priority over brokerConfigGroup (matching dedupStorageConfigs order). +// Returns (value, fieldPath, found=true) when the mountPath is present in the new spec. +func findTieredStorageCacheInSpec(spec *banzaicloudv1beta1.KafkaClusterSpec, brokerIDStr, mountPath string) (bool, *field.Path, bool) { + for i, broker := range spec.Brokers { + if strconv.Itoa(int(broker.Id)) != brokerIDStr { + continue + } + // Inline config has priority (mirrors dedupStorageConfigs order in GetBrokerConfig) + if broker.BrokerConfig != nil { + for k, sc := range broker.BrokerConfig.StorageConfigs { + if sc.MountPath == mountPath { + p := field.NewPath("spec").Child("brokers").Index(i). 
+ Child("brokerConfig").Child("storageConfigs").Index(k).Child("tieredStorageCache") + return sc.TieredStorageCache, p, true + } + } + } + if broker.BrokerConfigGroup != "" { + if group, ok := spec.BrokerConfigGroups[broker.BrokerConfigGroup]; ok { + for k, sc := range group.StorageConfigs { + if sc.MountPath == mountPath { + p := field.NewPath("spec").Child("brokerConfigGroups"). + Key(broker.BrokerConfigGroup).Child("storageConfigs").Index(k).Child("tieredStorageCache") + return sc.TieredStorageCache, p, true + } + } + } + } + break + } + return false, nil, false +} + // checkListeners validates the spec.listenersConfig object func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { var allErrs field.ErrorList diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 952c3f389..7d81acf69 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -258,6 +258,161 @@ func TestCheckExternalListenerStartingPort(t *testing.T) { } } +func TestCheckTieredStorageCacheImmutability(t *testing.T) { + sc := func(path string, tc bool) v1beta1.StorageConfig { + return v1beta1.StorageConfig{MountPath: path, TieredStorageCache: tc} + } + // committed returns a KafkaCluster whose status records the given tieredCache volumes for broker 0. + // Only cache PVCs are tracked (TieredCacheVolumeActive or TieredCacheVolumePendingDeletion); + // non-cache PVCs have no entry. 
+ committed := func(vols map[string]v1beta1.TieredCacheVolumeState) *v1beta1.KafkaCluster { + return &v1beta1.KafkaCluster{ + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {TieredCacheVolumes: vols}, + }, + }, + } + } + + testCases := []struct { + testName string + oldCluster *v1beta1.KafkaCluster + newCluster *v1beta1.KafkaCluster + expected field.ErrorList + }{ + { + // No committed state — new cluster, any spec is valid. + testName: "no committed state — any spec change allowed", + oldCluster: &v1beta1.KafkaCluster{}, + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/data", true)}, + }}}, + }}, + expected: nil, + }, + { + // Committed status says /data IS a cache volume; inline spec now marks it as non-cache. + testName: "in-place inline flip true→false rejected", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/data": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/data", false)}, + }}}, + }}, + expected: append(field.ErrorList{}, + field.Forbidden( + field.NewPath("spec").Child("brokers").Index(0).Child("brokerConfig").Child("storageConfigs").Index(0).Child("tieredStorageCache"), + immutableTieredStorageCacheErrMsg, + ), + ), + }, + { + // Committed status says /cache IS a cache volume; group spec now marks it as non-cache. 
+ testName: "in-place group flip true→false rejected", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/cache": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfigGroup: "default"}}, + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": {StorageConfigs: []v1beta1.StorageConfig{sc("/cache", false)}}, + }, + }}, + expected: append(field.ErrorList{}, + field.Forbidden( + field.NewPath("spec").Child("brokerConfigGroups").Key("default").Child("storageConfigs").Index(0).Child("tieredStorageCache"), + immutableTieredStorageCacheErrMsg, + ), + ), + }, + { + // Bypass attempt: broker switches from groupA (where /cache=true) to groupB (where /cache=false). + // The raw-spec old→new comparison would miss this since groupA is unchanged. + // Status-based check catches it because committed status records /cache=active for broker 0. + testName: "group-switch bypass rejected", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/cache": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfigGroup: "groupB"}}, + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "groupA": {StorageConfigs: []v1beta1.StorageConfig{sc("/cache", true)}}, + "groupB": {StorageConfigs: []v1beta1.StorageConfig{sc("/cache", false)}}, + }, + }}, + expected: append(field.ErrorList{}, + field.Forbidden( + field.NewPath("spec").Child("brokerConfigGroups").Key("groupB").Child("storageConfigs").Index(0).Child("tieredStorageCache"), + immutableTieredStorageCacheErrMsg, + ), + ), + }, + { + // Bypass attempt: /cache was provisioned as a cache volume (via group); now an inline entry + // overrides it with TieredStorageCache=false. Inline takes priority in GetBrokerConfig so + // the effective value would flip — the check must reject the inline entry. 
+ testName: "inline-shadow bypass rejected", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/cache": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{ + Id: 0, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/cache", false)}, + }, + }}, + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": {StorageConfigs: []v1beta1.StorageConfig{sc("/cache", true)}}, + }, + }}, + expected: append(field.ErrorList{}, + field.Forbidden( + field.NewPath("spec").Child("brokers").Index(0).Child("brokerConfig").Child("storageConfigs").Index(0).Child("tieredStorageCache"), + immutableTieredStorageCacheErrMsg, + ), + ), + }, + { + // mountPath is removed from spec entirely — remove-and-re-add path is intentionally allowed. + testName: "remove mountPath from spec — allowed", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/cache": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/data", false)}, + }}}, + }}, + expected: nil, + }, + { + // New mountPath with no committed state — any value is allowed. + testName: "new mountPath not in committed state — allowed", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/data": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/data", true), sc("/cache", true)}, + }}}, + }}, + expected: nil, + }, + { + // Value unchanged — no error. 
+ testName: "unchanged value — no error", + oldCluster: committed(map[string]v1beta1.TieredCacheVolumeState{"/cache": v1beta1.TieredCacheVolumeActive}), + newCluster: &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 0, BrokerConfig: &v1beta1.BrokerConfig{ + StorageConfigs: []v1beta1.StorageConfig{sc("/cache", true)}, + }}}, + }}, + expected: nil, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.testName, func(t *testing.T) { + got := checkTieredStorageCacheImmutability(testCase.oldCluster, testCase.newCluster) + require.Equal(t, testCase.expected, got) + }) + } +} + func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) { testCases := []struct { testName string diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 3e2c35c3a..64c461e9c 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -78,6 +78,7 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() { testProduceConsumeInternal() testJmxExporter() testUninstallKafkaCluster() + testTieredStorageCachePvcResize() testUninstall() snapshotClusterAndCompare(snapshottedInfo) }) diff --git a/tests/e2e/test_multidisk_removal.go b/tests/e2e/test_multidisk_removal.go index 18a730c89..b68f209b3 100644 --- a/tests/e2e/test_multidisk_removal.go +++ b/tests/e2e/test_multidisk_removal.go @@ -33,9 +33,10 @@ import ( ) const ( - multidiskRemovalTimeout = 1000 * time.Second // this test can take long - multidiskRemovalPollInterval = 15 * time.Second - brokerConfigTemplateFormat = "%s-config-%d" + multidiskRemovalTimeout = 1000 * time.Second // this test can take long + multidiskRemovalPollInterval = 15 * time.Second + multidiskRemovalBrokerReadinessWait = 360 * time.Second // rolling restart of all brokers after disk removal + brokerConfigTemplateFormat = "%s-config-%d" ) var ( @@ -75,7 +76,9 @@ func testMultiDiskRemoval() bool { }) ginkgo.It("Asserting Kafka brokers remain 
healthy", func() { - err := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, + // Use multidiskRemovalBrokerReadinessWait: after disk removal, CC triggers a rolling + // restart of all brokers, so 180s (defaultPodReadinessWaitTime) is too tight in kind. + err := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", multidiskRemovalBrokerReadinessWait, v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+",app=kafka", "") gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) diff --git a/tests/e2e/test_tiered_storage_cache_resize.go b/tests/e2e/test_tiered_storage_cache_resize.go new file mode 100644 index 000000000..e826646cf --- /dev/null +++ b/tests/e2e/test_tiered_storage_cache_resize.go @@ -0,0 +1,442 @@ +// Copyright 2025 Adobe. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package e2e
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"emperror.dev/errors"
+	"github.com/gruntwork-io/terratest/modules/k8s"
+	ginkgo "github.com/onsi/ginkgo/v2"
+	gomega "github.com/onsi/gomega"
+)
+
+const (
+	// tsResizeClusterName is the name of the KafkaCluster CR this test creates and removes.
+	tsResizeClusterName = "kafka-ts-resize"
+
+	// Manifests applied before and after the shrink.
+	tsResizeInitialManifest = "../../config/samples/kafkacluster_tiered_storage_cache_resize_initial.yaml"
+	tsResizeShrunkManifest  = "../../config/samples/kafkacluster_tiered_storage_cache_resize_shrunk.yaml"
+
+	// Cache volume mount path and the sizes asserted by the test.
+	// NOTE(review): presumably these mirror the values in the two manifests above — confirm.
+	tsResizeCacheMountPath = "/tiered-storage-cache"
+	tsResizeInitialSize    = "2Gi"
+	tsResizeShrunkSize     = "1Gi"
+
+	tsResizePhaseTimeout    = 10 * time.Minute
+	tsResizePollingInterval = 15 * time.Second
+
+	// tsResizeBrokerID is the broker whose cache PVC is resized and whose pod is tracked.
+	tsResizeBrokerID = 0
+
+	// Values of status.brokersState[*].tieredCacheVolumes[*] the test looks for. They mirror
+	// v1beta1.TieredCacheVolumeActive and v1beta1.TieredCacheVolumePendingDeletion; named here
+	// so a typo in one assertion cannot silently change what is being checked.
+	tsResizeStateActive          = "active"
+	tsResizeStatePendingDeletion = "pending-deletion"
+
+	// tsResizeMountPathAnnotation is the PVC annotation key compared against tsResizeCacheMountPath.
+	tsResizeMountPathAnnotation = "mountPath"
+)
+
+// pvcItem is a minimal representation of a PVC for assertion helpers.
+type pvcItem struct {
+	Name          string
+	Annotations   map[string]string
+	StorageSize   string // spec.resources.requests.storage as printed by kubectl
+	Phase         string // status.phase
+	IsTerminating bool   // true when metadata.deletionTimestamp is set
+}
+
+// getCacheResizeState returns the tieredCacheVolumes entry for the given broker and mount path
+// from the KafkaCluster CR status. It returns an empty string (and no error) when the broker has
+// no state entry or the mount path is not tracked.
+func getCacheResizeState(kubectlOptions k8s.KubectlOptions, clusterName, brokerID, mountPath string) (string, error) {
+	rawOutput, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions,
+		"get", kafkaKind, clusterName,
+		"--output", "json",
+	)
+	if err != nil {
+		return "", errors.WrapIf(err, "getting KafkaCluster failed")
+	}
+
+	// Only the fields needed for the lookup are decoded.
+	var cr struct {
+		Status struct {
+			BrokersState map[string]struct {
+				TieredCacheVolumes map[string]string `json:"tieredCacheVolumes"`
+			} `json:"brokersState"`
+		} `json:"status"`
+	}
+	if err := json.Unmarshal([]byte(rawOutput), &cr); err != nil {
+		return "", errors.WrapIf(err, "parsing KafkaCluster JSON failed")
+	}
+
+	// A missing broker yields a zero struct with a nil map, and reading a nil map yields "",
+	// so both "no broker state" and "no entry for the mount path" collapse to "".
+	return cr.Status.BrokersState[brokerID].TieredCacheVolumes[mountPath], nil
+}
+
+// listBrokerCachePVCs returns the PVCs of broker tsResizeBrokerID in cluster tsResizeClusterName
+// whose mountPath annotation equals tsResizeCacheMountPath. PVCs that are terminating are
+// included and flagged through pvcItem.IsTerminating. The result is nil when nothing matches.
+func listBrokerCachePVCs(kubectlOptions k8s.KubectlOptions) ([]pvcItem, error) {
+	selector := fmt.Sprintf("%s=%s,brokerId=%d", kafkaCRLabelKey, tsResizeClusterName, tsResizeBrokerID)
+
+	rawOutput, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions,
+		"get", "persistentvolumeclaims",
+		"-l", selector,
+		"--output", "json",
+	)
+	if err != nil {
+		return nil, errors.WrapIf(err, "listing PVCs failed")
+	}
+
+	var pvcList struct {
+		Items []struct {
+			Metadata struct {
+				Name              string            `json:"name"`
+				Annotations       map[string]string `json:"annotations"`
+				DeletionTimestamp *string           `json:"deletionTimestamp"`
+			} `json:"metadata"`
+			Spec struct {
+				Resources struct {
+					Requests struct {
+						Storage string `json:"storage"`
+					} `json:"requests"`
+				} `json:"resources"`
+			} `json:"spec"`
+			Status struct {
+				Phase string `json:"phase"`
+			} `json:"status"`
+		} `json:"items"`
+	}
+	if err := json.Unmarshal([]byte(rawOutput), &pvcList); err != nil {
+		return nil, errors.WrapIf(err, "parsing PVC list JSON failed")
+	}
+
+	var result []pvcItem
+	for _, item := range pvcList.Items {
+		if item.Metadata.Annotations[tsResizeMountPathAnnotation] != tsResizeCacheMountPath {
+			continue
+		}
+		result = append(result, pvcItem{
+			Name:          item.Metadata.Name,
+			Annotations:   item.Metadata.Annotations,
+			StorageSize:   item.Spec.Resources.Requests.Storage,
+			Phase:         item.Status.Phase,
+			IsTerminating: item.Metadata.DeletionTimestamp != nil,
+		})
+	}
+	return result, nil
+}
+
+// getBrokerPodUID returns the UID of the first non-terminating broker pod (no deletionTimestamp)
+// for the given broker ID, or an error if no such pod is found.
+func getBrokerPodUID(kubectlOptions k8s.KubectlOptions, clusterName string, brokerID int) (string, error) {
+	rawOutput, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions,
+		"get", "pod",
+		"-l", fmt.Sprintf("%s=%s,brokerId=%d,app=kafka", kafkaCRLabelKey, clusterName, brokerID),
+		"--output", "json",
+	)
+	if err != nil {
+		return "", errors.WrapIf(err, "listing broker pods failed")
+	}
+
+	var podList struct {
+		Items []struct {
+			Metadata struct {
+				UID               string  `json:"uid"`
+				DeletionTimestamp *string `json:"deletionTimestamp"`
+			} `json:"metadata"`
+		} `json:"items"`
+	}
+	if err := json.Unmarshal([]byte(rawOutput), &podList); err != nil {
+		return "", errors.WrapIf(err, "parsing pod list JSON failed")
+	}
+
+	for _, item := range podList.Items {
+		if item.Metadata.DeletionTimestamp == nil {
+			return item.Metadata.UID, nil
+		}
+	}
+	return "", fmt.Errorf("no running broker pod found for broker %d in cluster %s", brokerID, clusterName)
+}
+
+// forceRemoveTSResizeCluster tears down the kafka-ts-resize cluster without relying on the
+// operator. In order it strips the CR finalizers, deletes the CR, deletes the PVCs labelled with
+// the cluster name, and deletes the pods labelled with the cluster name.
+//
+// Every step is best-effort: kubectl errors are deliberately ignored because any of these
+// resources may already be absent (first run, or a previous run that cleaned up properly).
+func forceRemoveTSResizeCluster(kubectlOptions *k8s.KubectlOptions) {
+	clusterSelector := fmt.Sprintf("%s=%s", kafkaCRLabelKey, tsResizeClusterName)
+	steps := [][]string{
+		// 1. Remove finalizers so the CR can be deleted regardless of operator state.
+		{"patch", kafkaKind, tsResizeClusterName, "--type=merge", `--patch={"metadata":{"finalizers":[]}}`},
+		// 2. Delete the CR itself.
+		{"delete", kafkaKind, tsResizeClusterName, "--ignore-not-found"},
+		// 3. Explicitly delete PVCs — they carry a pvc-protection finalizer that
+		//    blocks cascade GC while pods are still bound.
+		{"delete", "persistentvolumeclaims", "-l", clusterSelector, "--ignore-not-found"},
+		// 4. Delete pods so they release PVC mounts promptly.
+		{"delete", "pods", "-l", clusterSelector, "--ignore-not-found", "--grace-period=0"},
+	}
+	for _, args := range steps {
+		_, _ = k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), kubectlOptions, args...)
+	}
+}
+
+// mentionsNotFound reports whether kubectl output (or an error message) says the requested
+// object does not exist.
+func mentionsNotFound(text string) bool {
+	return strings.Contains(text, "NotFound") || strings.Contains(text, "not found")
+}
+
+// brokerPodsReady returns nil when at least one broker pod of the test cluster exists and every
+// listed broker pod has condition Ready=True; otherwise it returns an error describing why not.
+//
+// kubectl wait --for=condition=Ready fails immediately when no pods exist yet, which is why the
+// pod list is inspected explicitly instead.
+func brokerPodsReady(kubectlOptions k8s.KubectlOptions) error {
+	output, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions,
+		"get", "pod",
+		"-l", fmt.Sprintf("%s=%s,app=kafka,isBrokerNode=true", kafkaCRLabelKey, tsResizeClusterName),
+		"--output", "json",
+	)
+	if err != nil {
+		return errors.WrapIf(err, "listing pods failed")
+	}
+
+	var podList struct {
+		Items []struct {
+			Status struct {
+				Conditions []struct {
+					Type   string `json:"type"`
+					Status string `json:"status"`
+				} `json:"conditions"`
+			} `json:"status"`
+		} `json:"items"`
+	}
+	if err := json.Unmarshal([]byte(output), &podList); err != nil {
+		return errors.WrapIf(err, "parsing pod list JSON failed")
+	}
+	if len(podList.Items) < 1 {
+		return fmt.Errorf("expected at least 1 broker pod, got %d", len(podList.Items))
+	}
+
+	for _, pod := range podList.Items {
+		ready := false
+		for _, cond := range pod.Status.Conditions {
+			if cond.Type == "Ready" && cond.Status == "True" {
+				ready = true
+				break
+			}
+		}
+		if !ready {
+			return errors.New("not all pods are ready yet")
+		}
+	}
+	return nil
+}
+
+// testTieredStorageCachePvcResize tests the full multi-phase delete-and-recreate
+// flow for a tiered storage cache PVC shrink. It:
+//
+//  1. Force-removes any leftover kafka-ts-resize cluster, then applies the initial manifest
+//     and checks that broker tsResizeBrokerID has exactly one cache PVC of tsResizeInitialSize.
+//  2. Applies the manifest that shrinks the cache to tsResizeShrunkSize.
+//  3. Waits for Phase 1 (staging): either the state is "pending-deletion" with at least two
+//     cache PVCs, or the resize already finished ("active" with one PVC of the shrunk size).
+//  4. Waits for Phase 2 (pod cycle): the broker pod UID changes, exactly one non-terminating
+//     cache PVC remains, and the state is no longer "pending-deletion".
+//  5. Waits for Phase 3 (completion): the broker pod is Ready, exactly one cache PVC remains
+//     and the state is "active".
+//  6. Verifies the surviving PVC carries the shrunk size and the cluster name.
+//  7. Deletes the cluster (requireDeleteKafkaCluster, plus a forced cleanup in AfterAll).
+//
+// The return value is whatever ginkgo.When returns; the work happens in the registered specs.
+func testTieredStorageCachePvcResize() bool {
+	return ginkgo.When("Testing tiered storage cache PVC shrink (delete-and-recreate flow)", ginkgo.Ordered, func() {
+		var kubectlOptions k8s.KubectlOptions
+		var err error
+		var broker0PodUID string // UID of the tracked broker pod before the resize, used to detect recycling
+
+		// Key of the tracked broker inside status.brokersState, and the selector of its pod.
+		brokerKey := strconv.Itoa(tsResizeBrokerID)
+		brokerPodSelector := fmt.Sprintf("%s=%s,brokerId=%d,app=kafka",
+			kafkaCRLabelKey, tsResizeClusterName, tsResizeBrokerID)
+
+		// AfterAll ensures the cluster and its PVCs are removed even when a test step fails
+		// mid-run, preventing stale resources from blocking the next test run.
+		ginkgo.AfterAll(func() {
+			opts, e := kubectlOptionsForCurrentContext()
+			if e != nil {
+				// Nothing can be cleaned up without a kube context; stay best-effort.
+				return
+			}
+			opts.Namespace = koperatorLocalHelmDescriptor.Namespace
+			forceRemoveTSResizeCluster(&opts)
+		})
+
+		ginkgo.It("Acquiring K8s config and context", func() {
+			kubectlOptions, err = kubectlOptionsForCurrentContext()
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace
+		})
+
+		// Pre-cleanup: forcibly remove any leftover cluster from a previous interrupted run
+		// so the install step always starts from a clean slate.
+		// We cannot rely on requireDeleteKafkaCluster here because a stuck finalizer (left by a
+		// mid-reconcile interruption) prevents the cascade deletion from completing in time.
+		// Instead we strip the finalizer first, delete the CR, then explicitly delete every
+		// owned resource so nothing blocks the subsequent fresh install.
+		ginkgo.It("Pre-cleanup: removing any leftover kafka-ts-resize cluster", func() {
+			forceRemoveTSResizeCluster(&kubectlOptions)
+
+			// Wait for the CR itself to be gone (GC will handle the rest).
+			gomega.Eventually(context.Background(), func() error {
+				out, err := k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), &kubectlOptions,
+					"get", kafkaKind, tsResizeClusterName,
+				)
+				if mentionsNotFound(out) || (err != nil && mentionsNotFound(err.Error())) {
+					return nil
+				}
+				if err != nil {
+					// Any other failure (e.g. API server unreachable) says nothing about whether
+					// the CR still exists, so keep polling instead of assuming it is gone.
+					return errors.WrapIf(err, "checking for leftover KafkaCluster CR failed")
+				}
+				return errors.New("KafkaCluster CR still exists")
+			}, tsResizePhaseTimeout, tsResizePollingInterval).ShouldNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.It("Installing KafkaCluster with tiered storage cache PVC (2Gi)", func() {
+			ginkgo.By("Applying initial KafkaCluster manifest")
+			applyK8sResourceManifest(kubectlOptions, tsResizeInitialManifest)
+
+			ginkgo.By("Waiting for all broker pods to be ready")
+			// Pod readiness is sufficient to proceed with the resize — we don't gate on
+			// ClusterRunning because Cruise Control may still be completing the post-startup
+			// rebalance when the resize is initiated.
+			gomega.Eventually(context.Background(), func() error {
+				return brokerPodsReady(kubectlOptions)
+			}, kafkaClusterCreateTimeout, tsResizePollingInterval).ShouldNot(gomega.HaveOccurred())
+
+			ginkgo.By("Capturing broker-0 pod UID before resize")
+			broker0PodUID, err = getBrokerPodUID(kubectlOptions, tsResizeClusterName, tsResizeBrokerID)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Expect(broker0PodUID).NotTo(gomega.BeEmpty())
+
+			ginkgo.By("Verifying initial cache PVC size is " + tsResizeInitialSize)
+			pvcs, listErr := listBrokerCachePVCs(kubectlOptions)
+			gomega.Expect(listErr).NotTo(gomega.HaveOccurred())
+			gomega.Expect(pvcs).To(gomega.HaveLen(1), "expected exactly one cache PVC for broker 0")
+			gomega.Expect(pvcs[0].StorageSize).To(gomega.Equal(tsResizeInitialSize))
+		})
+
+		ginkgo.It("Triggering cache PVC shrink from 2Gi to 1Gi", func() {
+			ginkgo.By("Applying shrunk KafkaCluster manifest")
+			applyK8sResourceManifest(kubectlOptions, tsResizeShrunkManifest)
+		})
+
+		ginkgo.It("Phase 1: resize state recorded in brokerState and replacement PVC created", func() {
+			// Two acceptable success paths — the resize can complete in seconds, often faster
+			// than the polling interval, so insisting on the intermediate "pending-deletion"
+			// snapshot makes this phase flake-prone:
+			//   A) In-progress: tieredCacheVolumes[mp]=="pending-deletion" AND ≥2 cache PVCs.
+			//   B) Already completed: state is "active" (no resize in flight) AND exactly one
+			//      cache PVC at the shrunk size. "active" is the steady-state after a resize;
+			//      "" (absent) would mean the mount path was removed entirely.
+			// Either proves the resize was staged correctly; Phase 2/3 cover the remaining
+			// invariants regardless of which path we observed.
+			ginkgo.By("Waiting until the resize is observable as in-progress or completed")
+			gomega.Eventually(context.Background(), func() error {
+				state, err := getCacheResizeState(kubectlOptions, tsResizeClusterName,
+					brokerKey, tsResizeCacheMountPath)
+				if err != nil {
+					return err
+				}
+				pvcs, err := listBrokerCachePVCs(kubectlOptions)
+				if err != nil {
+					return err
+				}
+				switch {
+				case state == tsResizeStatePendingDeletion && len(pvcs) >= 2:
+					return nil
+				case state == tsResizeStateActive && len(pvcs) == 1 && pvcs[0].StorageSize == tsResizeShrunkSize:
+					return nil
+				}
+				return fmt.Errorf("not at expected staging state: tieredCacheVolumes[%s]=%q, %d cache PVC(s)",
+					tsResizeCacheMountPath, state, len(pvcs))
+			}, tsResizePhaseTimeout, tsResizePollingInterval).ShouldNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.It("Phase 2: broker pod restarts, old PVC is deleted, resize state is cleared", func() {
+			ginkgo.By("Waiting for broker-0 pod to be recycled (UID change indicates rolling restart)")
+			// We detect recycling by UID change rather than waiting for the pod count to hit zero.
+			// The pod may restart fast enough to be back before the next polling tick, causing a
+			// "pod count == 0" check to time out even when the restart already completed.
+			gomega.Eventually(context.Background(), func() error {
+				uid, err := getBrokerPodUID(kubectlOptions, tsResizeClusterName, tsResizeBrokerID)
+				if err != nil {
+					// Pod may be absent mid-restart; treat as not-yet-recycled.
+					return err
+				}
+				if uid == broker0PodUID {
+					return errors.New("broker-0 pod has not been recycled yet (same UID)")
+				}
+				return nil
+			}, tsResizePhaseTimeout, tsResizePollingInterval).ShouldNot(gomega.HaveOccurred())
+
+			ginkgo.By("Waiting for old PVC to be deleted and resize state to be cleared")
+			gomega.Eventually(context.Background(), func() error {
+				// The old (larger) PVC should be gone — only the replacement should remain.
+				pvcs, err := listBrokerCachePVCs(kubectlOptions)
+				if err != nil {
+					return err
+				}
+				// Ignore PVCs that are being deleted (DeletionTimestamp set but not yet gone).
+				activeCount := 0
+				for _, pvc := range pvcs {
+					if !pvc.IsTerminating {
+						activeCount++
+					}
+				}
+				if activeCount != 1 {
+					return fmt.Errorf("expected 1 active cache PVC for broker %d after pod restart, got %d",
+						tsResizeBrokerID, activeCount)
+				}
+				// After cleanup the resize state transitions back to "active" — no longer pending-deletion.
+				state, err := getCacheResizeState(kubectlOptions, tsResizeClusterName,
+					brokerKey, tsResizeCacheMountPath)
+				if err != nil {
+					return err
+				}
+				if state == tsResizeStatePendingDeletion {
+					return fmt.Errorf("tieredCacheVolumes[%s] still pending-deletion after old PVC deleted",
+						tsResizeCacheMountPath)
+				}
+				return nil
+			}, tsResizePhaseTimeout, tsResizePollingInterval).ShouldNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.It("Phase 3: broker pod running again with the new smaller PVC", func() {
+			ginkgo.By("Waiting for the broker-0 pod to come back Ready")
+			readyErr := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready",
+				tsResizePhaseTimeout, brokerPodSelector, "")
+			gomega.Expect(readyErr).NotTo(gomega.HaveOccurred())
+
+			ginkgo.By("Verifying exactly one cache PVC remains with no resize state")
+			pvcs, listErr := listBrokerCachePVCs(kubectlOptions)
+			gomega.Expect(listErr).NotTo(gomega.HaveOccurred())
+			gomega.Expect(pvcs).To(gomega.HaveLen(1), "expected exactly one cache PVC after resize completes")
+
+			state, stateErr := getCacheResizeState(kubectlOptions, tsResizeClusterName,
+				brokerKey, tsResizeCacheMountPath)
+			gomega.Expect(stateErr).NotTo(gomega.HaveOccurred())
+			gomega.Expect(state).To(gomega.Equal(tsResizeStateActive),
+				"tieredCacheVolumes entry should be active after resize completes")
+		})
+
+		ginkgo.It("Verifying the surviving cache PVC has the new size "+tsResizeShrunkSize, func() {
+			pvcs, listErr := listBrokerCachePVCs(kubectlOptions)
+			gomega.Expect(listErr).NotTo(gomega.HaveOccurred())
+			gomega.Expect(pvcs).To(gomega.HaveLen(1))
+			gomega.Expect(pvcs[0].StorageSize).To(gomega.Equal(tsResizeShrunkSize),
+				"surviving PVC should carry the shrunk size")
+			// ContainSubstring prints the actual PVC name on failure, unlike a bare BeTrue.
+			gomega.Expect(pvcs[0].Name).To(gomega.ContainSubstring(tsResizeClusterName),
+				"surviving PVC should belong to the test cluster")
+		})
+
+		// requireDeleteKafkaCluster receives a copy of kubectlOptions at registration
+		// time, so Namespace must be set here (not inside the It block above).
+		kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace
+		// requireDeleteKafkaCluster registers its own It block — must be called at
+		// the When scope, not nested inside an It.
+		requireDeleteKafkaCluster(kubectlOptions, tsResizeClusterName)
+	})
+}