Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import static org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile.STRING_JSON_VALIDATING_TRANSFORMER;

/**
* <code>FileJwtRetriever</code> is an {@link JwtRetriever} that will load the contents
* of a file, interpreting them as a JWT access key in the serialized form.
* <code>FileJwtRetriever</code> is a {@link JwtRetriever} that loads the contents
* of a file, interpreting them as a JWT access token in serialized form.
*/
public class FileJwtRetriever implements JwtRetriever {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,6 @@ private void identifyExtensions() throws LoginException {
extensionsRequiringCommit = EMPTY_EXTENSIONS;
log.debug("CallbackHandler {} does not support SASL extensions. No extensions will be added", callbackHandler.getClass().getName());
}
if (extensionsRequiringCommit == null) {
log.error("SASL Extensions cannot be null. Check whether your callback handler is explicitly setting them as null.");
throw new LoginException("Extensions cannot be null.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredential;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshConfig;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.common.security.oauthbearer.internals.expiring.OAuthBearerExpiringCredentialRefreshingLogin;

import java.util.Map;
import java.util.Set;

import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
Expand Down Expand Up @@ -77,7 +72,6 @@
* @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
*/
public class OAuthBearerRefreshingLogin implements Login {
private static final Logger log = LoggerFactory.getLogger(OAuthBearerRefreshingLogin.class);
private ExpiringCredentialRefreshingLogin expiringCredentialRefreshingLogin = null;

@Override
Expand All @@ -92,41 +86,9 @@ public void configure(Map<String, ?> configs, String contextName, Configuration
* reasonable.
*/
Class<OAuthBearerRefreshingLogin> classToSynchronizeOnPriorToRefresh = OAuthBearerRefreshingLogin.class;
expiringCredentialRefreshingLogin = new ExpiringCredentialRefreshingLogin(contextName, configuration,
expiringCredentialRefreshingLogin = new OAuthBearerExpiringCredentialRefreshingLogin(contextName, configuration,
new ExpiringCredentialRefreshConfig(configs, true), loginCallbackHandler,
classToSynchronizeOnPriorToRefresh) {
@Override
public ExpiringCredential expiringCredential() {
Set<OAuthBearerToken> privateCredentialTokens = expiringCredentialRefreshingLogin.subject()
.getPrivateCredentials(OAuthBearerToken.class);
if (privateCredentialTokens.isEmpty())
return null;
final OAuthBearerToken token = privateCredentialTokens.iterator().next();
if (log.isDebugEnabled())
log.debug("Found expiring credential with principal '{}'.", token.principalName());
return new ExpiringCredential() {
@Override
public String principalName() {
return token.principalName();
}

@Override
public Long startTimeMs() {
return token.startTimeMs();
}

@Override
public long expireTimeMs() {
return token.lifetimeMs();
}

@Override
public Long absoluteLastRefreshTimeMs() {
return null;
}
};
}
};
classToSynchronizeOnPriorToRefresh);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public ExpiringCredentialRefreshingLogin(String contextName, Configuration confi
mandatoryClassToSynchronizeOnPriorToRefresh, new LoginContextFactory(), Time.SYSTEM);
}

public ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
ExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
AuthenticateCallbackHandler callbackHandler, Class<?> mandatoryClassToSynchronizeOnPriorToRefresh,
LoginContextFactory loginContextFactory, Time time) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.security.oauthbearer.internals.expiring;

import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.utils.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

import javax.security.auth.login.Configuration;

public class OAuthBearerExpiringCredentialRefreshingLogin extends ExpiringCredentialRefreshingLogin {

private static final Logger log = LoggerFactory.getLogger(OAuthBearerExpiringCredentialRefreshingLogin.class);

public OAuthBearerExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
AuthenticateCallbackHandler callbackHandler,
Class<?> mandatoryClassToSynchronizeOnPriorToRefresh) {
super(contextName, configuration, expiringCredentialRefreshConfig, callbackHandler,
mandatoryClassToSynchronizeOnPriorToRefresh);
}

OAuthBearerExpiringCredentialRefreshingLogin(String contextName, Configuration configuration,
ExpiringCredentialRefreshConfig expiringCredentialRefreshConfig,
AuthenticateCallbackHandler callbackHandler,
Class<?> mandatoryClassToSynchronizeOnPriorToRefresh,
ExpiringCredentialRefreshingLogin.LoginContextFactory loginContextFactory,
Time time) {
super(contextName, configuration, expiringCredentialRefreshConfig, callbackHandler,
mandatoryClassToSynchronizeOnPriorToRefresh, loginContextFactory, time);
}

@Override
public ExpiringCredential expiringCredential() {
Set<OAuthBearerToken> privateCredentialTokens = this.subject()
.getPrivateCredentials(OAuthBearerToken.class);
if (privateCredentialTokens.isEmpty())
return null;
final OAuthBearerToken token = privateCredentialTokens.iterator().next();
if (log.isDebugEnabled())
log.debug("Found expiring credential with principal '{}'.", token.principalName());
return new ExpiringCredential() {
@Override
public String principalName() {
return token.principalName();
}

@Override
public Long startTimeMs() {
return token.startTimeMs();
}

@Override
public long expireTimeMs() {
return token.lifetimeMs();
}

@Override
public Long absoluteLastRefreshTimeMs() {
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ public URL validateUrl(String name) {
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, value, e.getMessage()));
}

String protocol = url.getProtocol();

if (protocol == null || protocol.trim().isEmpty())
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is missing the protocol", name, value));

protocol = protocol.toLowerCase(Locale.ROOT);
String protocol = url.getProtocol().toLowerCase(Locale.ROOT);

if (!(protocol.equals("http") || protocol.equals("https") || protocol.equals("file")))
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that contains an invalid protocol (%s); only \"http\", \"https\", and \"file\" protocol are supported", name, value, protocol));
Expand Down Expand Up @@ -414,4 +409,8 @@ private void throwIfResourceIsNotAllowed(String resourceType,
throw new ConfigException(configName, configValue, message);
}
}

String prefix() {
return prefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
* <code>RefreshingHttpsJwksVerificationKeyResolver</code> is a
* {@link VerificationKeyResolver} implementation that will periodically refresh the
* JWKS using its {@link HttpsJwks} instance.
*
* <p/>
* A <a href="https://datatracker.ietf.org/doc/html/rfc7517#section-5">JWKS (JSON Web Key Set)</a>
* is a JSON document provided by the OAuth/OIDC provider that lists the keys used to sign the JWTs
* it issues.
*
* <p/>
* Here is a sample JWKS JSON document:
*
* <pre>
Expand Down Expand Up @@ -76,7 +76,7 @@
* order to match up the JWT's signing key with the key in the JWKS. During the validation step of
* the broker, the jose4j OAuth library will use the contents of the appropriate key in the JWKS
* to validate the signature.
*
* <p/>
* Given that the JWKS is referenced by the JWT, the JWKS must be made available by the
* OAuth/OIDC provider so that a JWT can be validated.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.security.oauthbearer;

import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;

class FileJwtRetrieverTest extends OAuthBearerTest {

@AfterEach
void tearDown() {
System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
}

@Test
public void testRetrieveCalledBeforeConfigure() throws IOException {
try (FileJwtRetriever retriever = new FileJwtRetriever()) {

Assertions.assertThrows(
IllegalStateException.class,
retriever::retrieve
);
}
}

@Test
public void testRetrieveReturnsTokenFromFile() throws Exception {
String jwtFileContent = createJwt("test");
String jwtFileURI = TestUtils.tempFile(jwtFileContent).toURI().toString();
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, jwtFileURI);

try (FileJwtRetriever retriever = new FileJwtRetriever()) {
retriever.configure(
Map.of(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, jwtFileURI),
OAUTHBEARER_MECHANISM,
Collections.emptyList()
);

Assertions.assertEquals(jwtFileContent, retriever.retrieve());
}
}

@Test
public void testRetrieveThrowsIfFileIsMissing() throws Exception {
String jwtFileContent = createJwt("test");
File jwtFile = TestUtils.tempFile(jwtFileContent);
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, jwtFile.toURI().toString());

try (FileJwtRetriever retriever = new FileJwtRetriever()) {
retriever.configure(
Map.of(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, jwtFile.toURI().toString()),
OAUTHBEARER_MECHANISM,
Collections.emptyList()
);
Files.delete(jwtFile.toPath());

Assertions.assertThrows(
Exception.class,
retriever::retrieve
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,11 @@ public void testConfigureWithInvalidPassphrase() throws Exception {
assertInstanceOf(IOException.class, e.getCause());
}
}

@Test
public void testRetrieveCalledBeforeConfigure() throws IOException {
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
assertThrows(IllegalStateException.class, jwtRetriever::retrieve);
}
}
}
Loading