2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
This has the effect that applications where the `spark-submit` Pod fails are not resubmitted.
Previously, Jobs were retried at most 6 times by default ([#647]).
- Support for Spark `3.5.8` ([#650]).
- First-class support for S3 buckets on Spark Connect clusters ([#652]).

### Fixed

@@ -41,6 +42,7 @@ All notable changes to this project will be documented in this file.
[#649]: https://github.com/stackabletech/spark-k8s-operator/pull/649
[#650]: https://github.com/stackabletech/spark-k8s-operator/pull/650
[#651]: https://github.com/stackabletech/spark-k8s-operator/pull/651
[#652]: https://github.com/stackabletech/spark-k8s-operator/pull/652
[#655]: https://github.com/stackabletech/spark-k8s-operator/pull/655
[#656]: https://github.com/stackabletech/spark-k8s-operator/pull/656

4 changes: 4 additions & 0 deletions Cargo.nix

Some generated files are not rendered by default.

327 changes: 327 additions & 0 deletions extra/crds.yaml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/operator-binary/Cargo.toml
@@ -25,6 +25,7 @@ tracing-futures.workspace = true
clap.workspace = true
futures.workspace = true
tokio.workspace = true
indoc.workspace = true

[dev-dependencies]
indoc.workspace = true
2 changes: 1 addition & 1 deletion rust/operator-binary/src/connect/common.rs
@@ -96,7 +96,7 @@ pub(crate) fn jvm_args(
// Merges an arbitrary number of property maps (e.g. S3, server and executor
// properties) and renders the contents of the Spark properties file.
pub(crate) fn spark_properties(
props: &[BTreeMap<String, Option<String>>; 2],
props: &[BTreeMap<String, Option<String>>],
) -> Result<String, Error> {
let mut result = BTreeMap::new();
for p in props {
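Note on the signature change above: relaxing the fixed-size array `&[BTreeMap<String, Option<String>>; 2]` to a slice lets callers pass any number of property maps; the reconciler below now merges three (S3, server, and executor properties). A minimal sketch of the merge semantics implied by the loop shown above, assuming later maps win on key collisions and `None` unsets a key — the actual rendering lives in `common.rs`, which this diff only excerpts:

```rust
use std::collections::BTreeMap;

// Sketch only, not the PR's code: merge any number of property maps and
// render them as `key value` lines, as a spark-defaults.conf expects.
fn spark_properties(props: &[BTreeMap<String, Option<String>>]) -> String {
    let mut result: BTreeMap<String, Option<String>> = BTreeMap::new();
    for p in props {
        // Later maps override earlier ones on key collisions.
        result.extend(p.iter().map(|(k, v)| (k.clone(), v.clone())));
    }
    result
        .into_iter()
        // Keys mapped to None are dropped from the rendered file.
        .filter_map(|(k, v)| v.map(|v| format!("{k} {v}")))
        .collect::<Vec<_>>()
        .join("\n")
}
```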
20 changes: 19 additions & 1 deletion rust/operator-binary/src/connect/controller.rs
@@ -21,7 +21,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1};
use crate::{
Ctx,
connect::{common, crd::SparkConnectServerStatus, executor, server, service},
connect::{common, crd::SparkConnectServerStatus, executor, s3, server, service},
crd::constants::{OPERATOR_NAME, SPARK_IMAGE_BASE_NAME},
};

@@ -142,6 +142,12 @@ pub enum Error {
ResolveProductImage {
source: product_image_selection::Error,
},

#[snafu(display("failed to resolve S3 connections for SparkConnectServer {name:?}"))]
ResolveS3Connections { source: s3::Error, name: String },

#[snafu(display("failed to build connect server S3 properties"))]
S3SparkProperties { source: s3::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -186,6 +192,13 @@ pub async fn reconcile(
.resolve(SPARK_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION)
.context(ResolveProductImageSnafu)?;

// Resolve any S3 connections early to fail fast if there are issues.
let resolved_s3 = s3::ResolvedS3::resolve(client, scs)
.await
.with_context(|_| ResolveS3ConnectionsSnafu {
name: scs.name_unchecked(),
})?;

// Use a dedicated service account for connect server pods.
let (service_account, role_binding) = build_rbac_resources(
scs,
@@ -229,6 +242,9 @@
// Server config map

let spark_props = common::spark_properties(&[
resolved_s3
.spark_properties()
.context(S3SparkPropertiesSnafu)?,
server::server_properties(
scs,
&server_config,
@@ -263,6 +279,7 @@
&executor_config,
&resolved_product_image,
&executor_config_map,
&resolved_s3,
)
.context(ExecutorPodTemplateSnafu)?,
)
@@ -308,6 +325,7 @@
&server_config_map,
&applied_listener.name_any(),
args,
&resolved_s3,
)
.context(BuildServerStatefulSetSnafu)?;

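The new `connect/s3.rs` module is not included in this excerpt, so the exact output of `resolved_s3.spark_properties()` cannot be read off the diff. As a purely hypothetical illustration, a resolved S3 connection typically boils down to standard Hadoop S3A options such as these (key set and values are assumptions; the real mapping lives in the s3 module):

```rust
use std::collections::BTreeMap;

// Hypothetical example of the property map one resolved S3 connection could
// contribute to `common::spark_properties`. Keys are standard Hadoop S3A
// options; endpoint and access style would come from the S3Connection spec.
fn example_s3_properties() -> BTreeMap<String, Option<String>> {
    BTreeMap::from([
        (
            "spark.hadoop.fs.s3a.endpoint".to_owned(),
            Some("http://minio:9000".to_owned()),
        ),
        (
            "spark.hadoop.fs.s3a.path.style.access".to_owned(),
            Some("true".to_owned()),
        ),
    ])
}
```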
91 changes: 90 additions & 1 deletion rust/operator-binary/src/connect/crd.rs
@@ -15,6 +15,7 @@ use stackable_operator::{
fragment::{self, Fragment, ValidationError},
merge::Merge,
},
crd::s3,
deep_merger::ObjectOverrides,
k8s_openapi::{api::core::v1::PodAntiAffinity, apimachinery::pkg::api::resource::Quantity},
kube::{CustomResource, ResourceExt},
@@ -67,7 +68,6 @@
)
)]
pub mod versioned {

/// An Apache Spark Connect server component. This resource is managed by the Stackable operator
/// for Apache Spark. Find more information on how to use it in the
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/connect-server).
@@ -87,6 +87,10 @@ pub mod versioned {
#[serde(default)]
pub cluster_operation: ClusterOperation,

/// S3 buckets and, optionally, a dedicated S3 connection to be used by the Spark Connect server.
#[serde(default)]
connectors: Connectors,

// Docs are on the ObjectOverrides struct
#[serde(default)]
pub object_overrides: ObjectOverrides,
@@ -184,6 +188,14 @@
#[fragment_attrs(serde(default))]
pub requested_secret_lifetime: Option<Duration>,
}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Deserialize, Serialize)]
struct Connectors {
#[serde(default)]
pub s3buckets: Vec<s3::v1alpha1::InlineBucketOrReference>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub s3connection: Option<s3::v1alpha1::InlineConnectionOrReference>,
}
}

#[allow(clippy::derive_partial_eq_without_eq)]
@@ -391,3 +403,80 @@ impl v1alpha1::ExecutorConfig {
}
}
}
#[cfg(test)]
mod tests {
use indoc::indoc;

use super::*;

#[test]
fn test_cr_minimal_deserialization() {
let _spark_connect_cr = serde_yaml::from_str::<v1alpha1::SparkConnectServer>(indoc! { r#"
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
name: spark-connect
spec:
image:
productVersion: 4.1.1
"# })
.expect("Failed to deserialize minimal SparkConnectServer CR");
}

#[test]
fn test_cr_s3_ref_deserialization() {
let input = indoc! { r#"
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
name: spark-connect
spec:
image:
productVersion: 4.1.1
connectors:
s3buckets:
- reference: my-s3-bucket
"# };

let deserializer = serde_yaml::Deserializer::from_str(input);
let _spark_connect_cr: v1alpha1::SparkConnectServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer)
.expect("Failed to deserialize SparkConnectServer with S3 connectors CR");
}

#[test]
fn test_cr_s3_inline_deserialization() {
let input = indoc! { r#"
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
name: spark-connect
spec:
image:
productVersion: 4.1.1
connectors:
s3buckets:
- inline:
bucketName: mybucket
connection:
inline:
host: minio
port: 9000
accessStyle: Path
credentials:
secretClass: minio-credentials-class
tls:
verification:
server:
caCert:
secretClass: minio-tls-ca
"# };

let deserializer = serde_yaml::Deserializer::from_str(input);
let _spark_connect_cr: v1alpha1::SparkConnectServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer)
.expect("Failed to deserialize SparkConnectServer with S3 connectors CR");
}
}
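A note on the two S3 tests above: they deserialize through `serde_yaml::with::singleton_map_recursive` rather than a plain `serde_yaml::from_str` because serde_yaml 0.9 represents externally tagged enums as YAML tags (`!reference ...`) by default, while the CRD convention is the single-key-map form (`reference: my-s3-bucket`). A hypothetical stand-in for the operator-rs bucket type, shown only to illustrate the shape it presumably has (not the real definition):

```rust
use serde::Deserialize;

// Illustrative only: a bucket is either given inline or referenced by the
// name of an existing S3Bucket object. singleton_map_recursive turns
// `reference: my-s3-bucket` / `inline: {bucketName: ...}` into these variants.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
enum InlineBucketOrReference {
    // Simplified; the real inline spec also carries the connection.
    #[serde(rename_all = "camelCase")]
    Inline { bucket_name: String },
    Reference(String),
}
```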
44 changes: 41 additions & 3 deletions rust/operator-binary/src/connect/executor.rs
@@ -14,7 +14,7 @@ use stackable_operator::{
},
k8s_openapi::{
DeepMerge,
api::core::v1::{ConfigMap, EnvVar, PodTemplateSpec},
api::core::v1::{ConfigMap, EnvVar, PodSecurityContext, PodTemplateSpec},
},
kube::{ResourceExt, runtime::reflector::ObjectRef},
product_logging::framework::calculate_log_volume_size_limit,
@@ -26,7 +26,7 @@ use super::{
crd::{DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer},
};
use crate::{
connect::{common, crd::v1alpha1},
connect::{common, crd::v1alpha1, s3},
crd::constants::{
JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE,
METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME,
@@ -85,6 +85,15 @@ pub enum Error {
source: builder::configmap::Error,
cm_name: String,
},

#[snafu(display("failed to add S3 secret or tls volume mounts to executors"))]
AddS3VolumeMount { source: s3::Error },

#[snafu(display("failed to add S3 secret volumes to executors"))]
AddS3Volume { source: s3::Error },

#[snafu(display("failed to create the init container for the S3 truststore"))]
TrustStoreInitContainer { source: s3::Error },
}

// The executor pod template can contain only a handful of properties.
@@ -102,6 +111,7 @@ pub fn executor_pod_template(
config: &v1alpha1::ExecutorConfig,
resolved_product_image: &ResolvedProductImage,
config_map: &ConfigMap,
resolved_s3: &s3::ResolvedS3,
) -> Result<PodTemplateSpec, Error> {
let container_env = executor_env(
scs.spec
@@ -118,6 +128,13 @@
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
.context(AddVolumeMountSnafu)?
.add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
.context(AddVolumeMountSnafu)?
.add_volume_mounts(
resolved_s3
.volumes_and_mounts()
.context(AddS3VolumeMountSnafu)?
.1,
)
.context(AddVolumeMountSnafu)?;

let metadata = ObjectMetaBuilder::new()
@@ -148,7 +165,28 @@
.with_config_map(config_map.name_unchecked())
.build(),
)
.context(AddVolumeSnafu)?;
.context(AddVolumeSnafu)?
.add_volumes(
resolved_s3
.volumes_and_mounts()
.context(AddS3VolumeSnafu)?
.0,
)
.context(AddVolumeSnafu)?
// This is needed for shared emptyDir volumes with other containers like the truststore
// init container.
.security_context(PodSecurityContext {
fs_group: Some(1000),
..PodSecurityContext::default()
});

// S3: Add truststore init container for S3 endpoint communication with TLS.
if let Some(truststore_init_container) = resolved_s3
.truststore_init_container(resolved_product_image.clone())
.context(TrustStoreInitContainerSnafu)?
{
template.add_init_container(truststore_init_container);
}

if let Some(cm_name) = config.log_config_map() {
container
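The truststore init container itself is constructed in the (not shown) s3 module. Conventionally, such a container imports the CA certificate mounted from the SecretClass volume into a Java truststore on a shared `emptyDir` volume, which is also why the pod-level `fs_group` above is set. A hypothetical sketch, where the container name, paths, and keytool invocation are all assumptions rather than the PR's actual code:

```rust
use stackable_operator::k8s_openapi::api::core::v1::Container;

// Hypothetical sketch of a truststore init container: import the CA cert
// mounted from the SecretClass volume into a PKCS12 truststore on a shared
// emptyDir volume that the server and executor containers also mount.
fn example_truststore_init_container() -> Container {
    Container {
        name: "s3-truststore".to_owned(),
        // The image would come from the resolved product image; keytool
        // ships with the JDK contained in the Spark image.
        command: Some(vec!["sh".to_owned(), "-c".to_owned()]),
        args: Some(vec![concat!(
            "keytool -importcert -noprompt -alias s3-ca ",
            "-file /stackable/mount/s3-ca/ca.crt ",
            "-keystore /stackable/truststore/truststore.p12 ",
            "-storetype PKCS12 -storepass changeit"
        )
        .to_owned()]),
        ..Container::default()
    }
}
```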
1 change: 1 addition & 0 deletions rust/operator-binary/src/connect/mod.rs
@@ -2,6 +2,7 @@ mod common;
pub mod controller;
pub mod crd;
mod executor;
mod s3;
pub mod server;
mod service;
