Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 117 additions & 85 deletions ocp_resources/datavolume.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from ocp_resources.utils.constants import (
TIMEOUT_1MINUTE,
TIMEOUT_2MINUTES,
Expand All @@ -10,6 +12,11 @@
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
from warnings import warn

from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
from ocp_resources.secret import Secret


class DataVolume(NamespacedResource):
"""
Expand Down Expand Up @@ -70,69 +77,58 @@ class Status(Resource.Condition.Status):

def __init__(
self,
name=None,
namespace=None,
source=None,
size=None,
storage_class=None,
url=None,
content_type=ContentType.KUBEVIRT,
access_modes=None,
cert_configmap=None,
secret=None,
client=None,
volume_mode=None,
hostpath_node=None,
source_pvc=None,
source_namespace=None,
multus_annotation=None,
bind_immediate_annotation=None,
preallocation=None,
teardown=True,
yaml_file=None,
delete_timeout=TIMEOUT_4MINUTES,
api_name="pvc",
delete_after_completion=None,
**kwargs,
):
source: str | None = None,
source_dict: dict[str, Any] | None = None,
size: str | None = None,
storage_class: str | None = None,
url: str | None = None,
content_type: str | None = None,
access_modes: str | None = None,
volume_mode: str | None = None,
cert_configmap: str | None = None,
secret: Secret | None = None,
hostpath_node: str | None = None,
source_pvc: str | None = None,
source_namespace: str | None = None,
source_ref: dict[str, Any] | None = None,
multus_annotation: str | None = None,
bind_immediate_annotation: bool | None = None,
preallocation: bool | None = None,
api_name: str | None = "pvc",
checkpoints: list[Any] | None = None,
final_checkpoint: bool | None = None,
priority_class_name: str | None = None,
**kwargs: Any,
) -> None:
"""
DataVolume object

Args:
name (str): DataVolume name.
namespace (str): DataVolume namespace.
source (str): source of DV - upload/http/pvc/registry.
size (str): DataVolume size - format size+size unit, for example: "5Gi".
source (str, default: None): source of DV - upload/http/pvc/registry/blank.
source_dict (dict[str, Any], default: None): DataVolume.source dictionary.
size (str, default: None): DataVolume size - format size+size unit, for example: "5Gi".
storage_class (str, default: None): storage class name for DataVolume.
url (str, default: None): url for importing DV, when source is http/registry.
content_type (str, default: "kubevirt"): DataVolume content type.
access_modes (str, default: None): DataVolume access mode.
content_type (str, default: None): DataVolume content type (e.g., "kubevirt", "archive").
access_modes (str, default: None): DataVolume access mode (e.g., "ReadWriteOnce", "ReadWriteMany").
volume_mode (str, default: None): DataVolume volume mode (e.g., "Filesystem", "Block").
cert_configmap (str, default: None): name of config map for TLS certificates.
secret (Secret, default: None): to be set as secretRef.
client (DynamicClient): DynamicClient to use.
volume_mode (str, default: None): DataVolume volume mode.
hostpath_node (str, default: None): Node name to provision the DV on.
source_pvc (str, default: None): PVC name for when cloning the DV.
source_namespace (str, default: None): PVC namespace for when cloning the DV.
source_ref (dict[str, Any], default: None): SourceRef is an indirect reference to the source of data for the
requested DataVolume. Currently only "DataSource" is supported. Fields: kind (str), name (str), namespace (str)
multus_annotation (str, default: None): network nad name.
bind_immediate_annotation (bool, default: None): when WaitForFirstConsumer is set in StorageClass and DV
should be bound immediately.
bind_immediate_annotation (bool, default: None): when WaitForFirstConsumer is set in StorageClass and DV
should be bound immediately.
preallocation (bool, default: None): preallocate disk space.
teardown (bool, default: True): Indicates if this resource would need to be deleted.
yaml_file (yaml, default: None): yaml file for the resource.
delete_timeout (int, default: 4 minutes): timeout associated with delete action.
api_name (str, default: "pvc"): api used for DV, pvc/storage
delete_after_completion (str, default: None): annotation for garbage collector - "true"/"false"
api_name (str, default: "pvc"): api used for DV (e.g., "storage", "pvc").
checkpoints (list[Any], default: None): list of DataVolumeCheckpoints for snapshot operations.
final_checkpoint (bool, default: None): indicates whether the current DataVolumeCheckpoint is the final one.
priority_class_name (str, default: None): priority class name for the DataVolume pod.
"""
super().__init__(
name=name,
namespace=namespace,
client=client,
teardown=teardown,
yaml_file=yaml_file,
delete_timeout=delete_timeout,
**kwargs,
)
super().__init__(**kwargs)
self.source = source
self.url = url
self.cert_configmap = cert_configmap
Expand All @@ -145,62 +141,98 @@ def __init__(
self.hostpath_node = hostpath_node
self.source_pvc = source_pvc
self.source_namespace = source_namespace
self.source_dict = source_dict
self.source_ref = source_ref
self.multus_annotation = multus_annotation
self.bind_immediate_annotation = bind_immediate_annotation
self.preallocation = preallocation
self.api_name = api_name
self.delete_after_completion = delete_after_completion
self.checkpoints = checkpoints
self.final_checkpoint = final_checkpoint
self.priority_class_name = priority_class_name

def to_dict(self) -> None:
super().to_dict()
if not self.kind_dict and not self.yaml_file:
self.res.update({
"spec": {
"source": {self.source: {"url": self.url}},
self.api_name: {
"resources": {"requests": {"storage": self.size}},
},
}
})
if self.access_modes:
self.res["spec"][self.api_name]["accessModes"] = [self.access_modes]
if self.content_type:
self.res["spec"]["contentType"] = self.content_type
if self.storage_class:
self.res["spec"][self.api_name]["storageClassName"] = self.storage_class
if self.secret:
self.res["spec"]["source"][self.source]["secretRef"] = self.secret.name
if self.volume_mode:
self.res["spec"][self.api_name]["volumeMode"] = self.volume_mode
if self.source == "http" or "registry":
self.res["spec"]["source"][self.source]["url"] = self.url
if self.cert_configmap:
self.res["spec"]["source"][self.source]["certConfigMap"] = self.cert_configmap
if self.source == "upload" or self.source == "blank":
self.res["spec"]["source"][self.source] = {}
self.res["spec"] = {}
_spec = self.res["spec"]

if self.checkpoints is not None:
_spec["checkpoints"] = self.checkpoints

if self.content_type is not None:
_spec["contentType"] = self.content_type

if self.final_checkpoint is not None:
_spec["finalCheckpoint"] = self.final_checkpoint

if self.preallocation is not None:
_spec["preallocation"] = self.preallocation

if self.priority_class_name is not None:
_spec["priorityClassName"] = self.priority_class_name

# Set api_name spec fields (pvc/storage)
if self.api_name is not None:
_spec[self.api_name] = {}

if self.access_modes is not None:
_spec[self.api_name]["accessModes"] = [self.access_modes]

if self.volume_mode is not None:
_spec[self.api_name]["volumeMode"] = self.volume_mode

if self.storage_class is not None:
_spec[self.api_name]["storageClassName"] = self.storage_class

if self.size is not None:
_spec[self.api_name]["resources"] = {"requests": {"storage": self.size}}

# Handle source configuration
if self.source_dict is not None:
_spec["source"] = self.source_dict
elif self.source is not None:
warn(
"source is deprecated and will be removed in the next version. Use source_dict instead.",
DeprecationWarning,
stacklevel=2,
)

_spec["source"] = {}
source_spec = _spec["source"]

if self.source in ["http", "registry"]:
source_spec[self.source] = {"url": self.url}
elif self.source in ["upload", "blank"]:
source_spec[self.source] = {}
elif self.source == "pvc":
source_spec[self.source] = {
"name": self.source_pvc,
"namespace": self.source_namespace or self.namespace,
}

if self.secret is not None:
source_spec[self.source]["secretRef"] = self.secret.name
if self.cert_configmap is not None:
source_spec[self.source]["certConfigMap"] = self.cert_configmap

if self.source_ref is not None:
_spec["sourceRef"] = self.source_ref

if self.hostpath_node:
self.res["metadata"].setdefault("annotations", {}).update({
f"{NamespacedResource.ApiGroup.KUBEVIRT_IO}/provisionOnNode": (self.hostpath_node)
})

if self.multus_annotation:
self.res["metadata"].setdefault("annotations", {}).update({
f"{NamespacedResource.ApiGroup.K8S_V1_CNI_CNCF_IO}/networks": (self.multus_annotation)
})

if self.bind_immediate_annotation:
self.res["metadata"].setdefault("annotations", {}).update({
f"{self.api_group}/storage.bind.immediate.requested": "true"
})
if self.source == "pvc":
self.res["spec"]["source"]["pvc"] = {
"name": self.source_pvc or "dv-source",
"namespace": self.source_namespace or self.namespace,
}
if self.preallocation is not None:
self.res["spec"]["preallocation"] = self.preallocation
if self.delete_after_completion:
self.res["metadata"].setdefault("annotations", {}).update({
f"{self.api_group}/storage.deleteAfterCompletion": (self.delete_after_completion)
})

def wait_deleted(self, timeout=TIMEOUT_4MINUTES):
"""
Expand Down