diff --git a/CHANGELOG.md b/CHANGELOG.md
index d21fceba..d5b99c68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
   This has the effect that applications where the `spark-submit` Pod fails are not resubmitted.
   Previously, Jobs were retried at most 6 times by default ([#647]).
 - Support for Spark `3.5.8` ([#650]).
+- First-class support for S3 on Spark Connect clusters ([#652]).
 
 ### Fixed
 
@@ -41,6 +42,7 @@ All notable changes to this project will be documented in this file.
 [#649]: https://github.com/stackabletech/spark-k8s-operator/pull/649
 [#650]: https://github.com/stackabletech/spark-k8s-operator/pull/650
 [#651]: https://github.com/stackabletech/spark-k8s-operator/pull/651
+[#652]: https://github.com/stackabletech/spark-k8s-operator/pull/652
 [#655]: https://github.com/stackabletech/spark-k8s-operator/pull/655
 [#656]: https://github.com/stackabletech/spark-k8s-operator/pull/656
diff --git a/Cargo.nix b/Cargo.nix
index 7421b0cb..f6500787 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -9906,6 +9906,10 @@ rec {
           packageId = "futures 0.3.31";
           features = [ "compat" ];
         }
+        {
+          name = "indoc";
+          packageId = "indoc";
+        }
         {
           name = "product-config";
           packageId = "product-config";
diff --git a/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
index af9d5a79..f7e29293 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
@@ -47,6 +47,56 @@ spec:
 
 The example above adds a new endpoint for application metrics.
 
+== Connect to S3
+
+There are multiple ways to connect a Spark Connect server to S3:
+
+1. Configure an S3 connection using the `s3connections.stackable.tech` resource.
+   Use this method when you are sure all clients are equally allowed to access all buckets on the given connection.
+2. Configure a list of S3 buckets using the `s3buckets.stackable.tech` resources.
+   Use this when you want to restrict access to specific buckets but don't need to set up different permissions for different clients.
+3. A combination of both.
+
+For more details on how the Stackable Data Platform manages S3 resources, see the xref:concepts:s3.adoc[S3 resources] page.
+
+In the simplest case, you can just set up an S3 connection and the Spark Connect server will automatically make it available to clients. The example below demonstrates how to set up a Spark Connect server with an S3 connection.
+
+```yaml
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkConnectServer
+metadata:
+  name: spark-connect
+spec:
+  image:
+    productVersion: 4.1.1
+  connectors:
+    s3connection:
+      reference: s3-connection
+...
+```
+
+In a more complex use case, clients can read data from two buckets on AWS, transform it, and write the result to a local corporate S3 instance.
+The example below demonstrates how to set up a Spark Connect server with two S3 buckets and a connection.
+
+```yaml
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkConnectServer
+metadata:
+  name: spark-connect
+spec:
+  image:
+    productVersion: 4.1.1
+  connectors:
+    s3buckets:
+    - reference: aws-cust-1-ingest
+    - reference: aws-cust-2-ingest
+    s3connection:
+      reference: corporate-s3-connection
+...
+```
+
 == Spark History Server
 
 Unfortunately integration with the Spark History Server is not supported yet.
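Editor's note on the S3 docs above: besides referencing separate `S3Connection`/`S3Bucket` resources, the `connectors` schema added in `extra/crds.yaml` also allows buckets and their connections to be defined inline on the `SparkConnectServer` itself. The following is a minimal sketch of that inline form, derived from the CRD fields and the values used in this PR's tests; the MinIO host and SecretClass names are illustrative only.

```yaml
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 4.1.1
  connectors:
    s3buckets:
    - inline:
        bucketName: mybucket
        connection:
          inline:
            host: minio          # illustrative in-cluster S3 endpoint
            port: 9000
            accessStyle: Path
            credentials:
              # SecretClass providing accessKey and secretKey
              secretClass: minio-credentials-class
            tls:
              verification:
                server:
                  caCert:
                    # SecretClass providing the CA certificate used to verify the endpoint
                    secretClass: minio-tls-ca
```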
@@ -57,7 +107,6 @@ The connect server seems to ignore the `spark.eventLog` properties while also pr The following features are not supported by the Stackable Spark operator yet * Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs. -* S3Connection references are not supported yet * Volumes and volume mounts can be added only with pod overrides. * Job dependencies must be provisioned as custom images or via `--packages` or `--jars` arguments. diff --git a/extra/crds.yaml b/extra/crds.yaml index 14a9c9a0..95a19e26 100644 --- a/extra/crds.yaml +++ b/extra/crds.yaml @@ -2297,6 +2297,333 @@ spec: and `stopped` will take no effect until `reconciliationPaused` is set to false or removed. type: boolean type: object + connectors: + default: + s3buckets: [] + description: One or more S3 connections to be used by the Spark Connect server. + properties: + s3buckets: + default: [] + items: + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 bucket specification containing the bucket name and an inlined or referenced connection specification. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + bucketName: + description: The name of the S3 bucket. + type: string + connection: + description: The definition of an S3 connection, either inline or as a reference. + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 connection definition as a resource. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: |- + Which access style to use. + Defaults to virtual hosted-style as most of the data products out there. + Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: |- + If the S3 uses authentication you have to specify you S3 credentials. + In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) + providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: |- + [Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the + [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass). + nullable: true + properties: + listenerVolumes: + default: [] + description: |- + The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. + This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: |- + The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. + This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: |- + The pod scope is resolved to the name of the Kubernetes Pod. + This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: |- + The service scope allows Pod objects to specify custom scopes. + This should typically correspond to Service objects that the Pod participates in. 
+ items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: |- + Port the S3 server listens on. + If not specified the product will determine the port to use. + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: |- + Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. + Note that a SecretClass does not need to have a key but can also work with just a CA certificate, + so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: |- + Use TLS and the CA certificates trusted by the common web browsers to verify the server. + This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + required: + - bucketName + - connection + type: object + reference: + type: string + type: object + type: array + s3connection: + nullable: true + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 connection definition as a resource. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: |- + Which access style to use. + Defaults to virtual hosted-style as most of the data products out there. + Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: |- + If the S3 uses authentication you have to specify you S3 credentials. + In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) + providing `accessKey` and `secretKey` is sufficient. 
+ nullable: true + properties: + scope: + description: |- + [Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the + [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass). + nullable: true + properties: + listenerVolumes: + default: [] + description: |- + The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. + This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: |- + The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. + This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: |- + The pod scope is resolved to the name of the Kubernetes Pod. + This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: |- + The service scope allows Pod objects to specify custom scopes. + This should typically correspond to Service objects that the Pod participates in. + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: |- + Port the S3 server listens on. + If not specified the product will determine the port to use. + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: |- + Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. + Note that a SecretClass does not need to have a key but can also work with just a CA certificate, + so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: |- + Use TLS and the CA certificates trusted by the common web browsers to verify the server. + This can be useful when you e.g. use public AWS S3 or other public available services. 
+ type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + type: object executor: description: Spark Connect executor properties. nullable: true diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index a7d48b23..1c7e1073 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -25,6 +25,7 @@ tracing-futures.workspace = true clap.workspace = true futures.workspace = true tokio.workspace = true +indoc.workspace = true [dev-dependencies] indoc.workspace = true diff --git a/rust/operator-binary/src/connect/common.rs b/rust/operator-binary/src/connect/common.rs index 78ca6e87..06128e83 100644 --- a/rust/operator-binary/src/connect/common.rs +++ b/rust/operator-binary/src/connect/common.rs @@ -96,7 +96,7 @@ pub(crate) fn jvm_args( // Merges server and executor properties and renders the contents // of the Spark properties file. pub(crate) fn spark_properties( - props: &[BTreeMap>; 2], + props: &[BTreeMap>], ) -> Result { let mut result = BTreeMap::new(); for p in props { diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 9100852b..922f3f49 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -21,7 +21,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1}; use crate::{ Ctx, - connect::{common, crd::SparkConnectServerStatus, executor, server, service}, + connect::{common, crd::SparkConnectServerStatus, executor, s3, server, service}, crd::constants::{OPERATOR_NAME, SPARK_IMAGE_BASE_NAME}, }; @@ -142,6 +142,12 @@ pub enum Error { ResolveProductImage { source: product_image_selection::Error, }, + + #[snafu(display("failed to resolve S3 connections for SparkConnectServer {name:?}"))] + ResolveS3Connections { source: s3::Error, name: String }, + + #[snafu(display("failed to build connect server S3 properties"))] + S3SparkProperties { source: crate::connect::s3::Error }, } type Result = std::result::Result; @@ -186,6 +192,13 @@ pub async fn reconcile( .resolve(SPARK_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION) .context(ResolveProductImageSnafu)?; + // Resolve any S3 connections early to fail fast if there are issues. + let resolved_s3 = s3::ResolvedS3::resolve(client, scs) + .await + .with_context(|_| ResolveS3ConnectionsSnafu { + name: scs.name_unchecked(), + })?; + // Use a dedicated service account for connect server pods. 
let (service_account, role_binding) = build_rbac_resources( scs, @@ -229,6 +242,9 @@ pub async fn reconcile( // Server config map let spark_props = common::spark_properties(&[ + resolved_s3 + .spark_properties() + .context(S3SparkPropertiesSnafu)?, server::server_properties( scs, &server_config, @@ -263,6 +279,7 @@ pub async fn reconcile( &executor_config, &resolved_product_image, &executor_config_map, + &resolved_s3, ) .context(ExecutorPodTemplateSnafu)?, ) @@ -308,6 +325,7 @@ pub async fn reconcile( &server_config_map, &applied_listener.name_any(), args, + &resolved_s3, ) .context(BuildServerStatefulSetSnafu)?; diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index d9d138ba..6fe36bc0 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -15,6 +15,7 @@ use stackable_operator::{ fragment::{self, Fragment, ValidationError}, merge::Merge, }, + crd::s3, deep_merger::ObjectOverrides, k8s_openapi::{api::core::v1::PodAntiAffinity, apimachinery::pkg::api::resource::Quantity}, kube::{CustomResource, ResourceExt}, @@ -67,7 +68,6 @@ pub enum Error { ) )] pub mod versioned { - /// An Apache Spark Connect server component. This resource is managed by the Stackable operator /// for Apache Spark. Find more information on how to use it in the /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/connect-server). @@ -87,6 +87,10 @@ pub mod versioned { #[serde(default)] pub cluster_operation: ClusterOperation, + /// One or more S3 connections to be used by the Spark Connect server. + #[serde(default)] + connectors: Connectors, + // Docs are on the ObjectOverrides struct #[serde(default)] pub object_overrides: ObjectOverrides, @@ -184,6 +188,14 @@ pub mod versioned { #[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option, } + + #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Deserialize, Serialize)] + struct Connectors { + #[serde(default)] + pub s3buckets: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub s3connection: Option, + } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -391,3 +403,80 @@ impl v1alpha1::ExecutorConfig { } } } +#[cfg(test)] +mod tests { + use indoc::indoc; + + use super::*; + + #[test] + fn test_cr_minimal_deserialization() { + let _spark_connect_cr = serde_yaml::from_str::(indoc! { r#" + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkConnectServer + metadata: + name: spark-connect + spec: + image: + productVersion: 4.1.1 + "# }) + .expect("Failed to deserialize minimal SparkConnectServer CR"); + } + + #[test] + fn test_cr_s3_ref_deserialization() { + let input = indoc! { r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkConnectServer + metadata: + name: spark-connect + spec: + image: + productVersion: 4.1.1 + connectors: + s3: + - reference: my-s3-bucket + "# }; + + let deserializer = serde_yaml::Deserializer::from_str(input); + let _spark_connect_cr: v1alpha1::SparkConnectServer = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer) + .expect("Failed to deserialize SparkConnectServer with S3 connectors CR"); + } + + #[test] + fn test_cr_s3_inline_deserialization() { + let input = indoc! 
{ r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkConnectServer + metadata: + name: spark-connect + spec: + image: + productVersion: 4.1.1 + connectors: + s3: + - inline: + bucketName: mybucket + connection: + inline: + host: minio + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-credentials-class + tls: + verification: + server: + caCert: + secretClass: minio-tls-ca + "# }; + + let deserializer = serde_yaml::Deserializer::from_str(input); + let _spark_connect_cr: v1alpha1::SparkConnectServer = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer) + .expect("Failed to deserialize SparkConnectServer with S3 connectors CR"); + } +} diff --git a/rust/operator-binary/src/connect/executor.rs b/rust/operator-binary/src/connect/executor.rs index 2aba7e49..c914ab2c 100644 --- a/rust/operator-binary/src/connect/executor.rs +++ b/rust/operator-binary/src/connect/executor.rs @@ -14,7 +14,7 @@ use stackable_operator::{ }, k8s_openapi::{ DeepMerge, - api::core::v1::{ConfigMap, EnvVar, PodTemplateSpec}, + api::core::v1::{ConfigMap, EnvVar, PodSecurityContext, PodTemplateSpec}, }, kube::{ResourceExt, runtime::reflector::ObjectRef}, product_logging::framework::calculate_log_volume_size_limit, @@ -26,7 +26,7 @@ use super::{ crd::{DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer}, }; use crate::{ - connect::{common, crd::v1alpha1}, + connect::{common, crd::v1alpha1, s3}, crd::constants::{ JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, @@ -85,6 +85,15 @@ pub enum Error { source: builder::configmap::Error, cm_name: String, }, + + #[snafu(display("failed to add S3 secret or tls volume mounts to executors"))] + AddS3VolumeMount { source: s3::Error }, + + #[snafu(display("failed to add S3 secret volumes to executors"))] + AddS3Volume { source: s3::Error }, + + #[snafu(display("failed to create the init container for the S3 truststore"))] + TrustStoreInitContainer { source: s3::Error }, } // The executor pod template can contain only a handful of properties. @@ -102,6 +111,7 @@ pub fn executor_pod_template( config: &v1alpha1::ExecutorConfig, resolved_product_image: &ResolvedProductImage, config_map: &ConfigMap, + resolved_s3: &s3::ResolvedS3, ) -> Result { let container_env = executor_env( scs.spec @@ -118,6 +128,13 @@ pub fn executor_pod_template( .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG) .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) + .context(AddVolumeMountSnafu)? + .add_volume_mounts( + resolved_s3 + .volumes_and_mounts() + .context(AddS3VolumeMountSnafu)? + .1, + ) .context(AddVolumeMountSnafu)?; let metadata = ObjectMetaBuilder::new() @@ -148,7 +165,28 @@ pub fn executor_pod_template( .with_config_map(config_map.name_unchecked()) .build(), ) - .context(AddVolumeSnafu)?; + .context(AddVolumeSnafu)? + .add_volumes( + resolved_s3 + .volumes_and_mounts() + .context(AddS3VolumeSnafu)? + .0, + ) + .context(AddVolumeSnafu)? + // This is needed for shared enpryDir volumes with other containers like the truststore + // init container. + .security_context(PodSecurityContext { + fs_group: Some(1000), + ..PodSecurityContext::default() + }); + + // S3: Add truststore init container for S3 endpoint communication with TLS. + if let Some(truststore_init_container) = resolved_s3 + .truststore_init_container(resolved_product_image.clone()) + .context(TrustStoreInitContainerSnafu)? 
+ { + template.add_init_container(truststore_init_container); + } if let Some(cm_name) = config.log_config_map() { container diff --git a/rust/operator-binary/src/connect/mod.rs b/rust/operator-binary/src/connect/mod.rs index 1692e6f5..daab765f 100644 --- a/rust/operator-binary/src/connect/mod.rs +++ b/rust/operator-binary/src/connect/mod.rs @@ -2,6 +2,7 @@ mod common; pub mod controller; pub mod crd; mod executor; +mod s3; pub mod server; mod service; diff --git a/rust/operator-binary/src/connect/s3.rs b/rust/operator-binary/src/connect/s3.rs new file mode 100644 index 00000000..15626048 --- /dev/null +++ b/rust/operator-binary/src/connect/s3.rs @@ -0,0 +1,913 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_operator::{ + commons::{ + product_image_selection::ResolvedProductImage, secret_class::SecretClassVolumeError, + }, + crd::s3::{self, v1alpha1::S3AccessStyle}, + k8s_openapi::api::core::v1::{Volume, VolumeMount}, +}; + +use crate::{ + connect::crd, + crd::{ + constants::{ + STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, STACKABLE_TRUST_STORE_NAME, + }, + tlscerts, + }, +}; + +#[derive(Snafu, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("failed to resolve S3 connection"))] + ResolveS3Connection { + source: s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("failed to resolve S3 bucket"))] + ResolveS3Bucket { source: s3::v1alpha1::BucketError }, + + #[snafu(display("missing namespace"))] + MissingNamespace, + + #[snafu(display("failed to get endpoint for S3 connection"))] + S3ConnectionEndpoint { + source: s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("failed to get endpoint for S3 bucket {bucket_name:?}"))] + BucketEndpoint { + bucket_name: String, + source: s3::v1alpha1::ConnectionError, + }, + + #[snafu(display( + "failed to create secret volume for S3 bucket with secret class {secret_class:?}" + ))] + S3SecretVolume { + secret_class: String, + source: SecretClassVolumeError, + }, + + #[snafu(display("failed to get volumes and mounts for S3 connection"))] + ConnectionVolumesAndMounts { + source: s3::v1alpha1::ConnectionError, + }, +} + +pub(crate) struct ResolvedS3 { + s3_buckets: Vec, + s3_connection: Option, +} + +impl ResolvedS3 { + pub(crate) async fn resolve( + client: &stackable_operator::client::Client, + connect_server: &crd::v1alpha1::SparkConnectServer, + ) -> Result { + let mut s3_buckets = Vec::new(); + let namespace = connect_server + .metadata + .namespace + .as_ref() + .context(MissingNamespaceSnafu)?; + for bucket in connect_server.spec.connectors.s3buckets.iter() { + let resolved_bucket = bucket + .clone() + .resolve(client, namespace) + .await + .context(ResolveS3BucketSnafu)?; + + s3_buckets.push(resolved_bucket); + } + + let s3_connection = match connect_server.spec.connectors.s3connection.clone() { + Some(conn) => Some( + conn.resolve(client, namespace) + .await + .context(ResolveS3ConnectionSnafu)?, + ), + None => None, + }; + + Ok(ResolvedS3 { + s3_buckets, + s3_connection, + }) + } + + // Generate Spark properties for the resolved S3 buckets. + // Properties are generated "per bucket" using the prefix: spark.hadoop.fs.s3a.bucket.{bucket_name}. + pub(crate) fn spark_properties(&self) -> Result>, Error> { + let mut result = BTreeMap::new(); + + // -------------------------------------------------------------------------------- + // Add global connection properties if a connection is defined. 
+ // -------------------------------------------------------------------------------- + if let Some(conn) = &self.s3_connection { + result.insert( + "spark.hadoop.fs.s3a.endpoint".to_string(), + Some( + conn.endpoint() + .context(S3ConnectionEndpointSnafu)? + .to_string(), + ), + ); + result.insert( + "spark.hadoop.fs.s3a.path.style.access".to_string(), + Some((conn.access_style == S3AccessStyle::Path).to_string()), + ); + result.insert( + "spark.hadoop.fs.s3a.endpoint.region".to_string(), + Some(conn.region.name.clone()), + ); + if let Some((access_key_file_path, secret_key_file_path)) = + conn.credentials_mount_paths() + { + result.insert( + "spark.hadoop.fs.s3a.access.key".to_string(), + Some(format!("${{file:UTF-8:{access_key_file_path}}}")), + ); + result.insert( + "spark.hadoop.fs.s3a.secret.key".to_string(), + Some(format!("${{file:UTF-8:{secret_key_file_path}}}")), + ); + result.insert( + "spark.hadoop.fs.s3a.aws.credentials.provider".to_string(), + Some("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider".to_string()), + ); + } else { + result.insert( + "spark.hadoop.fs.s3a.aws.credentials.provider".to_string(), + Some("org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider".to_string()), + ); + } + } + + // -------------------------------------------------------------------------------- + // Add per-bucket properties for all the buckets. + // -------------------------------------------------------------------------------- + for bucket in &self.s3_buckets { + let bucket_name = bucket.bucket_name.clone(); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.endpoint"), + Some( + bucket + .connection + .endpoint() + .with_context(|_| BucketEndpointSnafu { + bucket_name: bucket_name.clone(), + })? + .to_string(), + ), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.path.style.access"), + Some((bucket.connection.access_style == S3AccessStyle::Path).to_string()), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.endpoint.region"), + Some(bucket.connection.region.name.clone()), + ); + if let Some((access_key_file_path, secret_key_file_path)) = + bucket.connection.credentials_mount_paths() + { + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.access.key"), + Some(format!("${{file:UTF-8:{access_key_file_path}}}")), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.secret.key"), + Some(format!("${{file:UTF-8:{secret_key_file_path}}}")), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.aws.credentials.provider"), + Some("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider".to_string()), + ); + } else { + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.aws.credentials.provider"), + Some("org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider".to_string()), + ); + } + } + + // Add any extra properties needed for TLS configuration. + if let Some(extra_tls_properties) = self.extra_java_options() { + result.extend(extra_tls_properties); + } + + Ok(result) + } + + // Build the list of volumes and mounts needed for the S3 credentials and/or TLS trust stores. + pub(crate) fn volumes_and_mounts(&self) -> Result<(Vec, Vec), Error> { + // Ensures that there are no duplicate volumes or mounts across buckets. + let mut volumes_by_name = BTreeMap::new(); + let mut mounts_by_name = BTreeMap::new(); + + // -------------------------------------------------------------------------------- + // Connection volumes and mounts. 
+ // -------------------------------------------------------------------------------- + if let Some(conn) = &self.s3_connection { + let (conn_volumes, conn_mounts) = conn + .volumes_and_mounts() + .context(ConnectionVolumesAndMountsSnafu)?; + + for volume in conn_volumes.iter() { + let volume_name = volume.name.clone(); + volumes_by_name.entry(volume_name).or_insert(volume.clone()); + } + for mount in conn_mounts.iter() { + let mount_name = mount.name.clone(); + mounts_by_name.entry(mount_name).or_insert(mount.clone()); + } + } + + // -------------------------------------------------------------------------------- + // Per-bucket volumes and mounts. + // -------------------------------------------------------------------------------- + for bucket in self.s3_buckets.iter() { + let (bucket_volumes, bucket_mounts) = bucket + .connection + .volumes_and_mounts() + .context(ConnectionVolumesAndMountsSnafu)?; + + for volume in bucket_volumes.iter() { + let volume_name = volume.name.clone(); + volumes_by_name.entry(volume_name).or_insert(volume.clone()); + } + for mount in bucket_mounts.iter() { + let mount_name = mount.name.clone(); + mounts_by_name.entry(mount_name).or_insert(mount.clone()); + } + } + + // -------------------------------------------------------------------------------- + // Trust store volume and mount. + // This is where the the trust store built by `cert-tools` in the init container is stored. + // -------------------------------------------------------------------------------- + volumes_by_name + .entry(STACKABLE_TRUST_STORE_NAME.to_string()) + .or_insert_with(|| Volume { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + empty_dir: Some(Default::default()), + ..Default::default() + }); + mounts_by_name + .entry(STACKABLE_TRUST_STORE_NAME.to_string()) + .or_insert_with(|| VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }); + + Ok(( + volumes_by_name.into_values().collect(), + mounts_by_name.into_values().collect(), + )) + } + + pub(crate) fn truststore_init_container( + &self, + image: ResolvedProductImage, + ) -> Result, Error> { + if let Some(command) = self.truststore_init_container_command() { + let (_, volume_mounts) = self.volumes_and_mounts()?; + Ok(Some( + stackable_operator::k8s_openapi::api::core::v1::Container { + name: "tls-truststore-init".to_string(), + image: Some(image.image.clone()), + command: Some(vec![ + "/bin/bash".to_string(), + "-x".to_string(), + "-euo".to_string(), + "pipefail".to_string(), + "-c".to_string(), + command, + ]), + volume_mounts: Some(volume_mounts), + ..Default::default() + }, + )) + } else { + Ok(None) + } + } + + // List of paths to ca.crt files mounted by the secret classes. + fn secret_class_tls_ca_paths(&self) -> Vec { + // Ensure that CA paths are unique across all buckets and the connection. + let paths: BTreeSet = self + .s3_buckets + .iter() + .flat_map(|bucket| bucket.connection.tls.tls_ca_cert_mount_path()) + .chain( + self.s3_connection + .iter() + .flat_map(|conn| conn.tls.tls_ca_cert_mount_path().into_iter()), + ) + .collect(); + + paths.into_iter().collect() + } + + // Builds the command that generates a truststore from the system trust store + // and any additional CA certs provided by the user through secret classes. + // + // IMPORTANT: We assume the CA certs are in PEM format because we know that `s3::v1alpha1::ConnectionSpec::volumes_and_mounts()` + // does NOT set the format annotation on the secret volumes. 
+ // + fn truststore_init_container_command(&self) -> Option { + let input_ca_paths: Vec = self.secret_class_tls_ca_paths(); + + let out_truststore_path = format!("{STACKABLE_TRUST_STORE}/truststore.p12"); + + if input_ca_paths.is_empty() { + None + } else { + Some( + Some(tlscerts::convert_system_trust_store_to_pkcs12()) + .into_iter() + .chain(input_ca_paths.iter().map(|path| format!("cert-tools generate-pkcs12-truststore --out {out_truststore_path} --out-password {STACKABLE_TLS_STORE_PASSWORD} --pkcs12 {out_truststore_path}:{STACKABLE_TLS_STORE_PASSWORD} --pem {path}"))) + .collect::>() + .join(" && ")) + } + } + + fn extra_java_options(&self) -> Option>> { + if self.truststore_init_container_command().is_some() { + let mut ssl_options = BTreeMap::new(); + ssl_options.insert( + "-Djavax.net.ssl.trustStore".to_string(), + format!("{STACKABLE_TRUST_STORE}/truststore.p12"), + ); + ssl_options.insert( + "-Djavax.net.ssl.trustStorePassword".to_string(), + STACKABLE_TLS_STORE_PASSWORD.to_string(), + ); + ssl_options.insert( + "-Djavax.net.ssl.trustStoreType".to_string(), + "pkcs12".to_string(), + ); + + let ssl_options_str = ssl_options + .into_iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(" "); + + Some(BTreeMap::from([ + ( + "spark.driver.extraJavaOptions".to_string(), + Some(ssl_options_str.clone()), + ), + ( + "spark.executor.extraJavaOptions".to_string(), + Some(ssl_options_str), + ), + ])) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use product_config::writer::to_java_properties_string; + use rstest::*; + use stackable_operator::commons::{ + secret_class::SecretClassVolume, + tls_verification::{CaCert, Tls, TlsClientDetails, TlsServerVerification, TlsVerification}, + }; + + use super::*; + + #[rstest] + #[case("no connection and no buckets", + ResolvedS3 { + s3_buckets: vec![], + s3_connection: None, + }, + vec![STACKABLE_TRUST_STORE_NAME.to_string(),], + vec![VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + None, + )] + #[case("connection without credentials and without tls", + ResolvedS3 { + s3_buckets: vec![], + s3_connection: Some(s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: None, + tls: TlsClientDetails { + tls: None, + }, + }) + }, + vec![STACKABLE_TRUST_STORE_NAME.to_string()], + vec![VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + None, + )] + #[case("connection with credentials and no tls", + ResolvedS3 { + s3_buckets: vec![], + s3_connection: Some(s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "connection-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }) + }, + vec!["connection-secret-class-s3-credentials".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string()], + vec![ + VolumeMount { + name: "connection-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/connection-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: 
STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + None, + )] + #[case("connection with credentials and tls", + ResolvedS3 { + s3_buckets: vec![], + s3_connection: Some(s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "connection-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("connection-tls-ca-secret-class".to_string()), + }) + }), + }, + }) + }, + vec!["connection-secret-class-s3-credentials".to_string(), + "connection-tls-ca-secret-class-ca-cert".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string()], + vec![ + VolumeMount { + name: "connection-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/connection-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: "connection-tls-ca-secret-class-ca-cert".to_string(), + mount_path: "/stackable/secrets/connection-tls-ca-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + Some("cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/connection-tls-ca-secret-class/ca.crt".to_string()), + )] + #[case("one bucket without credentials and without tls", + ResolvedS3 { + s3_connection: None, + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: None, + tls: TlsClientDetails { + tls: None, + }, + }}] + }, + vec![STACKABLE_TRUST_STORE_NAME.to_string()], + vec![VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + None, + )] + #[case("one bucket with credentials and no tls", + ResolvedS3 { + s3_connection: None, + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "bucket-1-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }}] + }, + vec!["bucket-1-secret-class-s3-credentials".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string()], + vec![ + VolumeMount { + name: "bucket-1-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/bucket-1-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + None, + )] + #[case("one bucket with credentials and tls", + ResolvedS3 { 
+ s3_connection: None, + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "bucket-1-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("bucket-1-tls-ca-secret-class".to_string()), + }) + }), + }, + }}] + }, + vec!["bucket-1-secret-class-s3-credentials".to_string(), + "bucket-1-tls-ca-secret-class-ca-cert".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string()], + vec![ + VolumeMount { + name: "bucket-1-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/bucket-1-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: "bucket-1-tls-ca-secret-class-ca-cert".to_string(), + mount_path: "/stackable/secrets/bucket-1-tls-ca-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }], + Some("cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/bucket-1-tls-ca-secret-class/ca.crt".to_string()), + )] + #[case("connection and one bucket with different credentials and same TLS secret class", + ResolvedS3 { + s3_connection: Some( + s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "connection-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("tls-ca-secret-class".to_string()), + }) + }), + }, + } + ), + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "bucket-1-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("tls-ca-secret-class".to_string()), + }) + }), + }, + }}] + }, + vec![ + "bucket-1-secret-class-s3-credentials".to_string(), + "connection-secret-class-s3-credentials".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string(), + "tls-ca-secret-class-ca-cert".to_string(),], + vec![ + VolumeMount { + name: "bucket-1-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/bucket-1-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: "connection-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/connection-secret-class".to_string(), + 
..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }, + VolumeMount { + name: "tls-ca-secret-class-ca-cert".to_string(), + mount_path: "/stackable/secrets/tls-ca-secret-class".to_string(), + ..Default::default() + }], + Some("cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/tls-ca-secret-class/ca.crt".to_string()), + )] + #[case("connection and one bucket with same credentials and same TLS secret class", + ResolvedS3 { + s3_connection: Some( + s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "credentials-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("tls-ca-secret-class".to_string()), + }) + }), + }, + } + ), + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "credentials-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: Some(Tls { + verification: TlsVerification::Server( + TlsServerVerification{ + ca_cert: CaCert::SecretClass("tls-ca-secret-class".to_string()), + }) + }), + }, + }}] + }, + vec![ + "credentials-secret-class-s3-credentials".to_string(), + STACKABLE_TRUST_STORE_NAME.to_string(), + "tls-ca-secret-class-ca-cert".to_string(),], + vec![ + VolumeMount { + name: "credentials-secret-class-s3-credentials".to_string(), + mount_path: "/stackable/secrets/credentials-secret-class".to_string(), + ..Default::default() + }, + VolumeMount { + name: STACKABLE_TRUST_STORE_NAME.to_string(), + mount_path: STACKABLE_TRUST_STORE.to_string(), + ..Default::default() + }, + VolumeMount { + name: "tls-ca-secret-class-ca-cert".to_string(), + mount_path: "/stackable/secrets/tls-ca-secret-class".to_string(), + ..Default::default() + }], + Some("cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/tls-ca-secret-class/ca.crt".to_string()), + )] + fn test_volumes_and_mounts( + #[case] case_name: &str, + #[case] resolved_s3: ResolvedS3, + #[case] expected_volumes_names: Vec, + #[case] expected_mounts: Vec, + #[case] init_container_command: Option, + ) { + let (volumes, mounts) = resolved_s3.volumes_and_mounts().unwrap(); + assert_eq!( + volumes.into_iter().map(|v| v.name).collect::>(), + expected_volumes_names, + "Case failed for volumes: {}", + case_name + ); + assert_eq!( + mounts, expected_mounts, + "Case failed for mounts: {}", + 
case_name + ); + + assert_eq!( + resolved_s3.truststore_init_container_command(), + init_container_command, + "Case failed for init container command: {}", + case_name + ); + } + + #[rstest] + #[case("connection and one bucket with same credentials", + ResolvedS3 { + s3_connection: Some( + s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "credentials-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }), + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "credentials-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }}] + }, + indoc::indoc! {r#" + spark.hadoop.fs.s3a.access.key=${file\:UTF-8\:/stackable/secrets/credentials-secret-class/accessKey} + spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.bucket-1.access.key=${file\:UTF-8\:/stackable/secrets/credentials-secret-class/accessKey} + spark.hadoop.fs.s3a.bucket.bucket-1.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.bucket-1.endpoint=http\://my-s3-endpoint.com/ + spark.hadoop.fs.s3a.bucket.bucket-1.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.bucket.bucket-1.path.style.access=true + spark.hadoop.fs.s3a.bucket.bucket-1.secret.key=${file\:UTF-8\:/stackable/secrets/credentials-secret-class/secretKey} + spark.hadoop.fs.s3a.endpoint=http\://my-s3-endpoint.com/ + spark.hadoop.fs.s3a.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.path.style.access=true + spark.hadoop.fs.s3a.secret.key=${file\:UTF-8\:/stackable/secrets/credentials-secret-class/secretKey} + "#}.to_string(), + )] + #[case("connection and one bucket with different credentials and endpoints", + ResolvedS3 { + s3_connection: Some( + s3::v1alpha1::ConnectionSpec { + host: "far-away.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "connection-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }), + s3_buckets: vec![ + s3::v1alpha1::ResolvedBucket { + bucket_name: "bucket-1".to_string(), + connection: s3::v1alpha1::ConnectionSpec { + host: "my-s3-endpoint.com".parse().unwrap(), + port: None, + region: s3::v1alpha1::Region { + name: "us-east-1".to_string(), + }, + access_style: S3AccessStyle::Path, + credentials: Some(SecretClassVolume { + secret_class: "bucket-1-secret-class".to_string(), + scope: None, + }), + tls: TlsClientDetails { + tls: None, + }, + }}] + }, + indoc::indoc! 
{r#" + spark.hadoop.fs.s3a.access.key=${file\:UTF-8\:/stackable/secrets/connection-secret-class/accessKey} + spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.bucket-1.access.key=${file\:UTF-8\:/stackable/secrets/bucket-1-secret-class/accessKey} + spark.hadoop.fs.s3a.bucket.bucket-1.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.bucket-1.endpoint=http\://my-s3-endpoint.com/ + spark.hadoop.fs.s3a.bucket.bucket-1.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.bucket.bucket-1.path.style.access=true + spark.hadoop.fs.s3a.bucket.bucket-1.secret.key=${file\:UTF-8\:/stackable/secrets/bucket-1-secret-class/secretKey} + spark.hadoop.fs.s3a.endpoint=http\://far-away.com/ + spark.hadoop.fs.s3a.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.path.style.access=true + spark.hadoop.fs.s3a.secret.key=${file\:UTF-8\:/stackable/secrets/connection-secret-class/secretKey} + "#}.to_string(), + )] + + fn test_spark_properties( + #[case] case_name: &str, + #[case] resolved_s3: ResolvedS3, + #[case] spark_properties_string: String, + ) { + let properties = resolved_s3.spark_properties().unwrap(); + + assert_eq!( + to_java_properties_string(properties.iter()).unwrap(), + spark_properties_string, + "Case failed for spark properties: {}", + case_name + ); + } +} diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index 861168c0..35eb8745 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap}; +use indoc::formatdoc; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::{ @@ -44,6 +45,7 @@ use crate::{ CONNECT_GRPC_PORT, CONNECT_UI_PORT, DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer, v1alpha1, }, + s3, }, crd::{ constants::{ @@ -124,6 +126,15 @@ pub enum Error { #[snafu(display("failed build connect server jvm args for {name}"))] ServerJvmArgs { source: common::Error, name: String }, + + #[snafu(display("failed to add S3 secret or tls volume mounts to stateful set"))] + AddS3VolumeMount { source: s3::Error }, + + #[snafu(display("failed to add S3 secret volumes to stateful set"))] + AddS3Volume { source: s3::Error }, + + #[snafu(display("failed to create the init container for the S3 truststore"))] + TrustStoreInitContainer { source: s3::Error }, } // Assemble the configuration of the spark-connect server. @@ -204,6 +215,7 @@ pub(crate) fn server_config_map( .context(InvalidConfigMapSnafu { name: cm_name }) } +#[allow(clippy::too_many_arguments)] pub(crate) fn build_stateful_set( scs: &v1alpha1::SparkConnectServer, config: &v1alpha1::ServerConfig, @@ -212,6 +224,7 @@ pub(crate) fn build_stateful_set( config_map: &ConfigMap, listener_name: &str, args: Vec, + resolved_s3: &s3::ResolvedS3, ) -> Result { let server_role = SparkConnectRole::Server.to_string(); let recommended_object_labels = common::labels( @@ -249,6 +262,8 @@ pub(crate) fn build_stateful_set( .build(), ) .context(AddVolumeSnafu)? + // This is needed for shared enpryDir volumes with other containers like the truststore + // init container. .security_context(PodSecurityContext { fs_group: Some(1000), ..PodSecurityContext::default() @@ -284,6 +299,13 @@ pub(crate) fn build_stateful_set( .context(AddVolumeMountSnafu)? .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) .context(AddVolumeMountSnafu)? 
+ .add_volume_mounts( + resolved_s3 + .volumes_and_mounts() + .context(AddS3VolumeMountSnafu)? + .1, + ) + .context(AddVolumeMountSnafu)? .readiness_probe(probe()) .liveness_probe(probe()); @@ -346,6 +368,23 @@ pub(crate) fn build_stateful_set( .context(BuildListenerVolumeSnafu)?, ]); + // S3: Add secret volumes needed for accessing S3 buckets. + pb.add_volumes( + resolved_s3 + .volumes_and_mounts() + .context(AddS3VolumeSnafu)? + .0, + ) + .context(AddVolumeSnafu)?; + + // S3: Add truststore init container for S3 endpoint communication with TLS. + if let Some(truststore_init_container) = resolved_s3 + .truststore_init_container(resolved_product_image.clone()) + .context(TrustStoreInitContainerSnafu)? + { + pb.add_init_container(truststore_init_container); + } + // Merge user defined pod template if available let mut pod_template = pb.build_template(); if let Some(pod_overrides_spec) = scs @@ -396,18 +435,17 @@ pub(crate) fn build_stateful_set( #[allow(clippy::result_large_err)] pub(crate) fn command_args(user_args: &[String]) -> Vec { - let mut command = vec![ - // ---------- start containerdebug - format!( - "containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &" - ), - // ---------- start spark connect server - "/stackable/spark/sbin/start-connect-server.sh".to_string(), - "--deploy-mode client".to_string(), // 'cluster' mode not supported - "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}" - .to_string(), - format!("--properties-file {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME}"), - ]; + let mut command = vec![formatdoc! { " + containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop & + + cp {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME} /tmp/spark.properties + config-utils template /tmp/spark.properties + + /stackable/spark/sbin/start-connect-server.sh \\ + --deploy-mode client \\ + --master k8s://https://${{KUBERNETES_SERVICE_HOST}}:${{KUBERNETES_SERVICE_PORT_HTTPS}} \\ + --properties-file /tmp/spark.properties + " }]; // User provided command line arguments command.extend_from_slice(user_args); diff --git a/tests/templates/kuttl/spark-connect/04-minio-secrets.yaml b/tests/templates/kuttl/spark-connect/04-minio-secrets.yaml new file mode 100644 index 00000000..219d7f48 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/04-minio-secrets.yaml @@ -0,0 +1,56 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-users +type: Opaque +stringData: + username1: | + username=spark + password=sparkspark + disabled=false + policies=readwrite,consoleAdmin,diagnostics + setPolicies=false +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-tls-ca +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +# Certificate authoity for the "minio" host. +# Generated with certs/generate.sh in this test folder. 
+apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-ca + labels: + secrets.stackable.tech/class: minio-tls-ca +data: + ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQyVENDQXNHZ0F3SUJBZ0lVVUR4Z0VQUnlqQzU2anFoaDNKQTRBQng0a1RFd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU5qQXlNRFV4TnpVeU5EUmEKR0E4eU1USTJNREV4TWpFM05USTBORm93ZXpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4S0RBbUJnTlZCQW9NSDFOMFlXTnJZV0pzClpTQlRhV2R1YVc1bklFRjFkR2h2Y21sMGVTQkpibU14RlRBVEJnTlZCQU1NREhOMFlXTnJZV0pzWlM1a1pUQ0MKQVNJd0RRWUpLb1pJaHZjTkFRRUJCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFOQ1RHZlQ5L3E0YTlZYnVWcDltdkhkZwoyTFdzWFowUG9tTnJLUmhDajBxTFdaR3JTampVbzJqMkhlVmRpVUY0bWZIQkF0Y2Y1RGdoOHZlWmJ4eUc5SDE2Cnp0bnNjdUEvZ3dJc3VhUnlsTlNSSy9QVzlMREw0VXJXK1RrMGpZYTdlMzNlUmR3OFJ3VVpkU0hRMFhTMHJWMjMKQzgzYjJoVGo3Z0dkNXVVejBLeWt1cldLVWRDallNQTJaaEhUZWtwTTRYNEF3WCs0bk5TUi9JY3FFVzhzVE1pZwowVEQvYU9DSTFEdEtBV2ErU2lUYWdsQU9lNHcvMUZiQ2RmU0hudHJSSUVVOE1DVkxDSUo3UTdGSVI2N1dTUXdlCmQ2SGdFQTJmNXFiSlNhQjVTcG1VT3JVdXZmL1R2b0QrQ0E2TVA3RVBLdFVRdWZIMjQ2MzdpdkpIa1dTTUs2OEMKQXdFQUFhTlRNRkV3SFFZRFZSME9CQllFRkpJbW53aXd4eEdpN3BlVTc2dlk4ZjZVMkRZcE1COEdBMVVkSXdRWQpNQmFBRkpJbW53aXd4eEdpN3BlVTc2dlk4ZjZVMkRZcE1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJCmh2Y05BUUVMQlFBRGdnRUJBSkN2aTV4YWpIQ0pzd1d2SkZ3UmVkZ2grSHhFalI5QngwSWQvbWo4dEhNQ1J1VzAKdmdxL3hpQ3NtRmw4MFJIRjRGM1hiSHFMRWpMUVE4aGZqNXFFbkhMWFJwa2JuRG13NkpLK1NPNUkzUjRBblRrdApGY08vMUQwZ2pjOTFXb3lrUmFZT3pFU3dNbXBTbE9id2thT1lmb3dhVjQ5VEhIRG12Q3U5Um5sT0QxVXZhd2psCkRMTjFFZFVrRG1FMjJiN1RHK2wvRUhlYURGUzM4YitYSUQvZEkzS25GZ3JNbTNTTHNCRmQzKzhyN0xlUC9IT1AKUXpRWGEzRTBpQ1Vud1A4NlU1VHBaczJaR3RJamEwMnh5U0tCUUhoaUZPdE1lWlJWa0VnRCt5b3BIUUx5TXlKMQpyU1lxdm80UUcxRHdMUU1PTllhWndWeHJhQUYySHgzVldKcU5DbFU9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR5RENDQXJDZ0F3SUJBZ0lVVStiUHpFMDBBUzBFRklmYi9SMEJ2M2dDWFNNd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU5qQXlNRFV4TnpVeU5EUmEKR0E4eU1USTJNREV4TWpFM05USTBORm93WGpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4RWpBUUJnTlZCQW9NQ1ZOMFlXTnJZV0pzClpURU9NQXdHQTFVRUF3d0ZiV2x1YVc4d2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUIKQVFETm4rbDdwQVNGekZmTUlWRE1qUVB5eTZnY2g1K1lkYW1ZMjJ1QjI5OUxZVXk3aGdJL25kRGNWZFMweWJ0SgpHTi9UYU0zcmpLVWVLYWUxR2pIeHFnZTczMWtsbXlJeWF6MWtCYm9qMHVEcEtMS2kzcVlpNmR0cFkzYTV5c295CmJoYlQvRkd5MGJwNXZRZmZ0c1NyNDZBNGZyQ1I0UmhiTWVXYnBqYmgrUkJGOWNDekxoa2ZBUzFkSVRTeFlPNjYKQzB4ZXFWWGwxc2x0bkZjRS9GRWVaNXVHQ1kvTDdEY1duV1pGZ2p1RG1HUitTSkpLdnFFT2tucHlMUzh5QTY2WQpaZDJxQnFaSUxTSUs4UFRtK0E4TlVEZ1UxNHhWa3BkWC9tRjBncFpMcnhOYWtEUkVoUEs2MkljQ09SOFJBcDJNCnA1TVpyZEtaSS9KUGlHVTkzUWZueUI3bEFnTUJBQUdqWHpCZE1Cc0dBMVVkRVFRVU1CS0NCVzFwYm1sdmdnbHMKYjJOaGJHaHZjM1F3SFFZRFZSME9CQllFRktUeG9NVzdTQW5pQXh6cTFqQ1pnNjJmZmtaY01COEdBMVVkSXdRWQpNQmFBRkpJbW53aXd4eEdpN3BlVTc2dlk4ZjZVMkRZcE1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRREtrTXZJCk5XU1ZQa2llZTU4WHdvYTU1TmRmVXgwNUUvWDYzZXhyMGluYVBnZW4xMlppeXhMcFVRZ3hXdWFVU3pkQWd2ZHgKRW9zc2J0L3J2b0pDaUlmRDkyd1BsS2JIVlk5OUR4WGNqdTlOMmZZZWdSVHFrVTRDVUdjd25RcDMzMDNpNEY4ago4OW5JUW9KVkZwTEdVQUJaOC9VM1Z3U3ZEUCs2bDlrbVpQREVtbkdLR3RoTWFRMW9CZGUxV1BWR2hwMHJCdm9MCkpFejRFTnRCNWpEK05ldndibWNCT1UvbUtiLzJ3U1dCS0NTTTQ3Wk44QzR3Q0tLRTZ0VVk0VERuYWZid2RhMmMKWWU0UnZJYTE3cytiVmF0VUMxdUpRL1FMdEdQS3lRamYvWmh4QnVndElIN3pIVDZUYllCSDMvdW1kUGxmYS8zSApZQTdFL041MkN0K3VSRjU0Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.key: 
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRRE5uK2w3cEFTRnpGZk0KSVZETWpRUHl5NmdjaDUrWWRhbVkyMnVCMjk5TFlVeTdoZ0kvbmREY1ZkUzB5YnRKR04vVGFNM3JqS1VlS2FlMQpHakh4cWdlNzMxa2xteUl5YXoxa0Jib2owdURwS0xLaTNxWWk2ZHRwWTNhNXlzb3liaGJUL0ZHeTBicDV2UWZmCnRzU3I0NkE0ZnJDUjRSaGJNZVdicGpiaCtSQkY5Y0N6TGhrZkFTMWRJVFN4WU82NkMweGVxVlhsMXNsdG5GY0UKL0ZFZVo1dUdDWS9MN0RjV25XWkZnanVEbUdSK1NKSkt2cUVPa25weUxTOHlBNjZZWmQycUJxWklMU0lLOFBUbQorQThOVURnVTE0eFZrcGRYL21GMGdwWkxyeE5ha0RSRWhQSzYySWNDT1I4UkFwMk1wNU1acmRLWkkvSlBpR1U5CjNRZm55QjdsQWdNQkFBRUNnZ0VBR21vM2tVV1J1eXAwQU9vcXVneEhmSkpEQjE4NDFsb1BMbTdKa2NZUUdsdm0KZ3BTRmgyeWJueUo3ajduMmtENWN5b2pGSTBSUEZkL2VCbnJWL2FpTkU4cHVabEZXaEVtWWVsZnVBSm9mZ0hSVQo5bTFKeEdSc1prNTd1d1JkRXp0blBWWkZuSVlxd1diU015QUVoZHhaQWNqc24rRGR3eUZXMExiNmgrNzU4ektTCmpsSG13cWpHQUl2YVJlSnViU1JvdHRwZ2JobjdHSHZDaVZOR3JUdWV1VStNcFV1eHhJbXVYRHZpZ1krY1ZxdzIKcktPYkwyVHRSTDJ1L01adEJGQ0pYbHBKK2JXa0FzRXdXNHVBWnhKeTJSMkx1cDVEdzgxVTI0Rmw4b2U5clpGTQoxMlBZWFJFVmhISWI1djNhVzJLL0ptNkswMGFLUHhKbEtncmJFdkRUSVFLQmdRRDl3ODFNdUNUWFl3dWRvQjUwCm5PeHREVTBqTENoVjE0VmxhR25MSmh0Ukp0TWZpaXdQa0R1TmF2S0RJMnpvbGtnWCszYXZ0MmI3NlNiY0hvTzkKMzBPLytjSE9KUWZ0K0Y4L0xlYWZOaEtIYTVudUQxeWtHbEZPQVJpV3p3TWF0Zm1IZHJtOTJXS2JNVHpUaS9tVwpqWENuKzVrY3lNR1QzUGl6dWxTVDZRQkpYUUtCZ1FEUGI0L1dGYS9qK0VPcHIwdkZzR3RLWDlUUTFFNU1HblVkCmJSVHBCWk80QmUyZE5DWHRFeWVtZ0cwbnB6cDZXTXZzVjRzV2NBSmt5aytWa0RBZ2o5cUVJbTNEUDFnOFRRbGMKRHo4N1pxV1lPZzhwRkVCbURxbk5aYW1CVy9aK1ZiNHBiRUdlMm14RHBiSmRMdTNOVmxRZnAvVWI3WGtaQ2dnUgp0YVRSMXJUcktRS0JnREVkRUVMazhOeHU0dlNpNU1JVkRQMGVNZXU0eENXNURLeFB4UW40V2hrZXRvWElMRGJtCjUxKzdieXhLVXUzQkNEcjhCRUNGOG55VzUxcDYzV3lHSllxbVFBZ3h5cE1ZR3ZjVFh5czVQK1ROd29EOG9DVnkKb29IQ1hJdnpqTnBDbGdUTnlhMGd3YURmcXJJV3lUdUdMR09Xb2srYjJ6dE83U043MEpxLzRicFJBb0dBZnBBQQo0SXdtM3g3d21hMWN1K2RoN3VUOWdkU25XUU9qaFNxeTRXSUh1UFhVL2w5ODdHTU5oQ2REY2pnMEU4WHQxZXVyCjd5cTBLeTdNMCtJL212NXFRc2lHMCtQb1FCSjRyWFNZRGZRWkFRSWJrZUxMVC9tT1hNVzBZRHJ0OERMOGJXV2gKdS94a3BmbUpGQlczL2RxNFJRQkRLcUQvaStsMDl1a3ZBT0RSVGRFQ2dZRUFySnV3aUgxaUJobUFwUFJaTE1BLwpDNW5memxlbFJDQmwybngrZHA1c1hsZFdseHBsREsvVTByZjJaTVBZNkdQQmlBRnBZSXVHNThrREtRUTlRNjgrCmVDYmNMMno4b0llU3ZQWndZUTRPVkFyTTFTR1dKWE5kZm1IbndJTjZlRVI1cG9UNVYyRXA1NjU3bVNRakJkQ1YKbDMwMVU0blBFY1BsekdBdjVJVFJzT1E9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials + labels: + secrets.stackable.tech/class: minio-credentials-class +stringData: + accessKey: spark + secretKey: sparkspark diff --git a/tests/templates/kuttl/spark-connect/04-minio-users.yaml b/tests/templates/kuttl/spark-connect/04-minio-users.yaml deleted file mode 100644 index 400fef72..00000000 --- a/tests/templates/kuttl/spark-connect/04-minio-users.yaml +++ /dev/null @@ -1,21 +0,0 @@ ---- -apiVersion: v1 -kind: Secret -metadata: - name: minio-users -type: Opaque -stringData: - username1: | - username=spark - password=sparkspark - disabled=false - policies=readwrite,consoleAdmin,diagnostics - setPolicies=false ---- -apiVersion: v1 -kind: Secret -metadata: - name: s3-credentials -stringData: - accessKey: spark - secretKey: sparkspark diff --git a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 index da974901..30bd56dd 100644 --- a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 +++ 
b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2
@@ -1,4 +1,31 @@
 ---
+apiVersion: s3.stackable.tech/v1alpha1
+kind: S3Connection
+metadata:
+  name: mybucket-s3-connection
+spec:
+  host: minio
+  port: 9000
+  accessStyle: Path
+  credentials:
+    secretClass: minio-credentials-class
+{% if test_scenario['values']['s3-use-tls'] == 'true' %}
+  tls:
+    verification:
+      server:
+        caCert:
+          secretClass: minio-tls-ca
+{% endif %}
+---
+apiVersion: s3.stackable.tech/v1alpha1
+kind: S3Bucket
+metadata:
+  name: mybucket-s3-bucket
+spec:
+  bucketName: mybucket
+  connection:
+    reference: mybucket-s3-connection
+---
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -35,30 +62,10 @@ spec:
 {% if lookup('env', 'VECTOR_AGGREGATOR') %}
   vectorAggregatorConfigMapName: vector-aggregator-discovery
 {% endif %}
-  args:
-    # These are unfortunately required to make the S3A connector work with MinIO
-    # I had expected the clients to be able to set these, but that is not the case.
-    - --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
-    - --conf spark.hadoop.fs.s3a.path.style.access=true
-    - --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000
-    - --conf spark.hadoop.fs.s3a.region=us-east-1
+  connectors:
+    s3buckets:
+      - reference: mybucket-s3-bucket
   server:
-    podOverrides:
-      spec:
-        containers:
-          - name: spark
-            env:
-              - name: AWS_ACCESS_KEY_ID
-                valueFrom:
-                  secretKeyRef:
-                    name: s3-credentials
-                    key: accessKey
-              - name: AWS_SECRET_ACCESS_KEY
-                valueFrom:
-                  secretKeyRef:
-                    name: s3-credentials
-                    key: secretKey
-
     jvmArgumentOverrides:
       add:
         - -Dmy.custom.jvm.arg=customValue
diff --git a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2
index 67842a27..6d2b5f15 100644
--- a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2
+++ b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2
@@ -76,17 +76,6 @@ spec:
             requests:
               cpu: 200m
               memory: 128Mi
-          env:
-            - name: AWS_ACCESS_KEY_ID
-              valueFrom:
-                secretKeyRef:
-                  name: s3-credentials
-                  key: accessKey
-            - name: AWS_SECRET_ACCESS_KEY
-              valueFrom:
-                secretKeyRef:
-                  name: s3-credentials
-                  key: secretKey
           volumeMounts:
             - name: spark-connect-client
              mountPath: /app
diff --git a/tests/templates/kuttl/spark-connect/certs/generate.sh b/tests/templates/kuttl/spark-connect/certs/generate.sh
new file mode 100755
index 00000000..dfd63c18
--- /dev/null
+++ b/tests/templates/kuttl/spark-connect/certs/generate.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+echo "Creating client cert"
+FQDN="minio"
+
+echo "Creating Root Certificate Authority"
+openssl genrsa \
+  -out root-ca.key.pem \
+  2048
+
+echo "Self-signing the Root Certificate Authority"
+openssl req \
+  -x509 \
+  -new \
+  -nodes \
+  -key root-ca.key.pem \
+  -days 36500 \
+  -out root-ca.crt.pem \
+  -subj "/C=DE/ST=Schleswig-Holstein/L=Wedel/O=Stackable Signing Authority Inc/CN=stackable.de"
+
+openssl genrsa \
+  -out client.key.pem \
+  2048
+
+echo "Creating the CSR"
+openssl req -new \
+  -key client.key.pem \
+  -out client.csr.pem \
+  -subj "/C=DE/ST=Schleswig-Holstein/L=Wedel/O=Stackable/CN=${FQDN}" \
+  -addext "subjectAltName = DNS:${FQDN}, DNS:localhost"
+
+echo "Signing the client cert with the root ca"
+openssl x509 \
+  -req -in client.csr.pem \
+  -CA root-ca.crt.pem \
+  -CAkey root-ca.key.pem \
+  -CAcreateserial \
+  -out client.crt.pem \
+  -days 36500 \
+  -copy_extensions copy
+
+echo "Copying the files to match the api of the secret-operator"
+cp root-ca.crt.pem ca.crt
+cp client.key.pem tls.key
+cp client.crt.pem tls.crt
+
+echo "To create a k8s secret run"
+echo "kubectl create secret generic minio-tls-ca --from-file=ca.crt --from-file=tls.crt --from-file=tls.key"
diff --git a/tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml b/tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml.j2
similarity index 91%
rename from tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml
rename to tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml.j2
index b9c72811..cf794a80 100644
--- a/tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml
+++ b/tests/templates/kuttl/spark-connect/helm-bitnami-minio-values.yaml.j2
@@ -67,3 +67,8 @@ resources:

 service:
   type: NodePort
+{% if test_scenario['values']['s3-use-tls'] == 'true' %}
+tls:
+  enabled: true
+  existingSecret: minio-tls-ca
+{% endif %}
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index ebb0b697..f8662181 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -134,6 +134,7 @@ tests:
     dimensions:
       - spark-connect
       - openshift
+      - s3-use-tls
 suites:
   - name: nightly