diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 0299dca7f..04610c235 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -267,6 +267,10 @@
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s
+# Whether Livy fetches executor pods each poll cycle (used for executor log URLs and
+# per-executor diagnostics). Disable on large clusters; driver pod is polled regardless.
+# livy.server.kubernetes.executor-tracking.enabled = true
+
-# Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
+# Whether to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
# options below
# livy.server.kubernetes.ingress.create = false
diff --git a/pom.xml b/pom.xml
index 218b43153..c52be2c36 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
1.7.36
3.3.4
${spark.scala-2.12.version}
- 5.6.0
+ 6.8.1
3.0.0
1.15
3.17.0
@@ -1399,6 +1399,13 @@
+
+ hadoop3
+
+ 3
+ 3.4.0
+
+
hadoop2
@@ -1432,7 +1439,7 @@
1.8
0.10.9.7
3.7.0-M11
- 4.1.96.Final
+ 4.1.108.Final
2.15.2
2.15.2
spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index 8c4494901..a68b86ea5 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -327,13 +327,18 @@ void dispose() {
//Note. Your compiler or IDE may identify this method as unused
//tests fail without it
public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
- String ip = insocket.getAddress().getHostAddress();
- ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret);
+ String driverHost;
+ if (conf.getBoolean(DRIVER_ADDRESS_USE_HOSTNAME)) {
+ driverHost = msg.host;
+ } else {
+ InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
+ driverHost = insocket.getAddress().getHostAddress();
+ }
+ ContextInfo info = new ContextInfo(driverHost, msg.port, clientId, secret);
if (promise.trySuccess(info)) {
timeout.cancel(true);
LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
- msg.host, msg.port);
+ driverHost, msg.port);
} else {
LOG.warn("Connection established but promise is already finalized.");
}
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 933948fa3..ea3974ac3 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -58,6 +58,9 @@ public enum Entry implements ConfEntry {
// How long will the RSC wait for a connection for a Livy server before shutting itself down.
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
+ // Use driver's reported hostname instead of socket IP. Useful for Kubernetes + Istio mode.
+ DRIVER_ADDRESS_USE_HOSTNAME("driver.address.use-hostname", false),
+
PROXY_USER("proxy-user", null),
RPC_SERVER_ADDRESS("rpc.server.address", null),
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 006ab5dff..12e16d169 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -304,6 +304,11 @@ object LivyConf {
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")
+ // Whether Livy fetches executor pods each poll cycle (used for executor log URLs
+ // and per-executor diagnostics). Driver pod is polled regardless.
+ val KUBERNETES_EXECUTOR_TRACKING_ENABLED =
+ Entry("livy.server.kubernetes.executor-tracking.enabled", true)
+
// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index 7b6f323e0..931fb33e7 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -41,7 +41,7 @@ import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.interactive.InteractiveSessionServlet
import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
import org.apache.livy.server.ui.UIServlet
-import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
+import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager, SessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
@@ -176,6 +176,39 @@ class LivyServer extends Logging {
}
}
+    // Recovery admin endpoints: each POST re-reads the recovery state store and imports
+    // sessions that another Livy server persisted. Only users accepted by
+    // accessManager.checkSuperUser may call them; everyone else gets a 403.
+    val recoveryServlet = new JsonServlet {
+      before() {
+        contentType = "application/json"
+        val user = request.getRemoteUser()
+        val authorized = accessManager.checkSuperUser(user)
+        if (!authorized) {
+          halt(403, Map("msg" -> s"User '$user' not authorized for recovery endpoints."))
+        }
+      }
+
+      // Flatten a refresh outcome into the JSON payload returned to the caller.
+      private def summarize(outcome: SessionManager.RefreshResult): Map[String, Int] = {
+        Map("added" -> outcome.added, "total" -> outcome.total, "failed" -> outcome.failed)
+      }
+
+      post("/sessions/refresh") {
+        info(s"Interactive session refresh triggered by user='${request.getRemoteUser()}'")
+        val interactiveOutcome = interactiveSessionManager.refresh()
+        summarize(interactiveOutcome)
+      }
+
+      post("/batches/refresh") {
+        info(s"Batch session refresh triggered by user='${request.getRemoteUser()}'")
+        val batchOutcome = batchSessionManager.refresh()
+        summarize(batchOutcome)
+      }
+
+      post("/refresh") {
+        info(s"Full session refresh triggered by user='${request.getRemoteUser()}'")
+        // Batches are refreshed before interactive sessions.
+        val batchSummary = summarize(batchSessionManager.refresh())
+        val interactiveSummary = summarize(interactiveSessionManager.refresh())
+        Map(
+          "batches" -> batchSummary,
+          "sessions" -> interactiveSummary
+        )
+      }
+    }
+
// Servlet for hosting static files such as html, css, and js
// Necessary since Jetty cannot set it's resource base inside a jar
// Returns 404 if the file does not exist
@@ -254,6 +287,8 @@ class LivyServer extends Logging {
context.mountMetricsAdminServlet("/metrics")
mount(context, livyVersionServlet, "/version/*")
+
+ mount(context, recoveryServlet, "/recovery/*")
} catch {
case e: Throwable =>
error("Exception thrown when initializing server", e)
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 8b64a0398..2d4bd4a5b 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -40,6 +40,7 @@ case class BatchRecoveryMetadata(
appTag: String,
owner: String,
proxyUser: Option[String],
+ namespace: String,
version: Int = 1)
extends RecoveryMetadata
@@ -64,7 +65,7 @@ object BatchSession extends Logging {
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
-
+ val namespace = SparkApp.getNamespace(request.conf, livyConf)
def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
appTag,
@@ -106,7 +107,8 @@ object BatchSession extends Logging {
childProcesses.decrementAndGet()
}
}
- SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
+ val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
+ SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap)
}
info(s"Creating batch session $id: [owner: $owner, request: $request]")
@@ -120,6 +122,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
+ namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}
@@ -137,8 +140,10 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
+ m.namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
- SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
+ val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> m.namespace)
+ SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), extrasMap)
})
}
}
@@ -152,6 +157,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
+ namespace: String,
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
@@ -204,5 +210,5 @@ class BatchSession(
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
override def recoveryMetadata: RecoveryMetadata =
- BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
+ BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace)
}
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 0667b718c..978951f0d 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata(
// proxyUser is deprecated. It is available here only for backward compatibility
proxyUser: Option[String],
rscDriverUri: Option[URI],
+ namespace: String,
version: Int = 1)
extends RecoveryMetadata
@@ -93,6 +94,7 @@ object InteractiveSession extends Logging {
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
+ val namespace = SparkApp.getNamespace(request.conf, livyConf)
val client = mockClient.orElse {
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
@@ -153,6 +155,7 @@ object InteractiveSession extends Logging {
request.numExecutors,
request.pyFiles,
request.queue,
+ namespace,
mockApp)
}
@@ -193,6 +196,7 @@ object InteractiveSession extends Logging {
metadata.numExecutors,
metadata.pyFiles,
metadata.queue,
+ metadata.namespace,
mockApp)
}
@@ -432,6 +436,7 @@ class InteractiveSession(
val numExecutors: Option[Int],
val pyFiles: List[String],
val queue: Option[String],
+ val namespace: String,
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
@@ -461,11 +466,14 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
+ val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
if (!livyConf.isRunningOnKubernetes()) {
- driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+ driverProcess.map { _ =>
+ SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap)
+ }
} else {
// Create SparkApp for Kubernetes anyway
- Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+ Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
}
}
@@ -534,7 +542,7 @@ class InteractiveSession(
heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
- proxyUser, rscDriverUri)
+ proxyUser, rscDriverUri, namespace)
override def state: SessionState = {
if (serverSideState == SessionState.Running) {
diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index 1dc1d820e..cb41e8c85 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -36,6 +36,9 @@ import org.apache.livy.sessions.Session.RecoveryMetadata
object SessionManager {
val SESSION_RECOVERY_MODE_OFF = "off"
val SESSION_RECOVERY_MODE_RECOVERY = "recovery"
+
+ /**
+  * Outcome of a [[SessionManager.refresh]] call.
+  *
+  * @param added  number of sessions that the refresh added to the in-memory session map
+  * @param total  number of sessions in the in-memory session map after the refresh
+  * @param failed number of state-store entries that could not be read during the refresh
+  */
+ case class RefreshResult(added: Int, total: Int, failed: Int)
}
class BatchSessionManager(
@@ -100,8 +103,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
}
def register(session: S): S = {
- info(s"Registering new session ${session.id}")
synchronized {
+ sessions.get(session.id) match {
+ case Some(existing) =>
+ debug(s"Session ${session.id} already registered; skipping duplicate registration.")
+ return existing
+ case None =>
+ }
+ info(s"Registering new session ${session.id}")
session.name.foreach { sessionName =>
if (sessionsByName.contains(sessionName)) {
val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}"
@@ -229,6 +238,38 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
recoveredSessions
}
+  /**
+   * Re-scan the state store and import sessions written by another Livy server.
+   *
+   * Add-only: in-memory sessions are not removed even if their state-store entry is
+   * gone. The id counter is advanced forward only.
+   *
+   * Failures are isolated per session: an entry whose metadata cannot be read, or whose
+   * recovery / registration throws a non-fatal exception, is logged and counted in
+   * `failed`, and the remaining entries are still imported.
+   *
+   * @return the number of sessions imported by this call, the number of sessions tracked
+   *         afterwards, and the number of entries that could not be imported.
+   */
+  def refresh(): RefreshResult = {
+    // Read the state store outside the SessionManager monitor so we don't block the
+    // garbage collector and heartbeat watchdog while doing N small EFS / ZK reads.
+    val storeNextId = sessionStore.getNextSessionId(sessionType)
+    val sessionMetadata = sessionStore.getAllSessions[R](sessionType)
+
+    val readFailures = sessionMetadata.flatMap(_.failed.toOption)
+    readFailures.foreach(ex => warn(s"Refresh failure for $sessionType: ${ex.getMessage}", ex))
+
+    synchronized {
+      if (storeNextId > idCounter.get) {
+        idCounter.set(storeNextId)
+      }
+
+      // Membership is checked per entry, immediately before recovery, so an id that shows
+      // up twice in the store listing is recovered only once. Each attempt is wrapped in a
+      // Try (non-fatal exceptions only) so one bad entry cannot abort the whole import.
+      val attempts = sessionMetadata.flatMap(_.toOption).flatMap { m =>
+        if (sessions.contains(m.id)) {
+          None
+        } else {
+          val attempt = scala.util.Try(register(sessionRecovery(m)))
+          attempt.failed.foreach { ex =>
+            warn(s"Refresh failure for $sessionType session ${m.id}: ${ex.getMessage}", ex)
+          }
+          Some(attempt)
+        }
+      }
+
+      // Count actual registrations instead of deriving the number from a size delta.
+      val added = attempts.count(_.isSuccess)
+      val failed = readFailures.size + attempts.count(_.isFailure)
+      info(s"Refreshed $sessionType sessions: added=$added, total=${sessions.size}," +
+        s" failed=$failed, next session id=$idCounter")
+      RefreshResult(added, sessions.size, failed)
+    }
+  }
+
private class GarbageCollector extends Thread("session gc thread") {
setDaemon(true)
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
index e424f80fc..4c7e96bf3 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -17,6 +17,9 @@
package org.apache.livy.utils
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
import scala.collection.JavaConverters._
import org.apache.livy.LivyConf
@@ -56,12 +59,52 @@ trait SparkAppListener {
*/
object SparkApp {
private val SPARK_YARN_TAG_KEY = "spark.yarn.tags"
-
+ val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
object State extends Enumeration {
val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value
}
type State = State.Value
+ val DEFAULT_KUBERNETES_NAMESPACE = "default"
+
+  /**
+   * Work out which Kubernetes namespace a Spark application belongs to.
+   *
+   * When Livy is not running on Kubernetes the result is the empty string and no file is
+   * read. Otherwise the first non-empty value wins, in this order:
+   *  1. [[SPARK_KUBERNETES_NAMESPACE_KEY]] in the session's Spark configuration,
+   *  2. the same key in `$SPARK_HOME/conf/spark-defaults.conf` (if the file exists),
+   *  3. [[DEFAULT_KUBERNETES_NAMESPACE]].
+   *
+   * @param conf     Spark configuration supplied with the session request
+   * @param livyConf Livy server configuration
+   * @return the resolved namespace, or "" when not running on Kubernetes
+   */
+  def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = {
+    if (livyConf.isRunningOnKubernetes()) {
+      val requested = conf.get(SPARK_KUBERNETES_NAMESPACE_KEY).filter(_.nonEmpty)
+      // orElse takes its argument by name, so spark-defaults.conf is only consulted when
+      // the session did not specify a namespace itself.
+      requested
+        .orElse(namespaceFromSparkDefaults(livyConf))
+        .getOrElse(DEFAULT_KUBERNETES_NAMESPACE)
+    } else {
+      ""
+    }
+  }
+
+  /**
+   * Look up [[SPARK_KUBERNETES_NAMESPACE_KEY]] in `$SPARK_HOME/conf/spark-defaults.conf`.
+   *
+   * @param livyConf Livy server configuration, used to locate the Spark home directory
+   * @return the configured namespace, or None when Spark home is not set, the file does
+   *         not exist, or the key is missing or blank
+   */
+  private def namespaceFromSparkDefaults(livyConf: LivyConf): Option[String] = {
+    livyConf.sparkHome()
+      .map(home => new File(home, s"conf${File.separator}spark-defaults.conf"))
+      .filter(_.isFile)
+      .flatMap { sparkDefaults =>
+        val in = new FileInputStream(sparkDefaults)
+        try {
+          val properties = new Properties()
+          properties.load(in)
+          // Properties.load strips whitespace before a value but keeps whitespace after
+          // it. Trim so a padded entry still resolves to a usable namespace, and so a
+          // whitespace-only entry is treated as unset.
+          Option(properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY))
+            .map(_.trim)
+            .filter(_.nonEmpty)
+        } finally {
+          in.close()
+        }
+      }
+  }
/**
* Return cluster manager dependent SparkConf.
*
@@ -102,11 +145,12 @@ object SparkApp {
appId: Option[String],
process: Option[LineBufferedProcess],
livyConf: LivyConf,
- listener: Option[SparkAppListener]): SparkApp = {
+ listener: Option[SparkAppListener],
+ extrasMap: Map[String, String]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else if (livyConf.isRunningOnKubernetes()) {
- new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
+ new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN or" +
"Kubernetes.")
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
index 22ab6b8f8..d1debbb15 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -19,8 +19,11 @@ package org.apache.livy.utils
import java.net.URLEncoder
import java.util.Collections
import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
@@ -52,14 +55,17 @@ object SparkKubernetesApp extends Logging {
val iter = leakedAppTags.entrySet().iterator()
var isRemoved = false
val now = System.currentTimeMillis()
- val apps = withRetry(kubernetesClient.getApplications())
+ val apps = appNamespaces.flatMap { namespace =>
+ withRetry(kubernetesClient.inNamespace(namespace).getApplications())
+ }
while (iter.hasNext) {
val entry = iter.next()
apps.find(_.getApplicationTag.contains(entry.getKey))
.foreach({
app =>
info(s"Kill leaked app ${app.getApplicationId}")
- withRetry(kubernetesClient.killApplication(app))
+ withRetry(kubernetesClient.inNamespace(app.getApplicationNamespace)
+ .killApplication(app))
iter.remove()
isRemoved = true
})
@@ -138,6 +144,8 @@ object SparkKubernetesApp extends Logging {
private var sessionLeakageCheckInterval: Long = _
var kubernetesClient: DefaultKubernetesClient = _
+ // Namespaces of the applications created so far. Every SparkKubernetesApp adds its own
+ // namespace here when it is constructed, and the leaked-app GC lists applications in
+ // each of these namespaces. Backed by a ConcurrentHashMap key set, so null entries are
+ // rejected. No code visible in this change removes entries.
+ var appNamespaces: mutable.Set[String] =
+ ConcurrentHashMap.newKeySet[String]().asScala
private var appLookupThreadPoolSize: Long = _
private var appLookupMaxFailedTimes: Long = _
@@ -146,8 +154,7 @@ object SparkKubernetesApp extends Logging {
this.livyConf = livyConf
// KubernetesClient is thread safe. Create once, share it across threads.
- kubernetesClient =
- KubernetesClientFactory.createKubernetesClient(livyConf)
+ kubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf)
cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
@@ -160,6 +167,11 @@ object SparkKubernetesApp extends Logging {
livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+ if (!livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) {
+ info("Kubernetes executor tracking is disabled. Per-executor log URLs and " +
+ "per-executor entries in session diagnostics will be omitted.")
+ }
+
leakedAppsGCThread.setDaemon(true)
leakedAppsGCThread.setName("LeakedAppsGCThread")
leakedAppsGCThread.start()
@@ -245,7 +257,9 @@ class SparkKubernetesApp private[utils] (
process: Option[LineBufferedProcess],
listener: Option[SparkAppListener],
livyConf: LivyConf,
- kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test.
+ extrasMap: Map[String, String],
+ // For unit test.
+ kubernetesClient: => DefaultKubernetesClient = SparkKubernetesApp.kubernetesClient)
extends SparkApp
with Logging {
@@ -253,7 +267,8 @@ class SparkKubernetesApp private[utils] (
import SparkKubernetesApp._
appQueue.add(this)
- private var killed = false
+ // Atomic so the monitor thread sees the write from kill() without locking.
+ private val killed = new AtomicBoolean(false)
private val appPromise: Promise[KubernetesApplication] = Promise()
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
@@ -262,6 +277,8 @@ class SparkKubernetesApp private[utils] (
private var kubernetesTagToAppIdFailedTimes: Int = _
private var kubernetesAppMonitorFailedTimes: Int = _
+  // Namespace this application lives in. The entry may be absent, null or empty, e.g. when
+  // recovery metadata carries no namespace (NOTE(review): confirm how the state store
+  // deserializes metadata written before the namespace field existed). A missing key would
+  // make extrasMap(...) throw, and a null would make appNamespaces.add throw because the
+  // set is backed by a ConcurrentHashMap, so fall back to the default namespace instead.
+  private var namespace: String =
+    extrasMap.get(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY)
+      .flatMap(Option(_))
+      .filter(_.nonEmpty)
+      .getOrElse(SparkApp.DEFAULT_KUBERNETES_NAMESPACE)
+  appNamespaces.add(namespace)
private def failToMonitor(): Unit = {
changeState(SparkApp.State.FAILED)
process.foreach(_.destroy())
@@ -285,18 +302,24 @@ class SparkKubernetesApp private[utils] (
private def monitorSparkKubernetesApp(): Unit = {
try {
- if (killed) {
+ // Skip the body if the app already reached a terminal state.
+ // Falling through would re-fire appIdKnown.
+ if (killed.get()) {
changeState(SparkApp.State.KILLED)
- } else if (isProcessErrExit) {
+ return
+ }
+ if (isProcessErrExit) {
changeState(SparkApp.State.FAILED)
+ return
}
// Get KubernetesApplication by appTag.
val appOption: Option[KubernetesApplication] = try {
- getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+ getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow, namespace)
} catch {
case e: Exception =>
failToGetAppId()
- appPromise.failure(e)
+ error(s"Exception getting app from tag $appTag in namespace $namespace with message: ", e)
+ appPromise.tryFailure(e)
return
}
if (appOption.isEmpty) {
@@ -311,7 +334,7 @@ class SparkKubernetesApp private[utils] (
listener.foreach(_.appIdKnown(appId))
if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
- withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
+ withRetry(kubernetesClient.inNamespace(namespace).createSparkUIIngress(app, livyConf))
}
var appInfo = AppInfo()
@@ -326,7 +349,7 @@ class SparkKubernetesApp private[utils] (
debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " +
s"namespace: ${app.getApplicationNamespace} " +
s"applicationTag: ${app.getApplicationTag}")
- val report = kubernetesClient.getApplicationReport(livyConf, app,
+ val report = kubernetesClient.inNamespace(namespace).getApplicationReport(livyConf, app,
cacheLogSize = cacheLogSize)
report
}
@@ -383,7 +406,11 @@ class SparkKubernetesApp private[utils] (
("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
override def kill(): Unit = synchronized {
- killed = true
+ killed.set(true)
+ // Detach from the shared monitor queue so a polling tick cannot fall
+ // through to appIdKnown and resurrect recovery state after the owning
+ // session has been deleted.
+ appQueue.remove(this)
if (!isRunning) {
return
@@ -399,7 +426,7 @@ class SparkKubernetesApp private[utils] (
def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
try {
- withRetry(kubernetesClient.killApplication(
+ withRetry(kubernetesClient.inNamespace(namespace).killApplication(
Await.result(appPromise.future, appLookupTimeout)))
} catch {
// We cannot kill the Kubernetes app without the appTag.
@@ -440,10 +467,11 @@ class SparkKubernetesApp private[utils] (
private def getAppFromTag(
appTag: String,
pollInterval: duration.Duration,
- deadline: Deadline): Option[KubernetesApplication] = {
+ deadline: Deadline,
+ namespace: String): Option[KubernetesApplication] = {
import KubernetesExtensions._
-
- withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
+ withRetry(kubernetesClient.inNamespace(namespace).getApplications()
+ .find(_.getApplicationTag.contains(appTag)))
match {
case Some(app) => Some(app)
case None =>
@@ -686,15 +714,14 @@ private[utils] object KubernetesExtensions {
appTagLabel: String = SPARK_APP_TAG_LABEL,
appIdLabel: String = SPARK_APP_ID_LABEL
): Seq[KubernetesApplication] = {
- client.pods.inAnyNamespace
- .withLabels(labels.asJava)
+ client.pods.withLabels(labels.asJava)
.withLabel(appTagLabel)
.withLabel(appIdLabel)
.list.getItems.asScala.map(new KubernetesApplication(_))
}
def killApplication(app: KubernetesApplication): Boolean = {
- client.pods.inAnyNamespace.delete(app.getApplicationPod)
+ client.pods.inNamespace(app.getApplicationNamespace).delete(app.getApplicationPod)
}
def getApplicationReport(
@@ -703,12 +730,27 @@ private[utils] object KubernetesExtensions {
cacheLogSize: Int,
appTagLabel: String = SPARK_APP_TAG_LABEL
): KubernetesAppReport = {
- val pods = client.pods.inNamespace(app.getApplicationNamespace)
- .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava)
- .list.getItems.asScala
- val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER)
- val executors =
- pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR)
+ // Narrow the LIST to the driver pod; application state does not depend on executors.
+ val driver = client.pods.inNamespace(app.getApplicationNamespace)
+ .withLabels(Map(
+ appTagLabel -> app.getApplicationTag,
+ SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER
+ ).asJava)
+ .list.getItems.asScala.headOption
+
+ // Executors are used only for log URLs and per-executor diagnostics; skip when disabled.
+ val executors: Seq[Pod] =
+ if (livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) {
+ client.pods.inNamespace(app.getApplicationNamespace)
+ .withLabels(Map(
+ appTagLabel -> app.getApplicationTag,
+ SPARK_ROLE_LABEL -> SPARK_ROLE_EXECUTOR
+ ).asJava)
+ .list.getItems.asScala
+ } else {
+ Seq.empty
+ }
+
val appLog = Try(
client.pods.inNamespace(app.getApplicationNamespace)
.withName(app.getApplicationPod.getMetadata.getName)
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 401a8beb1..f143734c7 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -138,12 +138,24 @@ class BatchSessionSpec
}) should be (true)
}
+    it("should call kill on the app on stopSession") {
+      // Recover a batch session around a mocked SparkApp, stop it, and check that the
+      // stop is forwarded to the app as a kill().
+      val livyConf = new LivyConf()
+      val sparkApp = mock[SparkApp]
+      val metadata = BatchRecoveryMetadata(99, None, None, "appTag", null, None, "")
+      val session = BatchSession.recover(metadata, livyConf, sessionStore, Some(sparkApp))
+      session.start()
+
+      session.stopSession()
+
+      verify(sparkApp).kill()
+    }
+
def testRecoverSession(name: Option[String]): Unit = {
val conf = new LivyConf()
val req = new CreateBatchRequest()
val name = Some("Test Batch Session")
val mockApp = mock[SparkApp]
- val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None)
+ val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, "")
val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))
batch.state shouldBe (SessionState.Recovering)
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index e7d651f89..2b5ccf72a 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -279,7 +279,7 @@ class InteractiveSessionSpec extends FunSpec
val m = InteractiveRecoveryMetadata(
78, Some("Test session"), None, "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
- List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
+ List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "")
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
s.start()
@@ -298,7 +298,7 @@ class InteractiveSessionSpec extends FunSpec
val m = InteractiveRecoveryMetadata(
78, None, None, "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
- List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
+ List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "")
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
s.start()
@@ -315,7 +315,7 @@ class InteractiveSessionSpec extends FunSpec
val m = InteractiveRecoveryMetadata(
78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
- List.empty[String], None, List.empty[String], None, None, None)
+ List.empty[String], None, List.empty[String], None, None, None, "")
val s = InteractiveSession.recover(m, conf, sessionStore, None)
s.start()
s.state shouldBe a[SessionState.Dead]
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index 363b01f89..21798b48e 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -209,13 +209,23 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
when(session.lastActivity).thenReturn(System.nanoTime())
when(session.state).thenReturn(state)
}
+
+    it("should be idempotent on register() for a session id already tracked") {
+      val (livyConf, manager) = createSessionManager()
+      val tracked = new MockSession(manager.nextId(), null, livyConf, Some("foo"))
+
+      // Registering the same session twice must hand back the instance from the first
+      // call, keep a single entry, and leave the session running.
+      val firstResult = manager.register(tracked)
+      val secondResult = manager.register(tracked)
+
+      secondResult should be theSameInstanceAs firstResult
+      manager.size() should be (1)
+      tracked.stopped should be (false)
+    }
}
describe("BatchSessionManager") {
implicit def executor: ExecutionContext = ExecutionContext.global
def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
- BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None)
+ BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, "")
}
def mockSession(id: Int): BatchSession = {
@@ -305,5 +315,94 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
verify(session, never).stop()
}
+
+    it("refresh should be a no-op when state store has no new sessions") {
+      // The mocked store holds no batch sessions, so refresh() has nothing to import.
+      val emptyStore = mock[SessionStore]
+      when(emptyStore.getNextSessionId("batch")).thenReturn(0)
+      when(emptyStore.getAllSessions[BatchRecoveryMetadata]("batch")).thenReturn(Seq.empty)
+
+      val manager = new BatchSessionManager(new LivyConf(), emptyStore)
+      val outcome = manager.refresh()
+      // Every counter of the refresh result has to stay at zero.
+      outcome.added shouldBe 0
+      outcome.total shouldBe 0
+      outcome.failed shouldBe 0
+    }
+
+    it("refresh should import only previously-unseen sessions and skip already-known ids") {
+      val livyConf = new LivyConf()
+      livyConf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster")
+
+      // The store starts out holding a single batch session, id=0.
+      val store = mock[SessionStore]
+      when(store.getNextSessionId("batch")).thenReturn(1)
+      when(store.getAllSessions[BatchRecoveryMetadata]("batch"))
+        .thenReturn(Seq(Try(makeMetadata(0, "t0"))))
+
+      // Constructing the manager is asserted to pick that session up.
+      val manager = new BatchSessionManager(livyConf, store)
+      manager.size() shouldBe 1
+      manager.get(0) shouldBe defined
+
+      // Simulate a peer persisting id=1: refresh() may import the new id only.
+      when(store.getNextSessionId("batch")).thenReturn(2)
+      when(store.getAllSessions[BatchRecoveryMetadata]("batch"))
+        .thenReturn(Seq(Try(makeMetadata(0, "t0")), Try(makeMetadata(1, "t1"))))
+
+      val firstPass = manager.refresh()
+      firstPass.added shouldBe 1
+      firstPass.total shouldBe 2
+      manager.get(1) shouldBe defined
+
+      // With the store unchanged, refreshing again must not add anything.
+      val secondPass = manager.refresh()
+      secondPass.added shouldBe 0
+      secondPass.total shouldBe 2
+    }
+
+    it("refresh should advance the id counter forward but never backward") {
+      // The mocked store is empty and reports 0 as its next session id, which is where
+      // the manager's own counter is asserted to start.
+      val store = mock[SessionStore]
+      when(store.getNextSessionId("batch")).thenReturn(0)
+      when(store.getAllSessions[BatchRecoveryMetadata]("batch")).thenReturn(Seq.empty)
+
+      val manager = new BatchSessionManager(new LivyConf(), store)
+
+      // Draw ids 0, 1 and 2 locally so the in-memory counter runs ahead of the store.
+      (0 to 2).foreach { expected =>
+        manager.nextId() shouldBe expected
+      }
+
+      // A stale (smaller) next id in the store must not pull the local counter back.
+      when(store.getNextSessionId("batch")).thenReturn(1)
+      manager.refresh()
+      manager.nextId() shouldBe 3
+
+      // A larger next id in the store must move the local counter up to that value.
+      when(store.getNextSessionId("batch")).thenReturn(99)
+      manager.refresh()
+      manager.nextId() shouldBe 99
+    }
+
+    it("refresh should report deserialization failures via failed count") {
+      val livyConf = new LivyConf()
+      livyConf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster")
+
+      // One readable entry (id=0) followed by two entries that failed to load.
+      val storedEntries: Seq[Try[BatchRecoveryMetadata]] = Seq(
+        Try(makeMetadata(0, "t0")),
+        Failure(new java.io.IOException("corrupted entry id=42")),
+        Failure(new java.io.IOException("corrupted entry id=43")))
+      val store = mock[SessionStore]
+      when(store.getNextSessionId("batch")).thenReturn(1)
+      when(store.getAllSessions[BatchRecoveryMetadata]("batch")).thenReturn(storedEntries)
+
+      val outcome = new BatchSessionManager(livyConf, store).refresh()
+      outcome.added shouldBe 0
+      outcome.total shouldBe 1
+      outcome.failed shouldBe 2
+    }
}
}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala
new file mode 100644
index 000000000..fc668b558
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+import org.scalatest.{BeforeAndAfterAll, FunSpec}
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class SparkAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll {
+
+  private def k8sConf(sparkHome: Option[String] = None): LivyConf = {
+    val k8s = new LivyConf(false)
+    k8s.set(LivyConf.LIVY_SPARK_MASTER, "k8s://https://kubernetes.default.svc:443")
+    for (home <- sparkHome) k8s.set(LivyConf.SPARK_HOME, home) // set only when supplied
+    k8s
+  }
+
+  /** Runs `f` with the path of a temporary SPARK_HOME that is deleted afterwards; its conf/
+   *  directory holds a spark-defaults.conf only when `defaultsContent` is defined. */
+  private def withSparkHome(defaultsContent: Option[String])(f: String => Unit): Unit = {
+    val home = Files.createTempDirectory("livy-spark-home").toFile
+    try {
+      val confDir = new File(home, "conf")
+      // Fail right away if the conf/ directory of the fixture could not be created.
+      assert(confDir.mkdirs())
+      for (text <- defaultsContent) {
+        val out = new PrintWriter(new File(confDir, "spark-defaults.conf"))
+        try out.write(text) finally out.close()
+      }
+      f(home.getAbsolutePath)
+    } finally deleteRecursively(home)
+  }
+
+  /** Deletes `file`; for a directory the children are deleted first (null listing ignored). */
+  private def deleteRecursively(file: File): Unit = {
+    val listing = if (file.isDirectory) Option(file.listFiles()) else None
+    for (children <- listing; child <- children) deleteRecursively(child)
+    file.delete()
+  }
+
+  describe("SparkApp.getNamespace") {
+    // Short aliases for the two SparkApp constants the cases below keep referring to.
+    def nsKey = SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY
+    def defaultNs = SparkApp.DEFAULT_KUBERNETES_NAMESPACE
+
+    it("should return an empty namespace when not running on Kubernetes") {
+      val yarnConf = new LivyConf(false)
+      yarnConf.set(LivyConf.LIVY_SPARK_MASTER, "yarn")
+      // With a non-Kubernetes master even an explicitly configured namespace is disregarded.
+      assert(SparkApp.getNamespace(Map(nsKey -> "ignored"), yarnConf) === "")
+    }
+
+    it("should prefer the namespace from the session Spark conf") {
+      val sessionConf = Map(nsKey -> "team-a")
+      assert(SparkApp.getNamespace(sessionConf, k8sConf()) === "team-a")
+    }
+
+    it("should fall back to spark-defaults.conf when the conf has no namespace") {
+      withSparkHome(Some(s"$nsKey team-b\n")) { home =>
+        assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(home))) === "team-b")
+      }
+    }
+
+    it("should fall back to the default namespace when spark-defaults.conf is absent") {
+      withSparkHome(None) { home =>
+        assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(home))) === defaultNs)
+      }
+    }
+
+    it("should fall back to the default namespace when spark-defaults.conf lacks the key") {
+      withSparkHome(Some("spark.executor.memory 1g\n")) { home =>
+        assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(home))) === defaultNs)
+      }
+    }
+
+    it("should fall back to the default namespace when SPARK_HOME is not set") {
+      // No SPARK_HOME is put on the conf here: the call has to yield the default, not throw.
+      assert(SparkApp.getNamespace(Map.empty, k8sConf()) === defaultNs)
+    }
+
+    it("should treat an empty namespace in the conf as unset") {
+      withSparkHome(Some(s"$nsKey team-c\n")) { home =>
+        // The empty string must not win over the value from spark-defaults.conf.
+        val blankNs = Map(nsKey -> "")
+        assert(SparkApp.getNamespace(blankNs, k8sConf(Some(home))) === "team-c")
+      }
+    }
+  }
+}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
index 24e66e0d9..a353c380c 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -18,6 +18,8 @@ package org.apache.livy.utils
import java.util.Objects._
+import scala.concurrent.Promise
+
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
import io.fabric8.kubernetes.client.KubernetesClient
@@ -121,6 +123,32 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef
.getExecutorsLogUrls.isEmpty)
}
+    it("should return diagnostics without executor entries when executors is empty") {
+      // With livy.server.kubernetes.executor-tracking.enabled=false the executor LIST in
+      // getApplicationReport is skipped and the report is built with executors = Seq.empty.
+      // The driver section of the diagnostics must still render without error.
+      val status = mock[PodStatus]
+      when(status.getPhase).thenReturn("Running")
+      when(status.getConditions).thenReturn(java.util.Collections.emptyList[PodCondition])
+      val meta = mock[ObjectMeta]
+      when(meta.getName).thenReturn("driver-pod")
+      when(meta.getNamespace).thenReturn("ns")
+      when(meta.getLabels).thenReturn(Map.empty[String, String].asJava)
+      val spec = mock[PodSpec]
+      when(spec.getNodeName).thenReturn("node-1")
+      when(spec.getContainers).thenReturn(java.util.Collections.emptyList[Container])
+      val pod = mock[Pod]
+      when(pod.getStatus).thenReturn(status)
+      when(pod.getMetadata).thenReturn(meta)
+      when(pod.getSpec).thenReturn(spec)
+
+      val report =
+        KubernetesAppReport(Some(pod), Seq.empty, IndexedSeq.empty, None, new LivyConf(false))
+      val rendered = report.getApplicationDiagnostics
+      assert(rendered.exists(_.contains("driver-pod")))
+      assert(!rendered.exists(_.contains("executor")))
+    }
+
it("should return driver ingress url") {
def livyConf(protocol: Option[String]): LivyConf = {
@@ -181,6 +209,50 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef
}
+  describe("SparkKubernetesApp.appPromise") {
+    // NOTE(review): both cases drive a bare scala.concurrent.Promise with a mocked
+    // KubernetesApplication; no SparkKubernetesApp instance is involved here.
+    it("tryFailure must be a no-op after trySuccess (no IllegalStateException)") {
+      val promise = Promise[KubernetesApplication]()
+      assert(promise.trySuccess(mock[KubernetesApplication]))
+      assert(!promise.tryFailure(new IllegalStateException("simulated k8s API error")))
+      assert(promise.future.value.exists(_.isSuccess))
+    }
+
+    it("trySuccess must be a no-op after tryFailure") {
+      val promise = Promise[KubernetesApplication]()
+      assert(promise.tryFailure(new IllegalStateException("simulated k8s API error")))
+      assert(!promise.trySuccess(mock[KubernetesApplication]))
+      assert(promise.future.value.exists(_.isFailure))
+    }
+  }
+
+  describe("SparkKubernetesApp.kill") {
+    it("should remove the app from the monitor queue and be idempotent") {
+      // Empty the queue first: entries left behind by earlier tests would otherwise
+      // skew the size comparisons below.
+      SparkKubernetesApp.clearApps
+      val baseline = SparkKubernetesApp.getAppSize
+
+      val tracked = new SparkKubernetesApp(
+        "test-kill-tag", None, None, None,
+        new LivyConf(false),
+        Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "ns"),
+        SparkKubernetesApp.kubernetesClient)
+
+      // Construction alone is expected to put the app on the queue.
+      assert(SparkKubernetesApp.getAppSize === baseline + 1)
+
+      // The first kill() has to take it off the queue again ...
+      tracked.kill()
+      assert(SparkKubernetesApp.getAppSize === baseline)
+
+      // ... and a repeated kill() must leave the queue size untouched.
+      tracked.kill()
+      assert(SparkKubernetesApp.getAppSize === baseline)
+    }
+  }
+
describe("KubernetesClientFactory") {
it("should build KubernetesApi url from LivyConf masterUrl") {
def actual(sparkMaster: String): String =
@@ -208,6 +280,13 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef
KubernetesClientFactory.createKubernetesClient(conf)
}
}
+
+    it("should enable executor tracking by default") {
+      // Skipping the executor LIST has to stay opt-in. Should the default ever flip by
+      // accident, executor entries would silently vanish from session diagnostics.
+      val defaults = new LivyConf(false)
+      assert(defaults.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED))
+    }
}
describe("KubernetesClientExtensions") {