Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s

# Whether Livy fetches executor pods each poll cycle (used for executor log URLs and
# per-executor diagnostics). Disable on large clusters; driver pod is polled regardless.
# livy.server.kubernetes.executor-tracking.enabled = true

# Whether to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
# options below
# livy.server.kubernetes.ingress.create = false
Expand Down
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<slf4j.version>1.7.36</slf4j.version>
<spark.scala-2.12.version>3.3.4</spark.scala-2.12.version>
<spark.version>${spark.scala-2.12.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<kubernetes.client.version>6.8.1</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.15</commons-codec.version>
<commons-lang3.version>3.17.0</commons-lang3.version>
Expand Down Expand Up @@ -1399,6 +1399,13 @@
</reporting>

<profiles>
<profile>
<!-- Activate with -Phadoop3: sets the Hadoop major version to 3 and the Hadoop version to 3.4.0. -->
<id>hadoop3</id>
<properties>
<hadoop.major-minor.version>3</hadoop.major-minor.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</profile>
<profile>
<id>hadoop2</id>
<properties>
Expand Down Expand Up @@ -1432,7 +1439,7 @@
<java.version>1.8</java.version>
<py4j.version>0.10.9.7</py4j.version>
<json4s.version>3.7.0-M11</json4s.version>
<netty.version>4.1.96.Final</netty.version>
<netty.version>4.1.108.Final</netty.version>
<jackson.version>2.15.2</jackson.version>
<jackson-databind.version>2.15.2</jackson-databind.version>
<spark.bin.name>spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}</spark.bin.name>
Expand Down
13 changes: 9 additions & 4 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,18 @@ void dispose() {
//Note. Your compiler or IDE may identify this method as unused
//tests fail without it
public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = insocket.getAddress().getHostAddress();
ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret);
String driverHost;
if (conf.getBoolean(DRIVER_ADDRESS_USE_HOSTNAME)) {
driverHost = msg.host;
} else {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
driverHost = insocket.getAddress().getHostAddress();
}
ContextInfo info = new ContextInfo(driverHost, msg.port, clientId, secret);
if (promise.trySuccess(info)) {
timeout.cancel(true);
LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
msg.host, msg.port);
driverHost, msg.port);
} else {
LOG.warn("Connection established but promise is already finalized.");
}
Expand Down
3 changes: 3 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public enum Entry implements ConfEntry {
// How long will the RSC wait for a connection for a Livy server before shutting itself down.
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),

// Use driver's reported hostname instead of socket IP. Useful for Kubernetes + Istio mode.
DRIVER_ADDRESS_USE_HOSTNAME("driver.address.use-hostname", false),

PROXY_USER("proxy-user", null),

RPC_SERVER_ADDRESS("rpc.server.address", null),
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ object LivyConf {
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// Whether Livy fetches executor pods each poll cycle (used for executor log URLs
// and per-executor diagnostics). Driver pod is polled regardless.
val KUBERNETES_EXECUTOR_TRACKING_ENABLED =
Entry("livy.server.kubernetes.executor-tracking.enabled", true)

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
Expand Down
37 changes: 36 additions & 1 deletion server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.interactive.InteractiveSessionServlet
import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager, SessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
Expand Down Expand Up @@ -176,6 +176,39 @@ class LivyServer extends Logging {
}
}

// Operator endpoints that re-read the recovery state store so that sessions persisted by
// another Livy server become visible on this one. Guarded by livy.superusers.
val recoveryServlet = new JsonServlet {
  // Runs ahead of every route: only superusers may reach the refresh endpoints.
  before() {
    contentType = "application/json"
    val caller = request.getRemoteUser()
    val authorized = accessManager.checkSuperUser(caller)
    if (!authorized) {
      halt(403, Map("msg" -> s"User '$caller' not authorized for recovery endpoints."))
    }
  }

  // Flattens a refresh outcome into the counters returned in the JSON response.
  private def asCounters(outcome: SessionManager.RefreshResult): Map[String, Int] = {
    Map(
      "added" -> outcome.added,
      "total" -> outcome.total,
      "failed" -> outcome.failed)
  }

  // Refresh interactive sessions only.
  post("/sessions/refresh") {
    info(s"Interactive session refresh triggered by user='${request.getRemoteUser()}'")
    val outcome = interactiveSessionManager.refresh()
    asCounters(outcome)
  }

  // Refresh batch sessions only.
  post("/batches/refresh") {
    info(s"Batch session refresh triggered by user='${request.getRemoteUser()}'")
    val outcome = batchSessionManager.refresh()
    asCounters(outcome)
  }

  // Refresh both managers; batches are refreshed before interactive sessions.
  post("/refresh") {
    info(s"Full session refresh triggered by user='${request.getRemoteUser()}'")
    val batchCounters = asCounters(batchSessionManager.refresh())
    val sessionCounters = asCounters(interactiveSessionManager.refresh())
    Map("batches" -> batchCounters, "sessions" -> sessionCounters)
  }
}

// Servlet for hosting static files such as html, css, and js
// Necessary since Jetty cannot set its resource base inside a jar
// Returns 404 if the file does not exist
Expand Down Expand Up @@ -254,6 +287,8 @@ class LivyServer extends Logging {
context.mountMetricsAdminServlet("/metrics")

mount(context, livyVersionServlet, "/version/*")

mount(context, recoveryServlet, "/recovery/*")
} catch {
case e: Throwable =>
error("Exception thrown when initializing server", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class BatchRecoveryMetadata(
appTag: String,
owner: String,
proxyUser: Option[String],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -64,7 +65,7 @@ object BatchSession extends Logging {
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)

val namespace = SparkApp.getNamespace(request.conf, livyConf)
def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
appTag,
Expand Down Expand Up @@ -106,7 +107,8 @@ object BatchSession extends Logging {
childProcesses.decrementAndGet()
}
}
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap)
}

info(s"Creating batch session $id: [owner: $owner, request: $request]")
Expand All @@ -120,6 +122,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}

Expand All @@ -137,8 +140,10 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
m.namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> m.namespace)
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), extrasMap)
})
}
}
Expand All @@ -152,6 +157,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
namespace: String,
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
Expand Down Expand Up @@ -204,5 +210,5 @@ class BatchSession(
// Replaces this session's stored AppInfo with the value passed in.
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

override def recoveryMetadata: RecoveryMetadata =
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata(
// proxyUser is deprecated. It is available here only for backward compatibility
proxyUser: Option[String],
rscDriverUri: Option[URI],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -93,6 +94,7 @@ object InteractiveSession extends Logging {
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
val namespace = SparkApp.getNamespace(request.conf, livyConf)

val client = mockClient.orElse {
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
Expand Down Expand Up @@ -153,6 +155,7 @@ object InteractiveSession extends Logging {
request.numExecutors,
request.pyFiles,
request.queue,
namespace,
mockApp)
}

Expand Down Expand Up @@ -193,6 +196,7 @@ object InteractiveSession extends Logging {
metadata.numExecutors,
metadata.pyFiles,
metadata.queue,
metadata.namespace,
mockApp)
}

Expand Down Expand Up @@ -432,6 +436,7 @@ class InteractiveSession(
val numExecutors: Option[Int],
val pyFiles: List[String],
val queue: Option[String],
val namespace: String,
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
Expand Down Expand Up @@ -461,11 +466,14 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
val extrasMap = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
driverProcess.map { _ =>
SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap)
}
} else {
// Create SparkApp for Kubernetes anyway
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
}
}

Expand Down Expand Up @@ -534,7 +542,7 @@ class InteractiveSession(
heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
proxyUser, rscDriverUri)
proxyUser, rscDriverUri, namespace)

override def state: SessionState = {
if (serverSideState == SessionState.Running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import org.apache.livy.sessions.Session.RecoveryMetadata
object SessionManager {
// String constants naming session recovery modes (presumably the accepted values of the
// recovery-mode setting; confirm against LivyConf).
val SESSION_RECOVERY_MODE_OFF = "off"
val SESSION_RECOVERY_MODE_RECOVERY = "recovery"

/**
 * Outcome of a [[SessionManager.refresh]] call.
 *
 * @param added number of sessions newly registered by the call
 * @param total number of sessions held in memory once the call finished
 * @param failed number of entries that failed to import
 */
case class RefreshResult(added: Int, total: Int, failed: Int)
}

class BatchSessionManager(
Expand Down Expand Up @@ -100,8 +103,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
}

def register(session: S): S = {
info(s"Registering new session ${session.id}")
synchronized {
sessions.get(session.id) match {
case Some(existing) =>
debug(s"Session ${session.id} already registered; skipping duplicate registration.")
return existing
case None =>
}
info(s"Registering new session ${session.id}")
session.name.foreach { sessionName =>
if (sessionsByName.contains(sessionName)) {
val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}"
Expand Down Expand Up @@ -229,6 +238,38 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
recoveredSessions
}

/**
 * Re-scan the state store and import sessions written by another Livy server.
 * Add-only: in-memory sessions are not removed even if their state-store entry is
 * gone. The id counter is advanced forward only.
 *
 * Each entry is imported independently: if recovering or registering one session
 * throws, the error is logged, the entry is counted as failed and the remaining
 * entries are still processed.
 *
 * @return the number of sessions added by this call, the number of sessions held in
 *         memory afterwards, and the number of entries that could not be read from
 *         the store or could not be recovered/registered.
 */
def refresh(): RefreshResult = {
  import scala.util.control.NonFatal

  // Read the state store outside the SessionManager monitor so we don't block the
  // garbage collector and heartbeat watchdog while doing N small EFS / ZK reads.
  val storeNextId = sessionStore.getNextSessionId(sessionType)
  val sessionMetadata = sessionStore.getAllSessions[R](sessionType)

  // Entries the store itself could not deserialize / read.
  val readFailures = sessionMetadata.flatMap(_.failed.toOption)
  readFailures.foreach(ex => warn(s"Refresh failure for $sessionType: ${ex.getMessage}", ex))

  synchronized {
    // Forward-only: never move the counter backwards.
    if (storeNextId > idCounter.get) {
      idCounter.set(storeNextId)
    }

    val before = sessions.size
    val candidates = sessionMetadata.flatMap(_.toOption)
      .filterNot(m => sessions.contains(m.id))

    // Isolate each import so that one bad entry does not abort the whole refresh
    // part-way through and hide the result from the caller.
    val importFailures = candidates.count { m =>
      try {
        register(sessionRecovery(m))
        false
      } catch {
        case NonFatal(ex) =>
          warn(s"Refresh failure for $sessionType session ${m.id}: ${ex.getMessage}", ex)
          true
      }
    }

    val added = sessions.size - before
    val failed = readFailures.size + importFailures
    info(s"Refreshed $sessionType sessions: added=$added, total=${sessions.size}," +
      s" failed=$failed, next session id=$idCounter")
    RefreshResult(added, sessions.size, failed)
  }
}

private class GarbageCollector extends Thread("session gc thread") {

setDaemon(true)
Expand Down
Loading
Loading