diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala index 3c84b4a1d..f129eb0af 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala @@ -359,7 +359,8 @@ class LivyCLIService(server: LivyThriftServer) authFactory: AuthFactory, owner: String, renewer: String): String = { - throw new HiveSQLException("Operation not yet supported.") + debug(sessionHandle + ": getDelegationToken()") + authFactory.getDelegationToken(owner, renewer) } @throws[HiveSQLException] @@ -371,14 +372,16 @@ class LivyCLIService(server: LivyThriftServer) sessionHandle: SessionHandle, authFactory: AuthFactory, tokenStr: String): Unit = { - throw new HiveSQLException("Operation not yet supported.") + debug(sessionHandle + ": cancelDelegationToken()") + authFactory.cancelDelegationToken(tokenStr) } def renewDelegationToken( sessionHandle: SessionHandle, authFactory: AuthFactory, tokenStr: String): Unit = { - throw new HiveSQLException("Operation not yet supported.") + debug(sessionHandle + ": renewDelegationToken()") + authFactory.renewDelegationToken(tokenStr) } @throws[HiveSQLException] diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthFactory.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthFactory.scala index 6ac61d29a..10b34d878 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthFactory.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthFactory.scala @@ -149,6 +149,55 @@ class AuthFactory(val conf: LivyConf) extends Logging { throw new HiveSQLException(msg, "08S01", e) } } + + @throws[HiveSQLException] + def cancelDelegationToken(delegationToken: String): Unit = { + if (secretManager.isEmpty) { + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01") + } + try { + secretManager.get.cancelDelegationToken(delegationToken) + } catch { + case e: IOException => + val msg = s"Error canceling delegation token $delegationToken" + error(msg, e) + throw new HiveSQLException(msg, "08S01", e) + } + } + + @throws[HiveSQLException] + def getDelegationToken(ownerStr: String, renewer: String): String = { + if (secretManager.isEmpty) { + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01") + } + try { + secretManager.get.getDelegationToken(ownerStr, renewer) + } catch { + case e: IOException => + val msg = s"Error getting delegation owner $ownerStr" + error(msg, e) + throw new HiveSQLException(msg, "08S01", e) + } + } + + @throws[HiveSQLException] + def renewDelegationToken(delegationToken: String): Unit = { + if (secretManager.isEmpty) { + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01") + } + try { + secretManager.get.renewDelegationToken(delegationToken) + } catch { + case e: IOException => + val msg = s"Error renewing delegation token $delegationToken" + error(msg, e) + throw new HiveSQLException(msg, "08S01", e) + } + } + } class SQLPlainProcessorFactory(val service: Iface) extends TProcessorFactory(null) { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala index e34306eb7..25c58667a 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala @@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, DataInputStream, IOException} import org.apache.hadoop.io.Text import org.apache.hadoop.security.token.Token import org.apache.hadoop.security.token.delegation.{AbstractDelegationTokenIdentifier, AbstractDelegationTokenSecretManager} +import org.apache.hadoop.security.UserGroupInformation import org.apache.livy.LivyConf @@ -49,6 +50,14 @@ class LivyDelegationTokenSecretManager(val livyConf: LivyConf) id.getUser.getShortUserName } + @throws[IOException] + def renewDelegationToken(tokenStrForm: String): Unit = { + val t = new Token[LivyDelegationTokenIdentifier] + t.decodeFromUrlString(tokenStrForm) + val user = UserGroupInformation.getCurrentUser().getShortUserName() + renewToken(t, user) + } + @throws[IOException] protected def getTokenIdentifier( token: Token[LivyDelegationTokenIdentifier]): LivyDelegationTokenIdentifier = { @@ -59,6 +68,27 @@ class LivyDelegationTokenSecretManager(val livyConf: LivyConf) id.readFields(in) id } + + @throws[IOException] + def cancelDelegationToken(tokenStrForm: String): Unit = { + val t = new Token[LivyDelegationTokenIdentifier] + t.decodeFromUrlString(tokenStrForm) + val user = UserGroupInformation.getCurrentUser.getUserName + cancelToken(t, user) + } + + @throws[IOException] + def getDelegationToken(ownerStr: String, renewer: String): String = { + if (ownerStr == null) throw new RuntimeException("Delegation token owner is null") + val owner = new Text(ownerStr) + var realUser: Text = null + val currentUgi = UserGroupInformation.getCurrentUser + if (currentUgi.getUserName != null) realUser = new Text(currentUgi.getUserName) + val ident = new LivyDelegationTokenIdentifier(owner, new Text(renewer), realUser) + val t = new Token[LivyDelegationTokenIdentifier](ident, this) + t.encodeToUrlString() + } + } /**