diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0299dca7f..04610c235 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -267,6 +267,10 @@ # details, routes, etc...) # livy.server.kubernetes.poll-interval = 15s +# Whether Livy fetches executor pods each poll cycle (used for executor log URLs and +# per-executor diagnostics). Disable on large clusters; driver pod is polled regardless. +# livy.server.kubernetes.executor-tracking.enabled = true + # Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired # options below # livy.server.kubernetes.ingress.create = false diff --git a/pom.xml b/pom.xml index 218b43153..c52be2c36 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 1.7.36 3.3.4 ${spark.scala-2.12.version} - 5.6.0 + 6.8.1 3.0.0 1.15 3.17.0 @@ -1399,6 +1399,13 @@ + + hadoop3 + + 3 + 3.4.0 + + hadoop2 @@ -1432,7 +1439,7 @@ 1.8 0.10.9.7 3.7.0-M11 - 4.1.96.Final + 4.1.108.Final 2.15.2 2.15.2 spark-${spark.version}-bin-hadoop${hadoop.major-minor.version} diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index 8c4494901..a68b86ea5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -327,13 +327,18 @@ void dispose() { //Note. Your compiler or IDE may identify this method as unused //tests fail without it public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String ip = insocket.getAddress().getHostAddress(); - ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret); + String driverHost; + if (conf.getBoolean(DRIVER_ADDRESS_USE_HOSTNAME)) { + driverHost = msg.host; + } else { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + driverHost = insocket.getAddress().getHostAddress(); + } + ContextInfo info = new ContextInfo(driverHost, msg.port, clientId, secret); if (promise.trySuccess(info)) { timeout.cancel(true); LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(), - msg.host, msg.port); + driverHost, msg.port); } else { LOG.warn("Connection established but promise is already finalized."); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 933948fa3..ea3974ac3 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -58,6 +58,9 @@ public enum Entry implements ConfEntry { // How long will the RSC wait for a connection for a Livy server before shutting itself down. SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), + // Use driver's reported hostname instead of socket IP. Useful for Kubernetes + Istio mode. + DRIVER_ADDRESS_USE_HOSTNAME("driver.address.use-hostname", false), + PROXY_USER("proxy-user", null), RPC_SERVER_ADDRESS("rpc.server.address", null), diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 006ab5dff..12e16d169 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -304,6 +304,11 @@ object LivyConf { // How often Livy polls Kubernetes to refresh Kubernetes app state. val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + // Whether Livy fetches executor pods each poll cycle (used for executor log URLs + // and per-executor diagnostics). Driver pod is polled regardless. + val KUBERNETES_EXECUTOR_TRACKING_ENABLED = + Entry("livy.server.kubernetes.executor-tracking.enabled", true) + // How long to check livy session leakage. val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 7b6f323e0..931fb33e7 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -41,7 +41,7 @@ import org.apache.livy.server.batch.BatchSessionServlet import org.apache.livy.server.interactive.InteractiveSessionServlet import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager} import org.apache.livy.server.ui.UIServlet -import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} +import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager, SessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ @@ -176,6 +176,39 @@ class LivyServer extends Logging { } } + // Operator endpoint: re-scan the recovery state store and import any sessions + // written by another Livy server. Guarded by livy.superusers. + val recoveryServlet = new JsonServlet { + before() { + contentType = "application/json" + val user = request.getRemoteUser() + if (!accessManager.checkSuperUser(user)) { + halt(403, Map("msg" -> s"User '$user' not authorized for recovery endpoints.")) + } + } + + private def resultMap(r: SessionManager.RefreshResult): Map[String, Int] = + Map("added" -> r.added, "total" -> r.total, "failed" -> r.failed) + + post("/sessions/refresh") { + info(s"Interactive session refresh triggered by user='${request.getRemoteUser()}'") + resultMap(interactiveSessionManager.refresh()) + } + + post("/batches/refresh") { + info(s"Batch session refresh triggered by user='${request.getRemoteUser()}'") + resultMap(batchSessionManager.refresh()) + } + + post("/refresh") { + info(s"Full session refresh triggered by user='${request.getRemoteUser()}'") + Map( + "batches" -> resultMap(batchSessionManager.refresh()), + "sessions" -> resultMap(interactiveSessionManager.refresh()) + ) + } + } + // Servlet for hosting static files such as html, css, and js // Necessary since Jetty cannot set it's resource base inside a jar // Returns 404 if the file does not exist @@ -254,6 +287,8 @@ class LivyServer extends Logging { context.mountMetricsAdminServlet("/metrics") mount(context, livyVersionServlet, "/version/*") + + mount(context, recoveryServlet, "/recovery/*") } catch { case e: Throwable => error("Exception thrown when initializing server", e) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 8b64a0398..2d4bd4a5b 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -40,6 +40,7 @@ case class BatchRecoveryMetadata( appTag: String, owner: String, proxyUser: Option[String], + namespace: String, version: Int = 1) extends RecoveryMetadata @@ -64,7 +65,7 @@ object BatchSession extends Logging { mockApp: Option[SparkApp] = None): BatchSession = { val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) - + val namespace = SparkApp.getNamespace(request.conf, livyConf) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( appTag, @@ -106,7 +107,8 @@ object BatchSession extends Logging { childProcesses.decrementAndGet() } } - SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s)) + val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace) + SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap) } info(s"Creating batch session $id: [owner: $owner, request: $request]") @@ -120,6 +122,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,8 +140,10 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + m.namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => - SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) + val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> m.namespace) + SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), extrasMap) }) } } @@ -152,6 +157,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + namespace: String, sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ @@ -204,5 +210,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace) } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..978951f0d 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata( // proxyUser is deprecated. It is available here only for backward compatibility proxyUser: Option[String], rscDriverUri: Option[URI], + namespace: String, version: Int = 1) extends RecoveryMetadata @@ -93,6 +94,7 @@ object InteractiveSession extends Logging { mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) + val namespace = SparkApp.getNamespace(request.conf, livyConf) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( @@ -153,6 +155,7 @@ object InteractiveSession extends Logging { request.numExecutors, request.pyFiles, request.queue, + namespace, mockApp) } @@ -193,6 +196,7 @@ object InteractiveSession extends Logging { metadata.numExecutors, metadata.pyFiles, metadata.queue, + metadata.namespace, mockApp) } @@ -432,6 +436,7 @@ class InteractiveSession( val numExecutors: Option[Int], val pyFiles: List[String], val queue: Option[String], + val namespace: String, mockApp: Option[SparkApp]) // For unit test. extends Session(id, name, owner, ttl, idleTimeout, livyConf) with SessionHeartbeat @@ -461,11 +466,14 @@ class InteractiveSession( app = mockApp.orElse { val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) + val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace) if (!livyConf.isRunningOnKubernetes()) { - driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + driverProcess.map { _ => + SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap) + } } else { // Create SparkApp for Kubernetes anyway - Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap)) } } @@ -534,7 +542,7 @@ class InteractiveSession( heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout, driverMemory, driverCores, executorMemory, executorCores, conf, archives, files, jars, numExecutors, pyFiles, queue, - proxyUser, rscDriverUri) + proxyUser, rscDriverUri, namespace) override def state: SessionState = { if (serverSideState == SessionState.Running) { diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index 1dc1d820e..cb41e8c85 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -36,6 +36,9 @@ import org.apache.livy.sessions.Session.RecoveryMetadata object SessionManager { val SESSION_RECOVERY_MODE_OFF = "off" val SESSION_RECOVERY_MODE_RECOVERY = "recovery" + + /** Outcome of a [[SessionManager.refresh]] call. */ + case class RefreshResult(added: Int, total: Int, failed: Int) } class BatchSessionManager( @@ -100,8 +103,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( } def register(session: S): S = { - info(s"Registering new session ${session.id}") synchronized { + sessions.get(session.id) match { + case Some(existing) => + debug(s"Session ${session.id} already registered; skipping duplicate registration.") + return existing + case None => + } + info(s"Registering new session ${session.id}") session.name.foreach { sessionName => if (sessionsByName.contains(sessionName)) { val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}" @@ -229,6 +238,38 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( recoveredSessions } + /** + * Re-scan the state store and import sessions written by another Livy server. + * Add-only: in-memory sessions are not removed even if their state-store entry is + * gone. The id counter is advanced forward only. + */ + def refresh(): RefreshResult = { + // Read the state store outside the SessionManager monitor so we don't block the + // garbage collector and heartbeat watchdog while doing N small EFS / ZK reads. + val storeNextId = sessionStore.getNextSessionId(sessionType) + val sessionMetadata = sessionStore.getAllSessions[R](sessionType) + + val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get) + recoveryFailure.foreach(ex => warn(s"Refresh failure for $sessionType: ${ex.getMessage}", ex)) + + synchronized { + if (storeNextId > idCounter.get) { + idCounter.set(storeNextId) + } + + val before = sessions.size + sessionMetadata.flatMap(_.toOption) + .filterNot(m => sessions.contains(m.id)) + .map(sessionRecovery) + .foreach(register) + + val added = sessions.size - before + info(s"Refreshed $sessionType sessions: added=$added, total=${sessions.size}," + + s" failed=${recoveryFailure.size}, next session id=$idCounter") + RefreshResult(added, sessions.size, recoveryFailure.size) + } + } + private class GarbageCollector extends Thread("session gc thread") { setDaemon(true) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index e424f80fc..4c7e96bf3 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -17,6 +17,9 @@ package org.apache.livy.utils +import java.io.{File, FileInputStream} +import java.util.Properties + import scala.collection.JavaConverters._ import org.apache.livy.LivyConf @@ -56,12 +59,52 @@ trait SparkAppListener { */ object SparkApp { private val SPARK_YARN_TAG_KEY = "spark.yarn.tags" - + val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace" object State extends Enumeration { val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value } type State = State.Value + val DEFAULT_KUBERNETES_NAMESPACE = "default" + + /** + * Resolve the Kubernetes namespace a Spark application should run in. + * + * The namespace is looked up, in order of precedence, from: + * 1. the session's Spark configuration ([[SPARK_KUBERNETES_NAMESPACE_KEY]]), + * 2. `$SPARK_HOME/conf/spark-defaults.conf` (if present), + * 3. the [[DEFAULT_KUBERNETES_NAMESPACE]] fallback. + * + * The namespace is only meaningful on Kubernetes, so for any other cluster + * manager (YARN, local) an empty string is returned without touching the + * filesystem. + */ + def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = { + if (!livyConf.isRunningOnKubernetes()) { + return "" + } + conf.get(SPARK_KUBERNETES_NAMESPACE_KEY).filter(_.nonEmpty).getOrElse { + namespaceFromSparkDefaults(livyConf).getOrElse(DEFAULT_KUBERNETES_NAMESPACE) + } + } + + private def namespaceFromSparkDefaults(livyConf: LivyConf): Option[String] = { + livyConf.sparkHome().flatMap { sparkHome => + val sparkDefaults = new File(sparkHome, s"conf${File.separator}spark-defaults.conf") + if (!sparkDefaults.isFile) { + None + } else { + val in = new FileInputStream(sparkDefaults) + try { + val properties = new Properties() + properties.load(in) + Option(properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY)).filter(_.nonEmpty) + } finally { + in.close() + } + } + } + } /** * Return cluster manager dependent SparkConf. * @@ -102,11 +145,12 @@ object SparkApp { appId: Option[String], process: Option[LineBufferedProcess], livyConf: LivyConf, - listener: Option[SparkAppListener]): SparkApp = { + listener: Option[SparkAppListener], + extrasMap: Map[String, String]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) } else if (livyConf.isRunningOnKubernetes()) { - new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap) } else { require(process.isDefined, "process must not be None when Livy master is not YARN or" + "Kubernetes.") diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 22ab6b8f8..d1debbb15 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -19,8 +19,11 @@ package org.apache.livy.utils import java.net.URLEncoder import java.util.Collections import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.tailrec +import scala.collection.JavaConverters.asScalaSetConverter +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ @@ -52,14 +55,17 @@ object SparkKubernetesApp extends Logging { val iter = leakedAppTags.entrySet().iterator() var isRemoved = false val now = System.currentTimeMillis() - val apps = withRetry(kubernetesClient.getApplications()) + val apps = appNamespaces.flatMap { namespace => + withRetry(kubernetesClient.inNamespace(namespace).getApplications()) + } while (iter.hasNext) { val entry = iter.next() apps.find(_.getApplicationTag.contains(entry.getKey)) .foreach({ app => info(s"Kill leaked app ${app.getApplicationId}") - withRetry(kubernetesClient.killApplication(app)) + withRetry(kubernetesClient.inNamespace(app.getApplicationNamespace) + .killApplication(app)) iter.remove() isRemoved = true }) @@ -138,6 +144,8 @@ object SparkKubernetesApp extends Logging { private var sessionLeakageCheckInterval: Long = _ var kubernetesClient: DefaultKubernetesClient = _ + var appNamespaces: mutable.Set[String] = + ConcurrentHashMap.newKeySet[String]().asScala private var appLookupThreadPoolSize: Long = _ private var appLookupMaxFailedTimes: Long = _ @@ -146,8 +154,7 @@ object SparkKubernetesApp extends Logging { this.livyConf = livyConf // KubernetesClient is thread safe. Create once, share it across threads. - kubernetesClient = - KubernetesClientFactory.createKubernetesClient(livyConf) + kubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf) cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds @@ -160,6 +167,11 @@ object SparkKubernetesApp extends Logging { livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + if (!livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) { + info("Kubernetes executor tracking is disabled. Per-executor log URLs and " + + "per-executor entries in session diagnostics will be omitted.") + } + leakedAppsGCThread.setDaemon(true) leakedAppsGCThread.setName("LeakedAppsGCThread") leakedAppsGCThread.start() @@ -245,7 +257,9 @@ class SparkKubernetesApp private[utils] ( process: Option[LineBufferedProcess], listener: Option[SparkAppListener], livyConf: LivyConf, - kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test. + extrasMap: Map[String, String], + // For unit test. + kubernetesClient: => DefaultKubernetesClient = SparkKubernetesApp.kubernetesClient) extends SparkApp with Logging { @@ -253,7 +267,8 @@ class SparkKubernetesApp private[utils] ( import SparkKubernetesApp._ appQueue.add(this) - private var killed = false + // Atomic so the monitor thread sees the write from kill() without locking. + private val killed = new AtomicBoolean(false) private val appPromise: Promise[KubernetesApplication] = Promise() private[utils] var state: SparkApp.State = SparkApp.State.STARTING private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] @@ -262,6 +277,8 @@ class SparkKubernetesApp private[utils] ( private var kubernetesTagToAppIdFailedTimes: Int = _ private var kubernetesAppMonitorFailedTimes: Int = _ + private var namespace: String = extrasMap(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY) + appNamespaces.add(namespace) private def failToMonitor(): Unit = { changeState(SparkApp.State.FAILED) process.foreach(_.destroy()) @@ -285,18 +302,24 @@ class SparkKubernetesApp private[utils] ( private def monitorSparkKubernetesApp(): Unit = { try { - if (killed) { + // Skip the body if the app already reached a terminal state. + // Falling through would re-fire appIdKnown. + if (killed.get()) { changeState(SparkApp.State.KILLED) - } else if (isProcessErrExit) { + return + } + if (isProcessErrExit) { changeState(SparkApp.State.FAILED) + return } // Get KubernetesApplication by appTag. val appOption: Option[KubernetesApplication] = try { - getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow, namespace) } catch { case e: Exception => failToGetAppId() - appPromise.failure(e) + error(s"Exception getting app from tag $appTag in namespace $namespace with message: ", e) + appPromise.tryFailure(e) return } if (appOption.isEmpty) { @@ -311,7 +334,7 @@ class SparkKubernetesApp private[utils] ( listener.foreach(_.appIdKnown(appId)) if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) { - withRetry(kubernetesClient.createSparkUIIngress(app, livyConf)) + withRetry(kubernetesClient.inNamespace(namespace).createSparkUIIngress(app, livyConf)) } var appInfo = AppInfo() @@ -326,7 +349,7 @@ class SparkKubernetesApp private[utils] ( debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " + s"namespace: ${app.getApplicationNamespace} " + s"applicationTag: ${app.getApplicationTag}") - val report = kubernetesClient.getApplicationReport(livyConf, app, + val report = kubernetesClient.inNamespace(namespace).getApplicationReport(livyConf, app, cacheLogSize = cacheLogSize) report } @@ -383,7 +406,11 @@ class SparkKubernetesApp private[utils] ( ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) override def kill(): Unit = synchronized { - killed = true + killed.set(true) + // Detach from the shared monitor queue so a polling tick cannot fall + // through to appIdKnown and resurrect recovery state after the owning + // session has been deleted. + appQueue.remove(this) if (!isRunning) { return @@ -399,7 +426,7 @@ class SparkKubernetesApp private[utils] ( def kubernetesApplication: KubernetesApplication = applicationDetails.get.get if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) { try { - withRetry(kubernetesClient.killApplication( + withRetry(kubernetesClient.inNamespace(namespace).killApplication( Await.result(appPromise.future, appLookupTimeout))) } catch { // We cannot kill the Kubernetes app without the appTag. @@ -440,10 +467,11 @@ class SparkKubernetesApp private[utils] ( private def getAppFromTag( appTag: String, pollInterval: duration.Duration, - deadline: Deadline): Option[KubernetesApplication] = { + deadline: Deadline, + namespace: String): Option[KubernetesApplication] = { import KubernetesExtensions._ - - withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + withRetry(kubernetesClient.inNamespace(namespace).getApplications() + .find(_.getApplicationTag.contains(appTag))) match { case Some(app) => Some(app) case None => @@ -686,15 +714,14 @@ private[utils] object KubernetesExtensions { appTagLabel: String = SPARK_APP_TAG_LABEL, appIdLabel: String = SPARK_APP_ID_LABEL ): Seq[KubernetesApplication] = { - client.pods.inAnyNamespace - .withLabels(labels.asJava) + client.pods.withLabels(labels.asJava) .withLabel(appTagLabel) .withLabel(appIdLabel) .list.getItems.asScala.map(new KubernetesApplication(_)) } def killApplication(app: KubernetesApplication): Boolean = { - client.pods.inAnyNamespace.delete(app.getApplicationPod) + client.pods.inNamespace(app.getApplicationNamespace).delete(app.getApplicationPod) } def getApplicationReport( @@ -703,12 +730,27 @@ private[utils] object KubernetesExtensions { cacheLogSize: Int, appTagLabel: String = SPARK_APP_TAG_LABEL ): KubernetesAppReport = { - val pods = client.pods.inNamespace(app.getApplicationNamespace) - .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) - .list.getItems.asScala - val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER) - val executors = - pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR) + // Narrow the LIST to the driver pod; application state does not depend on executors. + val driver = client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map( + appTagLabel -> app.getApplicationTag, + SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER + ).asJava) + .list.getItems.asScala.headOption + + // Executors are used only for log URLs and per-executor diagnostics; skip when disabled. + val executors: Seq[Pod] = + if (livyConf.getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) { + client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map( + appTagLabel -> app.getApplicationTag, + SPARK_ROLE_LABEL -> SPARK_ROLE_EXECUTOR + ).asJava) + .list.getItems.asScala + } else { + Seq.empty + } + val appLog = Try( client.pods.inNamespace(app.getApplicationNamespace) .withName(app.getApplicationPod.getMetadata.getName) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 401a8beb1..f143734c7 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -138,12 +138,24 @@ class BatchSessionSpec }) should be (true) } + it("should call kill on the app on stopSession") { + val conf = new LivyConf() + val mockApp = mock[SparkApp] + val m = BatchRecoveryMetadata(99, None, None, "appTag", null, None, "") + val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) + batch.start() + + batch.stopSession() + + verify(mockApp).kill() + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() val name = Some("Test Batch Session") val mockApp = mock[SparkApp] - val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None) + val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, "") val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) batch.state shouldBe (SessionState.Recovering) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index e7d651f89..2b5ccf72a 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -279,7 +279,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, Some("Test session"), None, "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, Some(URI.create(""))) + List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "") val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) s.start() @@ -298,7 +298,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, None, None, "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, Some(URI.create(""))) + List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "") val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) s.start() @@ -315,7 +315,7 @@ class InteractiveSessionSpec extends FunSpec val m = InteractiveRecoveryMetadata( 78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, None) + List.empty[String], None, List.empty[String], None, None, None, "") val s = InteractiveSession.recover(m, conf, sessionStore, None) s.start() s.state shouldBe a[SessionState.Dead] diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 363b01f89..21798b48e 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -209,13 +209,23 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit when(session.lastActivity).thenReturn(System.nanoTime()) when(session.state).thenReturn(state) } + + it("should be idempotent on register() for a session id already tracked") { + val (livyConf, manager) = createSessionManager() + val session = new MockSession(manager.nextId(), null, livyConf, Some("foo")) + val first = manager.register(session) + val again = manager.register(session) + again should be theSameInstanceAs first + manager.size() should be (1) + session.stopped should be (false) + } } describe("BatchSessionManager") { implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, "") } def mockSession(id: Int): BatchSession = { @@ -305,5 +315,94 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit verify(session, never).stop() } + + it("refresh should be a no-op when state store has no new sessions") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(0) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq.empty) + + val sm = new BatchSessionManager(conf, sessionStore) + val result = sm.refresh() + result.added shouldBe 0 + result.total shouldBe 0 + result.failed shouldBe 0 + } + + it("refresh should import only previously-unseen sessions and skip already-known ids") { + val conf = new LivyConf() + conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster") + + val sessionStore = mock[SessionStore] + // Initial recovery: only id=0. + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq(Try(makeMetadata(0, "t0")))) + + val sm = new BatchSessionManager(conf, sessionStore) + sm.size() shouldBe 1 + sm.get(0) shouldBe defined + + // After a peer writes id=1 to the state store, refresh should add it without + // re-registering id=0. + when(sessionStore.getNextSessionId("batch")).thenReturn(2) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq(Try(makeMetadata(0, "t0")), Try(makeMetadata(1, "t1")))) + + val r1 = sm.refresh() + r1.added shouldBe 1 + r1.total shouldBe 2 + sm.get(1) shouldBe defined + + // A second refresh with no new entries should add nothing. + val r2 = sm.refresh() + r2.added shouldBe 0 + r2.total shouldBe 2 + } + + it("refresh should advance the id counter forward but never backward") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(0) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq.empty) + + val sm = new BatchSessionManager(conf, sessionStore) + + // Local counter ahead of state store. + sm.nextId() shouldBe 0 + sm.nextId() shouldBe 1 + sm.nextId() shouldBe 2 + + // State store reports a smaller next id; refresh must NOT regress local counter. + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + sm.refresh() + sm.nextId() shouldBe 3 + + // State store reports a larger next id; refresh SHOULD jump local counter forward. + when(sessionStore.getNextSessionId("batch")).thenReturn(99) + sm.refresh() + sm.nextId() shouldBe 99 + } + + it("refresh should report deserialization failures via failed count") { + val conf = new LivyConf() + conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster") + + val sessionStore = mock[SessionStore] + when(sessionStore.getNextSessionId("batch")).thenReturn(1) + when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) + .thenReturn(Seq( + Try(makeMetadata(0, "t0")), + Failure(new java.io.IOException("corrupted entry id=42")), + Failure(new java.io.IOException("corrupted entry id=43")))) + + val sm = new BatchSessionManager(conf, sessionStore) + val r = sm.refresh() + r.added shouldBe 0 + r.total shouldBe 1 + r.failed shouldBe 2 + } } } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala new file mode 100644 index 000000000..fc668b558 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkAppSpec.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.utils + +import java.io.{File, PrintWriter} +import java.nio.file.Files + +import org.scalatest.{BeforeAndAfterAll, FunSpec} + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} + +class SparkAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll { + + private def k8sConf(sparkHome: Option[String] = None): LivyConf = { + val conf = new LivyConf(false) + conf.set(LivyConf.LIVY_SPARK_MASTER, "k8s://https://kubernetes.default.svc:443") + sparkHome.foreach(conf.set(LivyConf.SPARK_HOME, _)) + conf + } + + /** Create a throwaway SPARK_HOME with the given spark-defaults.conf contents (or none). */ + private def withSparkHome(defaultsContent: Option[String])(f: String => Unit): Unit = { + val sparkHome = Files.createTempDirectory("livy-spark-home").toFile + try { + val confDir = new File(sparkHome, "conf") + assert(confDir.mkdirs()) + defaultsContent.foreach { content => + val writer = new PrintWriter(new File(confDir, "spark-defaults.conf")) + try writer.write(content) finally writer.close() + } + f(sparkHome.getAbsolutePath) + } finally { + deleteRecursively(sparkHome) + } + } + + private def deleteRecursively(file: File): Unit = { + if (file.isDirectory) { + Option(file.listFiles()).foreach(_.foreach(deleteRecursively)) + } + file.delete() + } + + describe("SparkApp.getNamespace") { + + it("should return an empty namespace when not running on Kubernetes") { + val conf = new LivyConf(false) + conf.set(LivyConf.LIVY_SPARK_MASTER, "yarn") + // A namespace in the conf must be ignored for non-Kubernetes masters. + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "ignored") + assert(SparkApp.getNamespace(sparkConf, conf) === "") + } + + it("should prefer the namespace from the session Spark conf") { + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "team-a") + assert(SparkApp.getNamespace(sparkConf, k8sConf()) === "team-a") + } + + it("should fall back to spark-defaults.conf when the conf has no namespace") { + withSparkHome(Some(s"${SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY} team-b\n")) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === "team-b") + } + } + + it("should fall back to the default namespace when spark-defaults.conf is absent") { + withSparkHome(None) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + } + + it("should fall back to the default namespace when spark-defaults.conf lacks the key") { + withSparkHome(Some("spark.executor.memory 1g\n")) { sparkHome => + assert(SparkApp.getNamespace(Map.empty, k8sConf(Some(sparkHome))) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + } + + it("should fall back to the default namespace when SPARK_HOME is not set") { + // No SPARK_HOME on the conf; getNamespace must not throw and must use the default. + assert(SparkApp.getNamespace(Map.empty, k8sConf()) === + SparkApp.DEFAULT_KUBERNETES_NAMESPACE) + } + + it("should treat an empty namespace in the conf as unset") { + withSparkHome(Some(s"${SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY} team-c\n")) { sparkHome => + val sparkConf = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "") + assert(SparkApp.getNamespace(sparkConf, k8sConf(Some(sparkHome))) === "team-c") + } + } + } +} diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala index 24e66e0d9..a353c380c 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -18,6 +18,8 @@ package org.apache.livy.utils import java.util.Objects._ +import scala.concurrent.Promise + import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec} import io.fabric8.kubernetes.client.KubernetesClient @@ -121,6 +123,32 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef .getExecutorsLogUrls.isEmpty) } + it("should return diagnostics without executor entries when executors is empty") { + // When livy.server.kubernetes.executor-tracking.enabled=false, the + // executor LIST in getApplicationReport is skipped and executors is + // passed in as Seq.empty. Diagnostics must still render the driver + // section without error. + val driverStatus = when(mock[PodStatus].getPhase).thenReturn("Running") + .getMock[PodStatus] + val driverMeta = when(mock[ObjectMeta].getName).thenReturn("driver-pod") + .getMock[ObjectMeta] + when(driverMeta.getNamespace).thenReturn("ns") + when(driverMeta.getLabels).thenReturn(Map.empty[String, String].asJava) + val driverSpec = when(mock[PodSpec].getNodeName).thenReturn("node-1") + .getMock[PodSpec] + when(driverSpec.getContainers).thenReturn(java.util.Collections.emptyList[Container]) + when(driverStatus.getConditions).thenReturn(java.util.Collections.emptyList[PodCondition]) + val driver = when(mock[Pod].getStatus).thenReturn(driverStatus).getMock[Pod] + when(driver.getMetadata).thenReturn(driverMeta) + when(driver.getSpec).thenReturn(driverSpec) + + val diagnostics = KubernetesAppReport( + Some(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false) + ).getApplicationDiagnostics + assert(diagnostics.exists(_.contains("driver-pod"))) + assert(!diagnostics.exists(_.contains("executor"))) + } + it("should return driver ingress url") { def livyConf(protocol: Option[String]): LivyConf = { @@ -181,6 +209,50 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef } + describe("SparkKubernetesApp.appPromise") { + it("tryFailure must be a no-op after trySuccess (no IllegalStateException)") { + val p = Promise[KubernetesApplication]() + val app = mock[KubernetesApplication] + assert(p.trySuccess(app)) + assert(!p.tryFailure(new IllegalStateException("simulated k8s API error"))) + assert(p.future.value.exists(_.isSuccess)) + } + + it("trySuccess must be a no-op after tryFailure") { + val p = Promise[KubernetesApplication]() + assert(p.tryFailure(new IllegalStateException("simulated k8s API error"))) + val app = mock[KubernetesApplication] + assert(!p.trySuccess(app)) + assert(p.future.value.exists(_.isFailure)) + } + } + + describe("SparkKubernetesApp.kill") { + it("should remove the app from the monitor queue and be idempotent") { + // Drain anything left over from prior tests so we can assert exact size. + SparkKubernetesApp.clearApps + val sizeBefore = SparkKubernetesApp.getAppSize + val app = new SparkKubernetesApp( + "test-kill-tag", + None, + None, + None, + new LivyConf(false), + Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> "ns"), + SparkKubernetesApp.kubernetesClient) + + // Constructor enqueues itself. + assert(SparkKubernetesApp.getAppSize === sizeBefore + 1) + + app.kill() + assert(SparkKubernetesApp.getAppSize === sizeBefore) + + // Idempotent: a second call must be a no-op for the queue. + app.kill() + assert(SparkKubernetesApp.getAppSize === sizeBefore) + } + } + describe("KubernetesClientFactory") { it("should build KubernetesApi url from LivyConf masterUrl") { def actual(sparkMaster: String): String = @@ -208,6 +280,13 @@ class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with Bef KubernetesClientFactory.createKubernetesClient(conf) } } + + it("should enable executor tracking by default") { + // Preserve existing behavior: operators must opt in to skipping the + // executor LIST. This guards against an accidental default flip that + // would silently drop executor entries from session diagnostics. + assert(new LivyConf(false).getBoolean(LivyConf.KUBERNETES_EXECUTOR_TRACKING_ENABLED)) + } } describe("KubernetesClientExtensions") {