diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetriever.java
index eeaee1cfb53e3..d22e98c267c88 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetriever.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetriever.java
@@ -31,8 +31,8 @@
import static org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile.STRING_JSON_VALIDATING_TRANSFORMER;
/**
- * FileJwtRetriever is an {@link JwtRetriever} that will load the contents
- * of a file, interpreting them as a JWT access key in the serialized form.
+ * FileJwtRetriever is a {@link JwtRetriever} that loads the contents
+ * of a file, interpreting them as a JWT access token in serialized form.
*/
public class FileJwtRetriever implements JwtRetriever {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index ddbbd1a787a1a..83a3d278014e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -343,10 +343,6 @@ private void identifyExtensions() throws LoginException {
extensionsRequiringCommit = EMPTY_EXTENSIONS;
log.debug("CallbackHandler {} does not support SASL extensions. No extensions will be added", callbackHandler.getClass().getName());
}
- if (extensionsRequiringCommit == null) {
- log.error("SASL Extensions cannot be null. Check whether your callback handler is explicitly setting them as null.");
- throw new LoginException("Extensions cannot be null.");
- }
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerRefreshingLogin.java
index 9c8ee63e4f2d7..f562871e61cd7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerRefreshingLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerRefreshingLogin.java
@@ -20,16 +20,11 @@
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredential;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshConfig;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kafka.common.security.oauthbearer.internals.expiring.OAuthBearerExpiringCredentialRefreshingLogin;
import java.util.Map;
-import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
@@ -77,7 +72,6 @@
* @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
*/
public class OAuthBearerRefreshingLogin implements Login {
- private static final Logger log = LoggerFactory.getLogger(OAuthBearerRefreshingLogin.class);
private ExpiringCredentialRefreshingLogin expiringCredentialRefreshingLogin = null;
@Override
@@ -92,41 +86,9 @@ public void configure(Map configs, String contextName, Configuration
* reasonable.
*/
Class classToSynchronizeOnPriorToRefresh = OAuthBearerRefreshingLogin.class;
- expiringCredentialRefreshingLogin = new ExpiringCredentialRefreshingLogin(contextName, configuration,
+ expiringCredentialRefreshingLogin = new OAuthBearerExpiringCredentialRefreshingLogin(contextName, configuration,
new ExpiringCredentialRefreshConfig(configs, true), loginCallbackHandler,
- classToSynchronizeOnPriorToRefresh) {
- @Override
- public ExpiringCredential expiringCredential() {
- Set privateCredentialTokens = expiringCredentialRefreshingLogin.subject()
- .getPrivateCredentials(OAuthBearerToken.class);
- if (privateCredentialTokens.isEmpty())
- return null;
- final OAuthBearerToken token = privateCredentialTokens.iterator().next();
- if (log.isDebugEnabled())
- log.debug("Found expiring credential with principal '{}'.", token.principalName());
- return new ExpiringCredential() {
- @Override
- public String principalName() {
- return token.principalName();
- }
-
- @Override
- public Long startTimeMs() {
- return token.startTimeMs();
- }
-
- @Override
- public long expireTimeMs() {
- return token.lifetimeMs();
- }
-
- @Override
- public Long absoluteLastRefreshTimeMs() {
- return null;
- }
- };
- }
- };
+ classToSynchronizeOnPriorToRefresh);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index b12ee7ffe6146..3591ad2b409dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -157,7 +157,7 @@ public ExpiringCredentialRefreshingLogin(String contextName, Configuration confi
mandatoryClassToSynchronizeOnPriorToRefresh, new LoginContextFactory(), Time.SYSTEM);
}
- public ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
+ ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
AuthenticateCallbackHandler callbackHandler, Class> mandatoryClassToSynchronizeOnPriorToRefresh,
LoginContextFactory loginContextFactory, Time time) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/OAuthBearerExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/OAuthBearerExpiringCredentialRefreshingLogin.java
new file mode 100644
index 0000000000000..89172ec3ecc2c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/OAuthBearerExpiringCredentialRefreshingLogin.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.expiring;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+import javax.security.auth.login.Configuration;
+
+public class OAuthBearerExpiringCredentialRefreshingLogin extends ExpiringCredentialRefreshingLogin {
+
+ private static final Logger log = LoggerFactory.getLogger(OAuthBearerExpiringCredentialRefreshingLogin.class);
+
+ public OAuthBearerExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
+ ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
+ AuthenticateCallbackHandler callbackHandler,
+ Class> mandatoryClassToSynchronizeOnPriorToRefresh) {
+ super(contextName, configuration, expiringCredentialRefreshConfig, callbackHandler,
+ mandatoryClassToSynchronizeOnPriorToRefresh);
+ }
+
+ OAuthBearerExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
+ ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
+ AuthenticateCallbackHandler callbackHandler,
+ Class> mandatoryClassToSynchronizeOnPriorToRefresh,
+ ExpiringCredentialRefreshingLogin.LoginContextFactory loginContextFactory,
+ Time time) {
+ super(contextName, configuration, expiringCredentialRefreshConfig, callbackHandler,
+ mandatoryClassToSynchronizeOnPriorToRefresh, loginContextFactory, time);
+ }
+
+ @Override
+ public ExpiringCredential expiringCredential() {
+ Set privateCredentialTokens = this.subject()
+ .getPrivateCredentials(OAuthBearerToken.class);
+ if (privateCredentialTokens.isEmpty())
+ return null;
+ final OAuthBearerToken token = privateCredentialTokens.iterator().next();
+ if (log.isDebugEnabled())
+ log.debug("Found expiring credential with principal '{}'.", token.principalName());
+ return new ExpiringCredential() {
+ @Override
+ public String principalName() {
+ return token.principalName();
+ }
+
+ @Override
+ public Long startTimeMs() {
+ return token.startTimeMs();
+ }
+
+ @Override
+ public long expireTimeMs() {
+ return token.lifetimeMs();
+ }
+
+ @Override
+ public Long absoluteLastRefreshTimeMs() {
+ return null;
+ }
+ };
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
index 3eebecf8fde10..40ff05beb0228 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
@@ -221,12 +221,7 @@ public URL validateUrl(String name) {
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, value, e.getMessage()));
}
- String protocol = url.getProtocol();
-
- if (protocol == null || protocol.trim().isEmpty())
- throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is missing the protocol", name, value));
-
- protocol = protocol.toLowerCase(Locale.ROOT);
+ String protocol = url.getProtocol().toLowerCase(Locale.ROOT);
if (!(protocol.equals("http") || protocol.equals("https") || protocol.equals("file")))
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that contains an invalid protocol (%s); only \"http\", \"https\", and \"file\" protocol are supported", name, value, protocol));
@@ -414,4 +409,8 @@ private void throwIfResourceIsNotAllowed(String resourceType,
throw new ConfigException(configName, configValue, message);
}
}
+
+ String prefix() {
+ return prefix;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksVerificationKeyResolver.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksVerificationKeyResolver.java
index d6f6a01089419..27b82e4d37481 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksVerificationKeyResolver.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksVerificationKeyResolver.java
@@ -41,11 +41,11 @@
* RefreshingHttpsJwksVerificationKeyResolver is a
* {@link VerificationKeyResolver} implementation that will periodically refresh the
* JWKS using its {@link HttpsJwks} instance.
- *
+ *
* A JWKS (JSON Web Key Set)
* is a JSON document provided by the OAuth/OIDC provider that lists the keys used to sign the JWTs
* it issues.
- *
+ *
* Here is a sample JWKS JSON document:
*
*
@@ -76,7 +76,7 @@
* order to match up the JWT's signing key with the key in the JWKS. During the validation step of
* the broker, the jose4j OAuth library will use the contents of the appropriate key in the JWKS
* to validate the signature.
- *
+ *
* Given that the JWKS is referenced by the JWT, the JWKS must be made available by the
* OAuth/OIDC provider so that a JWT can be validated.
*
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetrieverTest.java
new file mode 100644
index 0000000000000..7ca1ace4f2e8b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/FileJwtRetrieverTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer;
+
+import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+
+class FileJwtRetrieverTest extends OAuthBearerTest {
+
+ @AfterEach
+ void tearDown() {
+ System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+
+ @Test
+ public void testRetrieveCalledBeforeConfigure() throws IOException {
+ try (FileJwtRetriever retriever = new FileJwtRetriever()) {
+
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ retriever::retrieve
+ );
+ }
+ }
+
+ @Test
+ public void testRetrieveReturnsTokenFromFile() throws Exception {
+ String jwtFileContent = createJwt("test");
+ String jwtFileURI = TestUtils.tempFile(jwtFileContent).toURI().toString();
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, jwtFileURI);
+
+ try (FileJwtRetriever retriever = new FileJwtRetriever()) {
+ retriever.configure(
+ Map.of(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, jwtFileURI),
+ OAUTHBEARER_MECHANISM,
+ Collections.emptyList()
+ );
+
+ Assertions.assertEquals(jwtFileContent, retriever.retrieve());
+ }
+ }
+
+ @Test
+ public void testRetrieveThrowsIfFileIsMissing() throws Exception {
+ String jwtFileContent = createJwt("test");
+ File jwtFile = TestUtils.tempFile(jwtFileContent);
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, jwtFile.toURI().toString());
+
+ try (FileJwtRetriever retriever = new FileJwtRetriever()) {
+ retriever.configure(
+ Map.of(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, jwtFile.toURI().toString()),
+ OAUTHBEARER_MECHANISM,
+ Collections.emptyList()
+ );
+ Files.delete(jwtFile.toPath());
+
+ Assertions.assertThrows(
+ Exception.class,
+ retriever::retrieve
+ );
+ }
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
index 4a4e567dedfdf..c7dd8d2d0be1e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
@@ -148,4 +148,11 @@ public void testConfigureWithInvalidPassphrase() throws Exception {
assertInstanceOf(IOException.class, e.getCause());
}
}
+
+ @Test
+ public void testRetrieveCalledBeforeConfigure() throws IOException {
+ try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
+ assertThrows(IllegalStateException.class, jwtRetriever::retrieve);
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
index 4efbc21072280..0519624fbef20 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -33,15 +34,21 @@
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginException;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -49,6 +56,23 @@ public class OAuthBearerLoginModuleTest {
public static final SaslExtensions RAISE_UNSUPPORTED_CB_EXCEPTION_FLAG = null;
+ private OAuthBearerToken[] tokens;
+ private SaslExtensions[] extensions;
+
+ @BeforeEach
+ public void setup() {
+ tokens = new OAuthBearerToken[] {
+ mock(OAuthBearerToken.class),
+ mock(OAuthBearerToken.class),
+ mock(OAuthBearerToken.class)
+ };
+ extensions = new SaslExtensions[] {
+ saslExtensions(),
+ saslExtensions(),
+ saslExtensions()
+ };
+ }
+
private static class TestCallbackHandler implements AuthenticateCallbackHandler {
private final OAuthBearerToken[] tokens;
private int index = 0;
@@ -126,10 +150,6 @@ public void login1Commit1Login2Commit2Logout1Login3Commit3Logout2() throws Login
Set