From 2e7762622eb76558b21ed523e2d1ac4388ad740d Mon Sep 17 00:00:00 2001 From: Ashok Kumar Date: Fri, 19 Jun 2026 16:29:11 +0530 Subject: [PATCH 1/6] [LIVY-461] Support multi-tenant (multi-namespace) Kubernetes clusters Livy on Kubernetes previously assumed a single namespace and listed/killed applications across all namespaces (inAnyNamespace). In a multi-tenant cluster the Livy service account is typically scoped to a subset of namespaces, so cluster-wide calls fail. This change makes the namespace a first-class, per-session property: * SparkApp.getNamespace resolves the target namespace from the session Spark conf (spark.kubernetes.namespace), falling back to $SPARK_HOME/conf/spark-defaults.conf and finally the 'default' namespace. It only touches the filesystem on Kubernetes and returns an empty namespace for YARN/local, closing the input stream and tolerating a missing file or unset SPARK_HOME. * The namespace is threaded through SparkApp.create and persisted in the batch and interactive recovery metadata, so a recovered session keeps the namespace it was created with. * SparkKubernetesApp scopes every client call with .inNamespace(...) and tracks the set of namespaces it has seen in a thread-safe set (ConcurrentHashMap-backed) so leaked-application cleanup iterates only over namespaces Livy actually has access to. * Bumps kubernetes-client to 6.8.1 and netty to 4.1.108.Final for security fixes. * Adds SparkAppSpec covering the namespace-resolution precedence chain and updates existing recovery-metadata fixtures for the new field. Co-Authored-By: Claude Opus 4.8 (1M context) --- pom.xml | 11 +- .../livy/server/batch/BatchSession.scala | 14 ++- .../interactive/InteractiveSession.scala | 14 ++- .../org/apache/livy/utils/SparkApp.scala | 50 ++++++++- .../livy/utils/SparkKubernetesApp.scala | 39 ++++--- .../livy/server/batch/BatchSessionSpec.scala | 2 +- .../interactive/InteractiveSessionSpec.scala | 6 +- .../livy/sessions/SessionManagerSpec.scala | 2 +- .../org/apache/livy/utils/SparkAppSpec.scala | 106 ++++++++++++++++++ 9 files changed, 213 insertions(+), 31 deletions(-) create mode 100644 server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala diff --git a/pom.xml b/pom.xml index 218b43153..c52be2c36 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 1.7.36 3.3.4 ${spark.scala-2.12.version} - 5.6.0 + 6.8.1 3.0.0 1.15 3.17.0 @@ -1399,6 +1399,13 @@ + + hadoop3 + + 3 + 3.4.0 + + hadoop2 @@ -1432,7 +1439,7 @@ 1.8 0.10.9.7 3.7.0-M11 - 4.1.96.Final + 4.1.108.Final 2.15.2 2.15.2 spark-${spark.version}-bin-hadoop${hadoop.major-minor.version} diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 8b64a0398..2d4bd4a5b 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -40,6 +40,7 @@ case class BatchRecoveryMetadata( appTag: String, owner: String, proxyUser: Option[String], + namespace: String, version: Int = 1) extends RecoveryMetadata @@ -64,7 +65,7 @@ object BatchSession extends Logging { mockApp: Option[SparkApp] = None): BatchSession = { val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) - + val namespace = SparkApp.getNamespace(request.conf, livyConf) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( appTag, @@ -106,7 +107,8 @@ object BatchSession extends Logging { childProcesses.decrementAndGet() } } - SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s)) + val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace) + SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap) } info(s"Creating batch session $id: [owner: $owner, request: $request]") @@ -120,6 +122,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,8 +140,10 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + m.namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => - SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) + val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> m.namespace) + SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), extrasMap) }) } } @@ -152,6 +157,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + namespace: String, sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ @@ -204,5 +210,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace) } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..978951f0d 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata( // proxyUser is deprecated. It is available here only for backward compatibility proxyUser: Option[String], rscDriverUri: Option[URI], + namespace: String, version: Int = 1) extends RecoveryMetadata @@ -93,6 +94,7 @@ object InteractiveSession extends Logging { mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) + val namespace = SparkApp.getNamespace(request.conf, livyConf) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( @@ -153,6 +155,7 @@ object InteractiveSession extends Logging { request.numExecutors, request.pyFiles, request.queue, + namespace, mockApp) } @@ -193,6 +196,7 @@ object InteractiveSession extends Logging { metadata.numExecutors, metadata.pyFiles, metadata.queue, + metadata.namespace, mockApp) } @@ -432,6 +436,7 @@ class InteractiveSession( val numExecutors: Option[Int], val pyFiles: List[String], val queue: Option[String], + val namespace: String, mockApp: Option[SparkApp]) // For unit test. extends Session(id, name, owner, ttl, idleTimeout, livyConf) with SessionHeartbeat @@ -461,11 +466,14 @@ class InteractiveSession( app = mockApp.orElse { val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) + val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace) if (!livyConf.isRunningOnKubernetes()) { - driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + driverProcess.map { _ => + SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap) + } } else { // Create SparkApp for Kubernetes anyway - Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap)) } } @@ -534,7 +542,7 @@ class InteractiveSession( heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout, driverMemory, driverCores, executorMemory, executorCores, conf, archives, files, jars, numExecutors, pyFiles, queue, - proxyUser, rscDriverUri) + proxyUser, rscDriverUri, namespace) override def state: SessionState = { if (serverSideState == SessionState.Running) { diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index e424f80fc..4c7e96bf3 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -17,6 +17,9 @@ package org.apache.livy.utils +import java.io.{File, FileInputStream} +import java.util.Properties + import scala.collection.JavaConverters._ import org.apache.livy.LivyConf @@ -56,12 +59,52 @@ trait SparkAppListener { */ object SparkApp { private val SPARK_YARN_TAG_KEY = "spark.yarn.tags" - + val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace" object State extends Enumeration { val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value } type State = State.Value + val DEFAULT_KUBERNETES_NAMESPACE = "default" + + /** + * Resolve the Kubernetes namespace a Spark application should run in. + * + * The namespace is looked up, in order of precedence, from: + * 1. the session's Spark configuration ([[SPARK_KUBERNETES_NAMESPACE_KEY]]), + * 2. `$SPARK_HOME/conf/spark-defaults.conf` (if present), + * 3. the [[DEFAULT_KUBERNETES_NAMESPACE]] fallback. + * + * The namespace is only meaningful on Kubernetes, so for any other cluster + * manager (YARN, local) an empty string is returned without touching the + * filesystem. + */ + def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = { + if (!livyConf.isRunningOnKubernetes()) { + return "" + } + conf.get(SPARK_KUBERNETES_NAMESPACE_KEY).filter(_.nonEmpty).getOrElse { + namespaceFromSparkDefaults(livyConf).getOrElse(DEFAULT_KUBERNETES_NAMESPACE) + } + } + + private def namespaceFromSparkDefaults(livyConf: LivyConf): Option[String] = { + livyConf.sparkHome().flatMap { sparkHome => + val sparkDefaults = new File(sparkHome, s"conf${File.separator}spark-defaults.conf") + if (!sparkDefaults.isFile) { + None + } else { + val in = new FileInputStream(sparkDefaults) + try { + val properties = new Properties() + properties.load(in) + Option(properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY)).filter(_.nonEmpty) + } finally { + in.close() + } + } + } + } /** * Return cluster manager dependent SparkConf. * @@ -102,11 +145,12 @@ object SparkApp { appId: Option[String], process: Option[LineBufferedProcess], livyConf: LivyConf, - listener: Option[SparkAppListener]): SparkApp = { + listener: Option[SparkAppListener], + extrasMap: Map[String, String]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) } else if (livyConf.isRunningOnKubernetes()) { - new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap) } else { require(process.isDefined, "process must not be None when Livy master is not YARN or" + "Kubernetes.") diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 22ab6b8f8..e271ce04a 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -21,6 +21,8 @@ import java.util.Collections import java.util.concurrent._ import scala.annotation.tailrec +import scala.collection.JavaConverters.asScalaSetConverter +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ @@ -52,14 +54,17 @@ object SparkKubernetesApp extends Logging { val iter = leakedAppTags.entrySet().iterator() var isRemoved = false val now = System.currentTimeMillis() - val apps = withRetry(kubernetesClient.getApplications()) + val apps = appNamespaces.flatMap { namespace => + withRetry(kubernetesClient.inNamespace(namespace).getApplications()) + } while (iter.hasNext) { val entry = iter.next() apps.find(_.getApplicationTag.contains(entry.getKey)) .foreach({ app => info(s"Kill leaked app ${app.getApplicationId}") - withRetry(kubernetesClient.killApplication(app)) + withRetry(kubernetesClient.inNamespace(app.getApplicationNamespace) + .killApplication(app)) iter.remove() isRemoved = true }) @@ -138,6 +143,8 @@ object SparkKubernetesApp extends Logging { private var sessionLeakageCheckInterval: Long = _ var kubernetesClient: DefaultKubernetesClient = _ + var appNamespaces: mutable.Set[String] = + ConcurrentHashMap.newKeySet[String]().asScala private var appLookupThreadPoolSize: Long = _ private var appLookupMaxFailedTimes: Long = _ @@ -146,8 +153,7 @@ object SparkKubernetesApp extends Logging { this.livyConf = livyConf // KubernetesClient is thread safe. Create once, share it across threads. - kubernetesClient = - KubernetesClientFactory.createKubernetesClient(livyConf) + kubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf) cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds @@ -245,7 +251,9 @@ class SparkKubernetesApp private[utils] ( process: Option[LineBufferedProcess], listener: Option[SparkAppListener], livyConf: LivyConf, - kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test. + extrasMap: Map[String, String], + // For unit test. + kubernetesClient: => DefaultKubernetesClient = SparkKubernetesApp.kubernetesClient) extends SparkApp with Logging { @@ -262,6 +270,8 @@ class SparkKubernetesApp private[utils] ( private var kubernetesTagToAppIdFailedTimes: Int = _ private var kubernetesAppMonitorFailedTimes: Int = _ + private var namespace: String = extrasMap(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY) + appNamespaces.add(namespace) private def failToMonitor(): Unit = { changeState(SparkApp.State.FAILED) process.foreach(_.destroy()) @@ -292,10 +302,11 @@ class SparkKubernetesApp private[utils] ( } // Get KubernetesApplication by appTag. val appOption: Option[KubernetesApplication] = try { - getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow, namespace) } catch { case e: Exception => failToGetAppId() + error(s"Exception getting app from tag $appTag in namespace $namespace with message: ", e) appPromise.failure(e) return } @@ -311,7 +322,7 @@ class SparkKubernetesApp private[utils] ( listener.foreach(_.appIdKnown(appId)) if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) { - withRetry(kubernetesClient.createSparkUIIngress(app, livyConf)) + withRetry(kubernetesClient.inNamespace(namespace).createSparkUIIngress(app, livyConf)) } var appInfo = AppInfo() @@ -326,7 +337,7 @@ class SparkKubernetesApp private[utils] ( debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " + s"namespace: ${app.getApplicationNamespace} " + s"applicationTag: ${app.getApplicationTag}") - val report = kubernetesClient.getApplicationReport(livyConf, app, + val report = kubernetesClient.inNamespace(namespace).getApplicationReport(livyConf, app, cacheLogSize = cacheLogSize) report } @@ -399,7 +410,7 @@ class SparkKubernetesApp private[utils] ( def kubernetesApplication: KubernetesApplication = applicationDetails.get.get if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) { try { - withRetry(kubernetesClient.killApplication( + withRetry(kubernetesClient.inNamespace(namespace).killApplication( Await.result(appPromise.future, appLookupTimeout))) } catch { // We cannot kill the Kubernetes app without the appTag. @@ -440,10 +451,11 @@ class SparkKubernetesApp private[utils] ( private def getAppFromTag( appTag: String, pollInterval: duration.Duration, - deadline: Deadline): Option[KubernetesApplication] = { + deadline: Deadline, + namespace: String): Option[KubernetesApplication] = { import KubernetesExtensions._ - - withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + withRetry(kubernetesClient.inNamespace(namespace).getApplications() + .find(_.getApplicationTag.contains(appTag))) match { case Some(app) => Some(app) case None => @@ -686,8 +698,7 @@ private[utils] object KubernetesExtensions { appTagLabel: String = SPARK_APP_TAG_LABEL, appIdLabel: String = SPARK_APP_ID_LABEL ): Seq[KubernetesApplication] = { - client.pods.inAnyNamespace - .withLabels(labels.asJava) + client.pods.withLabels(labels.asJava) .withLabel(appTagLabel) .withLabel(appIdLabel) .list.getItems.asScala.map(new KubernetesApplication(_)) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 401a8beb1..481fcae3d 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -143,7 +143,7 @@ class BatchSessionSpec val req = new CreateBatchRequest() val name = Some("Test Batch Session") val mockApp = mock[SparkApp] - val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None) + val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, "") val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) batch.state shouldBe (SessionState.Recovering) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index e7d651f89..2b5ccf72a 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -279,7 +279,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, Some("Test session"), None, "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, Some(URI.create(""))) + List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "") val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) s.start() @@ -298,7 +298,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, None, None, "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, Some(URI.create(""))) + List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "") val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) s.start() @@ -315,7 +315,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, None) + List.empty[String], None, List.empty[String], None, None, None, "") val s = InteractiveSession.recover(m, conf, sessionStore, None) s.start() s.state shouldBe a[SessionState.Dead] diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 363b01f89..62b35b6e0 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -215,7 +215,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, "") } def mockSession(id: Int): BatchSession = { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala new file mode 100644 index 000000000..fc668b558 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.utils + +import java.io.{File, PrintWriter} +import java.nio.file.Files + +import org.scalatest.{BeforeAndAfterAll, FunSpec} + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} + +class SparkAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll { + + private def k8sConf(sparkHome: Option[String] = None): LivyConf = { + val conf = new LivyConf(false) + conf.set(LivyConf.LIVY_SPARK_MASTER, "k8s://https://kubernetes.default.svc:443") + sparkHome.foreach(conf.set(LivyConf.SPARK_HOME, _)) + conf + } + + /** Create a throwaway SPARK_HOME with the given spark-defaults.conf contents (or none). */ + private def withSparkHome(defaultsContent: Option[String])(f: String => Unit): Unit = { + val sparkHome = Files.createTempDirectory("livy-spark-home").toFile + try { + val confDir = new File(sparkHome, "conf") + assert(confDir.mkdirs()) + defaultsContent.foreach { content => + val writer = new PrintWriter(new File(confDir, "spark-defaults.conf")) + try writer.write(content) finally writer.close() + } + f(sparkHome.getAbsolutePath) + } finally { + deleteRecursively(sparkHome) + } + } + + private def deleteRecursively(file: File): Unit = { + if (file.isDirectory) { + Option(file.listFiles()).foreach(_.foreach(deleteRecursively)) + } + file.delete() + } + + describe("SparkApp.getNamespace") { + + it("should return an empty namespace when not running on Kubernetes") { + val conf = new LivyConf(false) + conf.set(LivyConf.LIVY_SPARK_MASTER, "yarn") + // A namespace in the conf must be ignored for non-Kubernetes masters. + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "ignored") + assert(SparkApp.getNamespace(sparkConf, conf) === "") + } + + it("should prefer the namespace from the session Spark conf") { + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "team-a") + assert(SparkApp.getNamespace(sparkConf, k8sConf()) === "team-a") + } + + it("should fall back to spark-defaults.conf when the conf has no namespace") { + withSparkHome(Some(s"${SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY} team-b\n")) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === "team-b") + } + } + + it("should fall back to the default namespace when spark-defaults.conf is absent") { + withSparkHome(None) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + } + + it("should fall back to the default namespace when spark-defaults.conf lacks the key") { + withSparkHome(Some("spark.executor.memory 1g\n")) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + } + + it("should fall back to the default namespace when SPARK_HOME is not set") { + // No SPARK_HOME on the conf; getNamespace must not throw and must use the default. + assert(SparkApp.getNamespace(Map.empty, k8sConf()) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + + it("should treat an empty namespace in the conf as unset") { + withSparkHome(Some(s"${SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY} team-c\n")) { sparkHome => + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "") + assert(SparkApp.getNamespace(sparkConf, k8sConf(Some(sparkHome))) === "team-c") + } + } + } +} From 7342a22ffac935391f77674ed089a1f5473b4e3d Mon Sep 17 00:00:00 2001 From: Venugopal Allenki Date: Wed, 19 Nov 2025 18:45:51 +0530 Subject: [PATCH 2/6] Livy Interactive session fixes --- .../java/org/apache/livy/rsc/ContextLauncher.java | 13 +++++++++---- rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 3 +++ .../org/apache/livy/utils/SparkKubernetesApp.scala | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index 8c4494901..a68b86ea5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -327,13 +327,18 @@ void dispose() { //Note. Your compiler or IDE may identify this method as unused //tests fail without it public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String ip = insocket.getAddress().getHostAddress(); - ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret); + String driverHost; + if (conf.getBoolean(DRIVER_ADDRESS_USE_HOSTNAME)) { + driverHost = msg.host; + } else { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + driverHost = insocket.getAddress().getHostAddress(); + } + ContextInfo info = new ContextInfo(driverHost, msg.port, clientId, secret); if (promise.trySuccess(info)) { timeout.cancel(true); LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(), - msg.host, msg.port); + driverHost, msg.port); } else { LOG.warn("Connection established but promise is already finalized."); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 933948fa3..ea3974ac3 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -58,6 +58,9 @@ public enum Entry implements ConfEntry { // How long will the RSC wait for a connection for a Livy server before shutting itself down. SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), + // Use driver's reported hostname instead of socket IP. Useful for Kubernetes + Istio mode. + DRIVER_ADDRESS_USE_HOSTNAME("driver.address.use-hostname", false), + PROXY_USER("proxy-user", null), RPC_SERVER_ADDRESS("rpc.server.address", null), diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index e271ce04a..887b3ce9c 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -705,7 +705,7 @@ private[utils] object KubernetesExtensions { } def killApplication(app: KubernetesApplication): Boolean = { - client.pods.inAnyNamespace.delete(app.getApplicationPod) + client.pods.inNamespace(app.getApplicationNamespace).delete(app.getApplicationPod) } def getApplicationReport( From e6d6155d2b7f87c417bb8670606f81835da21535 Mon Sep 17 00:00:00 2001 From: Venugopal Allenki Date: Thu, 30 Apr 2026 18:51:49 +0530 Subject: [PATCH 3/6] Reduce Kubernetes API load in SparkKubernetesApp polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SparkKubernetesApp.getApplicationReport previously issued a single LIST returning every pod tagged with the session's app tag — on large jobs this deserializes hundreds of executor pods into memory each poll cycle, even though application state is derived only from the driver pod. Two changes: - Narrow the driver lookup to spark-role=driver at the API server so the LIST returns a single pod. - Gate the separate executor LIST on a new config flag livy.server.kubernetes.executor-tracking.enabled (default true, so existing deployments see no behavior change). Operators running large clusters can disable it to skip the executor LIST entirely. When disabled, per-executor Grafana/Loki log URLs and per-executor entries in session diagnostics are omitted; driver state tracking is unaffected. Adds two tests: default-enabled invariant and diagnostics graceful degradation when executors is empty. --- conf/livy.conf.template | 4 +++ .../main/scala/org/apache/livy/LivyConf.scala | 5 +++ .../livy/utils/SparkKubernetesApp.scala | 32 ++++++++++++++---- .../livy/utils/SparkKubernetesAppSpec.scala | 33 +++++++++++++++++++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0299dca7f..04610c235 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -267,6 +267,10 @@ # details, routes, etc...) # livy.server.kubernetes.poll-interval = 15s +# Whether Livy fetches executor pods each poll cycle (used for executor log URLs and +# per-executor diagnostics). Disable on large clusters; driver pod is polled regardless. +# livy.server.kubernetes.executor-tracking.enabled = true + # Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired # options below # livy.server.kubernetes.ingress.create = false diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 006ab5dff..12e16d169 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -304,6 +304,11 @@ object LivyConf { // How often Livy polls Kubernetes to refresh Kubernetes app state. val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + // Whether Livy fetches executor pods each poll cycle (used for executor log URLs + // and per-executor diagnostics). Driver pod is polled regardless. + val KUBERNETES_EXECUTOR_TRACKING_ENABLED = + Entry("livy.server.kubernetes.executor-tracking.enabled", true) + // How long to check livy session leakage. val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 887b3ce9c..3237894b2 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -166,6 +166,11 @@ object SparkKubernetesApp extends Logging { livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + if (!livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) { + info("Kubernetes executor tracking is disabled. Per-executor log URLs and " + + "per-executor entries in session diagnostics will be omitted.") + } + leakedAppsGCThread.setDaemon(true) leakedAppsGCThread.setName("LeakedAppsGCThread") leakedAppsGCThread.start() @@ -714,12 +719,27 @@ private[utils] object KubernetesExtensions { cacheLogSize: Int, appTagLabel: String = SPARK_APP_TAG_LABEL ): KubernetesAppReport = { - val pods = client.pods.inNamespace(app.getApplicationNamespace) - .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) - .list.getItems.asScala - val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER) - val executors = - pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR) + // Narrow the LIST to the driver pod; application state does not depend on executors. + val driver = client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map( + appTagLabel -> app.getApplicationTag, + SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER + ).asJava) + .list.getItems.asScala.headOption + + // Executors are used only for log URLs and per-executor diagnostics; skip when disabled. + val executors: Seq[Pod] = + if (livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) { + client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map( + appTagLabel -> app.getApplicationTag, + SPARK_ROLE_LABEL -> SPARK_ROLE_EXECUTOR + ).asJava) + .list.getItems.asScala + } else { + Seq.empty + } + val appLog = Try( client.pods.inNamespace(app.getApplicationNamespace) .withName(app.getApplicationPod.getMetadata.getName) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 24e66e0d9..708bfc35a 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -121,6 +121,32 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef .getExecutorsLogUrls.isEmpty) } + it("should return diagnostics without executor entries when executors is empty") { + // When livy.server.kubernetes.executor-tracking.enabled=false, the + // executor LIST in getApplicationReport is skipped and executors is + // passed in as Seq.empty. Diagnostics must still render the driver + // section without error. + val driverStatus = when(mock[PodStatus].getPhase).thenReturn("Running") + .getMock[PodStatus] + val driverMeta = when(mock[ObjectMeta].getName).thenReturn("driver-pod") + .getMock[ObjectMeta] + when(driverMeta.getNamespace).thenReturn("ns") + when(driverMeta.getLabels).thenReturn(Map.empty[String, String].asJava) + val driverSpec = when(mock[PodSpec].getNodeName).thenReturn("node-1") + .getMock[PodSpec] + when(driverSpec.getContainers).thenReturn(java.util.Collections.emptyList[Container]) + when(driverStatus.getConditions).thenReturn(java.util.Collections.emptyList[PodCondition]) + val driver = when(mock[Pod].getStatus).thenReturn(driverStatus).getMock[Pod] + when(driver.getMetadata).thenReturn(driverMeta) + when(driver.getSpec).thenReturn(driverSpec) + + val diagnostics = KubernetesAppReport( + Some(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false) + ).getApplicationDiagnostics + assert(diagnostics.exists(_.contains("driver-pod"))) + assert(!diagnostics.exists(_.contains("executor"))) + } + it("should return driver ingress url") { def livyConf(protocol: Option[String]): LivyConf = { @@ -208,6 +234,13 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef KubernetesClientFactory.createKubernetesClient(conf) } } + + it("should enable executor tracking by default") { + // Preserve existing behavior: operators must opt in to skipping the + // executor LIST. This guards against an accidental default flip that + // would silently drop executor entries from session diagnostics. + assert(new LivyConf(false).getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) + } } describe("KubernetesClientExtensions") { From 88a430b6e243f429180ba001f373ce48d48783d4 Mon Sep 17 00:00:00 2001 From: Venugopal Allenki Date: Tue, 12 May 2026 20:38:32 +0530 Subject: [PATCH 4/6] Add SessionManager refresh and /recovery operator endpoint --- .../org/apache/livy/server/LivyServer.scala | 37 ++++++- .../apache/livy/sessions/SessionManager.scala | 43 +++++++- .../livy/sessions/SessionManagerSpec.scala | 99 +++++++++++++++++++ 3 files changed, 177 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 7b6f323e0..931fb33e7 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -41,7 +41,7 @@ import org.apache.livy.server.batch.BatchSessionServlet import org.apache.livy.server.interactive.InteractiveSessionServlet import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager} import org.apache.livy.server.ui.UIServlet -import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} +import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager, SessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ @@ -176,6 +176,39 @@ class LivyServer extends Logging { } } + // Operator endpoint: re-scan the recovery state store and import any sessions + // written by another Livy server. Guarded by livy.superusers. + val recoveryServlet = new JsonServlet { + before() { + contentType = "application/json" + val user = request.getRemoteUser() + if (!accessManager.checkSuperUser(user)) { + halt(403, Map("msg" -> s"User '$user' not authorized for recovery endpoints.")) + } + } + + private def resultMap(r: SessionManager.RefreshResult): Map[String, Int] = + Map("added" -> r.added, "total" -> r.total, "failed" -> r.failed) + + post("/sessions/refresh") { + info(s"Interactive session refresh triggered by user='${request.getRemoteUser()}'") + resultMap(interactiveSessionManager.refresh()) + } + + post("/batches/refresh") { + info(s"Batch session refresh triggered by user='${request.getRemoteUser()}'") + resultMap(batchSessionManager.refresh()) + } + + post("/refresh") { + info(s"Full session refresh triggered by user='${request.getRemoteUser()}'") + Map( + "batches" -> resultMap(batchSessionManager.refresh()), + "sessions" -> resultMap(interactiveSessionManager.refresh()) + ) + } + } + // Servlet for hosting static files such as html, css, and js // Necessary since Jetty cannot set it's resource base inside a jar // Returns 404 if the file does not exist @@ -254,6 +287,8 @@ class LivyServer extends Logging { context.mountMetricsAdminServlet("/metrics") mount(context, livyVersionServlet, "/version/*") + + mount(context, recoveryServlet, "/recovery/*") } catch { case e: Throwable => error("Exception thrown when initializing server", e) diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index 1dc1d820e..cb41e8c85 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -36,6 +36,9 @@ import org.apache.livy.sessions.Session.RecoveryMetadata object SessionManager { val SESSION_RECOVERY_MODE_OFF = "off" val SESSION_RECOVERY_MODE_RECOVERY = "recovery" + + /** Outcome of a [[SessionManager.refresh]] call. */ + case class RefreshResult(added: Int, total: Int, failed: Int) } class BatchSessionManager( @@ -100,8 +103,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( } def register(session: S): S = { - info(s"Registering new session ${session.id}") synchronized { + sessions.get(session.id) match { + case Some(existing) => + debug(s"Session ${session.id} already registered; skipping duplicate registration.") + return existing + case None => + } + info(s"Registering new session ${session.id}") session.name.foreach { sessionName => if (sessionsByName.contains(sessionName)) { val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}" @@ -229,6 +238,38 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( recoveredSessions } + /** + * Re-scan the state store and import sessions written by another Livy server. + * Add-only: in-memory sessions are not removed even if their state-store entry is + * gone. The id counter is advanced forward only. + */ + def refresh(): RefreshResult = { + // Read the state store outside the SessionManager monitor so we don't block the + // garbage collector and heartbeat watchdog while doing N small EFS / ZK reads. + val storeNextId = sessionStore.getNextSessionId(sessionType) + val sessionMetadata = sessionStore.getAllSessions[R](sessionType) + + val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get) + recoveryFailure.foreach(ex => warn(s"Refresh failure for $sessionType: ${ex.getMessage}", ex)) + + synchronized { + if (storeNextId > idCounter.get) { + idCounter.set(storeNextId) + } + + val before = sessions.size + sessionMetadata.flatMap(_.toOption) + .filterNot(m => sessions.contains(m.id)) + .map(sessionRecovery) + .foreach(register) + + val added = sessions.size - before + info(s"Refreshed $sessionType sessions: added=$added, total=${sessions.size}," + + s" failed=${recoveryFailure.size}, next session id=$idCounter") + RefreshResult(added, sessions.size, recoveryFailure.size) + } + } + private class GarbageCollector extends Thread("session gc thread") { setDaemon(true) diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 62b35b6e0..21798b48e 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -209,6 +209,16 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit when(session.lastActivity).thenReturn(System.nanoTime()) when(session.state).thenReturn(state) } + + it("should be idempotent on register() for a session id already tracked") { + val (livyConf, manager) = createSessionManager() + val session = new MockSession(manager.nextId(), null, livyConf, Some("foo")) + val first = manager.register(session) + val again = manager.register(session) + again should be theSameInstanceAs first + manager.size() should be (1) + session.stopped should be (false) + } } describe("BatchSessionManager") { @@ -305,5 +315,94 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit verify(session, never).stop() } + + it("refresh should be a no-op when state store has no new sessions") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(0) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq.empty) + + val sm = new BatchSessionManager(conf, sessionStore) + val result = sm.refresh() + result.added shouldBe 0 + result.total shouldBe 0 + result.failed shouldBe 0 + } + + it("refresh should import only previously-unseen sessions and skip already-known ids") { + val conf = new LivyConf() + conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster") + + val sessionStore = mock[SessionStore] + // Initial recovery: only id=0. + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq(Try(makeMetadata(0, "t0")))) + + val sm = new BatchSessionManager(conf, sessionStore) + sm.size() shouldBe 1 + sm.get(0) shouldBe defined + + // After a peer writes id=1 to the state store, refresh should add it without + // re-registering id=0. + when(sessionStore.getNextSessionId("batch")).thenReturn(2) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq(Try(makeMetadata(0, "t0")), Try(makeMetadata(1, "t1")))) + + val r1 = sm.refresh() + r1.added shouldBe 1 + r1.total shouldBe 2 + sm.get(1) shouldBe defined + + // A second refresh with no new entries should add nothing. + val r2 = sm.refresh() + r2.added shouldBe 0 + r2.total shouldBe 2 + } + + it("refresh should advance the id counter forward but never backward") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(0) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq.empty) + + val sm = new BatchSessionManager(conf, sessionStore) + + // Local counter ahead of state store. + sm.nextId() shouldBe 0 + sm.nextId() shouldBe 1 + sm.nextId() shouldBe 2 + + // State store reports a smaller next id; refresh must NOT regress local counter. + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + sm.refresh() + sm.nextId() shouldBe 3 + + // State store reports a larger next id; refresh SHOULD jump local counter forward. + when(sessionStore.getNextSessionId("batch")).thenReturn(99) + sm.refresh() + sm.nextId() shouldBe 99 + } + + it("refresh should report deserialization failures via failed count") { + val conf = new LivyConf() + conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster") + + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq( + Try(makeMetadata(0, "t0")), + Failure(new java.io.IOException("corrupted entry id=42")), + Failure(new java.io.IOException("corrupted entry id=43")))) + + val sm = new BatchSessionManager(conf, sessionStore) + val r = sm.refresh() + r.added shouldBe 0 + r.total shouldBe 1 + r.failed shouldBe 2 + } } } From 2ae3f20aeda218da2b5d3a6b4fe0561ed2a1d269 Mon Sep 17 00:00:00 2001 From: Venugopal Allenki Date: Tue, 19 May 2026 22:24:28 +0530 Subject: [PATCH 5/6] Fix recovery-file leak after session deletion Sessions deleted via DELETE / idle GC / heartbeat watchdog / retention GC had their recovery file in the state store resurrected by the next SparkKubernetesApp monitor tick: stopSession() called app.kill(), but kill() left the app on the global appQueue, and the in-flight tick fell through changeState(KILLED|FAILED) to listener.appIdKnown(), which unconditionally re-saved recovery metadata after SessionManager.delete had just removed it. End state: in-memory map clean, state store retains a zombie file. Same shape exists for batch sessions on DELETE / FAILED / KILLED, with a one-tick race window. The leaked files also skewed our state-store metrics so we did not get an accurate picture of state. Fix: - SparkKubernetesApp.kill() now also removes the app from the shared monitor queue, alongside flipping the killed flag. - monitorSparkKubernetesApp early-returns after changeState(KILLED) and changeState(FAILED) so the body cannot fall through to appIdKnown after a terminal state observation. - killed becomes AtomicBoolean for visibility from the unsynchronized monitor read. The SparkApp trait, InteractiveSession.stopSession, and BatchSession.stopSession are unchanged. Tests: - SparkKubernetesAppSpec verifies kill() removes from appQueue and is idempotent for the queue removal. - BatchSessionSpec verifies stopSession invokes kill() on the app. --- .../livy/utils/SparkKubernetesApp.scala | 19 +++++++++++--- .../livy/server/batch/BatchSessionSpec.scala | 12 +++++++++ .../livy/utils/SparkKubernetesAppSpec.scala | 26 +++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 3237894b2..d11012bb4 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -19,6 +19,7 @@ package org.apache.livy.utils import java.net.URLEncoder import java.util.Collections import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.tailrec import scala.collection.JavaConverters.asScalaSetConverter @@ -266,7 +267,8 @@ class SparkKubernetesApp private[utils] ( import SparkKubernetesApp._ appQueue.add(this) - private var killed = false + // Atomic so the monitor thread sees the write from kill() without locking. + private val killed = new AtomicBoolean(false) private val appPromise: Promise[KubernetesApplication] = Promise() private[utils] var state: SparkApp.State = SparkApp.State.STARTING private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] @@ -300,10 +302,15 @@ class SparkKubernetesApp private[utils] ( private def monitorSparkKubernetesApp(): Unit = { try { - if (killed) { + // Skip the body if the app already reached a terminal state. + // Falling through would re-fire appIdKnown. + if (killed.get()) { changeState(SparkApp.State.KILLED) - } else if (isProcessErrExit) { + return + } + if (isProcessErrExit) { changeState(SparkApp.State.FAILED) + return } // Get KubernetesApplication by appTag. val appOption: Option[KubernetesApplication] = try { @@ -399,7 +406,11 @@ class SparkKubernetesApp private[utils] ( ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) override def kill(): Unit = synchronized { - killed = true + killed.set(true) + // Detach from the shared monitor queue so a polling tick cannot fall + // through to appIdKnown and resurrect recovery state after the owning + // session has been deleted. + appQueue.remove(this) if (!isRunning) { return diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 481fcae3d..f143734c7 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -138,6 +138,18 @@ class BatchSessionSpec }) should be (true) } + it("should call kill on the app on stopSession") { + val conf = new LivyConf() + val mockApp = mock[SparkApp] + val m = BatchRecoveryMetadata(99, None, None, "appTag", null, None, "") + val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) + batch.start() + + batch.stopSession() + + verify(mockApp).kill() + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 708bfc35a..e06edbe59 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -207,6 +207,32 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef } + describe("SparkKubernetesApp.kill") { + it("should remove the app from the monitor queue and be idempotent") { + // Drain anything left over from prior tests so we can assert exact size. + SparkKubernetesApp.clearApps + val sizeBefore = SparkKubernetesApp.getAppSize + val app = new SparkKubernetesApp( + "test-kill-tag", + None, + None, + None, + new LivyConf(false), + Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "ns"), + SparkKubernetesApp.kubernetesClient) + + // Constructor enqueues itself. + assert(SparkKubernetesApp.getAppSize === sizeBefore + 1) + + app.kill() + assert(SparkKubernetesApp.getAppSize === sizeBefore) + + // Idempotent: a second call must be a no-op for the queue. + app.kill() + assert(SparkKubernetesApp.getAppSize === sizeBefore) + } + } + describe("KubernetesClientFactory") { it("should build KubernetesApi url from LivyConf masterUrl") { def actual(sparkMaster: String): String = From 88d3c11e8dab76e9ba371b402825e6faba49ab8f Mon Sep 17 00:00:00 2001 From: Venugopal Allenki Date: Tue, 9 Jun 2026 14:00:42 +0530 Subject: [PATCH 6/6] Fix Promise.failure race that falsely fails healthy Spark apps SparkKubernetesApp.monitorSparkKubernetesApp re-runs getAppFromTag on every poll tick. Once an earlier tick has set appPromise via trySuccess(app), a later tick that hits an exception in getAppFromTag called the non-idempotent Promise.failure(e) on the already-completed promise. That throws IllegalStateException("Promise already completed."), which escapes the inner try and lands in the outer NonFatal handler, transitioning the app to FAILED. The session listener (BatchSession / InteractiveSession stateChanged) then flips the session to Dead, killing a healthy app. The race was always latent but only became reachable in practice after livy.server.kubernetes.app-lookup.thread-pool.size was raised above 1: the higher k8s API pressure makes withRetry exhaustion in getAppFromTag (e.g., during istio-proxy restarts) frequent enough to hit the catch block after the promise is already set. Switch the catch block to Promise.tryFailure(e) so a late failure on an already-completed promise is dropped instead of throwing. This matches the symmetric trySuccess(app) used on the success path two lines below. Behavior on the cold path is unchanged: tryFailure stores the failure exactly like failure when the promise is still pending, and no caller depends on the throw (the catch block returns immediately). Adds two regression tests in SparkKubernetesAppSpec that pin the idempotency contract in both directions. --- .../livy/utils/SparkKubernetesApp.scala | 2 +- .../livy/utils/SparkKubernetesAppSpec.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index d11012bb4..d1debbb15 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -319,7 +319,7 @@ class SparkKubernetesApp private[utils] ( case e: Exception => failToGetAppId() error(s"Exception getting app from tag $appTag in namespace $namespace with message: ", e) - appPromise.failure(e) + appPromise.tryFailure(e) return } if (appOption.isEmpty) { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index e06edbe59..a353c380c 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -18,6 +18,8 @@ package org.apache.livy.utils import java.util.Objects._ +import scala.concurrent.Promise + import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec} import io.fabric8.kubernetes.client.KubernetesClient @@ -207,6 +209,24 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef } + describe("SparkKubernetesApp.appPromise") { + it("tryFailure must be a no-op after trySuccess (no IllegalStateException)") { + val p = Promise[KubernetesApplication]() + val app = mock[KubernetesApplication] + assert(p.trySuccess(app)) + assert(!p.tryFailure(new IllegalStateException("simulated k8s API error"))) + assert(p.future.value.exists(_.isSuccess)) + } + + it("trySuccess must be a no-op after tryFailure") { + val p = Promise[KubernetesApplication]() + assert(p.tryFailure(new IllegalStateException("simulated k8s API error"))) + val app = mock[KubernetesApplication] + assert(!p.trySuccess(app)) + assert(p.future.value.exists(_.isFailure)) + } + } + describe("SparkKubernetesApp.kill") { it("should remove the app from the monitor queue and be idempotent") { // Drain anything left over from prior tests so we can assert exact size.