+ * Uses {@link EnterpriseAuthProvider} with an assertion callback that performs RFC
+ * 8693 token exchange at the IdP, then exchanges the ID-JAG for an access token at
+ * the MCP authorization server via RFC 7523 JWT Bearer grant.
+ * @param serverUrl the URL of the MCP server
+ * @throws Exception if any error occurs during execution
+ */
+ private static void runCrossAppAccessCompleteFlowScenario(String serverUrl) throws Exception {
+ String contextEnv = System.getenv("MCP_CONFORMANCE_CONTEXT");
+ if (contextEnv == null || contextEnv.isEmpty()) {
+ System.err.println("Error: MCP_CONFORMANCE_CONTEXT environment variable is not set");
+ System.exit(1);
+ }
+
+ CrossAppAccessContext ctx = new ObjectMapper().readValue(contextEnv, CrossAppAccessContext.class);
+
+ java.net.http.HttpClient httpClient = java.net.http.HttpClient.newHttpClient();
+
+ EnterpriseAuthProviderOptions options = EnterpriseAuthProviderOptions.builder()
+ .clientId(ctx.clientId())
+ .clientSecret(ctx.clientSecret())
+ .assertionCallback(assertionCtx -> {
+ // RFC 8693 token exchange at the IdP: ID Token → ID-JAG
+ DiscoverAndRequestJwtAuthGrantOptions jagOptions = DiscoverAndRequestJwtAuthGrantOptions
+ .builder()
+ .idpUrl(ctx.idpIssuer())
+ .idpTokenEndpoint(ctx.idpTokenEndpoint())
+ .idToken(ctx.idpIdToken())
+ .clientId(ctx.idpClientId())
+ .audience(assertionCtx.getAuthorizationServerUrl().toString())
+ .resource(assertionCtx.getResourceUrl().toString())
+ .build();
+ return EnterpriseAuth.discoverAndRequestJwtAuthorizationGrant(jagOptions, httpClient);
+ })
+ .build();
+
+ EnterpriseAuthProvider provider = new EnterpriseAuthProvider(options, httpClient);
+
+ HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(serverUrl)
+ .httpRequestCustomizer(provider)
+ .build();
+
+ McpSyncClient client = McpClient.sync(transport)
+ .clientInfo(new McpSchema.Implementation("test-client", "1.0.0"))
+ .requestTimeout(Duration.ofSeconds(30))
+ .build();
+
+ try {
+ client.initialize();
+ System.out.println("Successfully connected to MCP server");
+
+ client.listTools();
+ System.out.println("Successfully listed tools");
+ }
+ finally {
+ client.close();
+ System.out.println("Connection closed successfully");
+ }
+ }
+
+ /**
+ * Context provided by the conformance suite for the cross-app-access-complete-flow
+ * scenario via the {@code MCP_CONFORMANCE_CONTEXT} environment variable.
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private record CrossAppAccessContext(@JsonProperty("client_id") String clientId,
+ @JsonProperty("client_secret") String clientSecret,
+ @JsonProperty("idp_client_id") String idpClientId,
+ @JsonProperty("idp_id_token") String idpIdToken, @JsonProperty("idp_issuer") String idpIssuer,
+ @JsonProperty("idp_token_endpoint") String idpTokenEndpoint) {
+ }
+
}
diff --git a/conformance-tests/client-spring-http-client/README.md b/conformance-tests/client-spring-http-client/README.md
index 876a86e1d..afbf64773 100644
--- a/conformance-tests/client-spring-http-client/README.md
+++ b/conformance-tests/client-spring-http-client/README.md
@@ -67,7 +67,7 @@ cd conformance-tests/client-spring-http-client
This creates an executable JAR at:
```
-target/client-spring-http-client-0.18.0-SNAPSHOT.jar
+target/client-spring-http-client-1.1.0-SNAPSHOT.jar
```
## Running Tests
@@ -79,7 +79,7 @@ Run the full auth suite:
```bash
npx @modelcontextprotocol/conformance@0.1.15 client \
--spec-version 2025-11-25 \
- --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-0.18.0-SNAPSHOT.jar" \
+ --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-1.1.0-SNAPSHOT.jar" \
--suite auth
```
@@ -88,7 +88,7 @@ Run a single scenario:
```bash
npx @modelcontextprotocol/conformance@0.1.15 client \
--spec-version 2025-11-25 \
- --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-0.18.0-SNAPSHOT.jar" \
+ --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-1.1.0-SNAPSHOT.jar" \
--scenario auth/metadata-default
```
@@ -97,7 +97,7 @@ Run with verbose output:
```bash
npx @modelcontextprotocol/conformance@0.1.15 client \
--spec-version 2025-11-25 \
- --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-0.18.0-SNAPSHOT.jar" \
+ --command "java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-1.1.0-SNAPSHOT.jar" \
--scenario auth/metadata-default \
--verbose
```
@@ -108,7 +108,7 @@ You can also run the client manually if you have a test server:
```bash
export MCP_CONFORMANCE_SCENARIO=auth/metadata-default
-java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-0.18.0-SNAPSHOT.jar http://localhost:3000/mcp
+java -jar conformance-tests/client-spring-http-client/target/client-spring-http-client-1.1.0-SNAPSHOT.jar http://localhost:3000/mcp
```
## Known Issues
diff --git a/conformance-tests/client-spring-http-client/pom.xml b/conformance-tests/client-spring-http-client/pom.xml
index 94923fb5c..06b53887d 100644
--- a/conformance-tests/client-spring-http-client/pom.xml
+++ b/conformance-tests/client-spring-http-client/pom.xml
@@ -4,14 +4,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 4.0.2
-
+ io.modelcontextprotocol.sdk
+ conformance-tests
+ 2.0.0-SNAPSHOT
- io.modelcontextprotocol.sdkclient-spring-http-client
- 1.0.0-SNAPSHOTjarMCP Conformance Tests - Spring HTTP ClientSpring HTTP Client conformance tests for the Java MCP SDK
@@ -19,16 +16,29 @@
https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
+ scm:git:git://github.com/modelcontextprotocol/java-sdk.git
+ scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git17
+ 4.0.22.0.0-M2true
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring-boot.version}
+ pom
+ import
+
+
+
+
org.springframework.boot
@@ -63,6 +73,14 @@
org.springframework.bootspring-boot-maven-plugin
+ ${spring-boot.version}
+
+
+
+ repackage
+
+
+
@@ -88,4 +106,4 @@
-
+
\ No newline at end of file
diff --git a/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/configuration/DefaultConfiguration.java b/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/configuration/DefaultConfiguration.java
index acf26d94e..12a9c4a5c 100644
--- a/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/configuration/DefaultConfiguration.java
+++ b/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/configuration/DefaultConfiguration.java
@@ -24,8 +24,8 @@ public class DefaultConfiguration {
@Bean
DefaultScenario defaultScenario(McpClientRegistrationRepository clientRegistrationRepository,
- ServletWebServerApplicationContext serverCtx,
- OAuth2AuthorizedClientRepository oAuth2AuthorizedClientRepository) {
+ ServletWebServerApplicationContext serverCtx,
+ OAuth2AuthorizedClientRepository oAuth2AuthorizedClientRepository) {
return new DefaultScenario(clientRegistrationRepository, serverCtx, oAuth2AuthorizedClientRepository);
}
diff --git a/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/scenario/DefaultScenario.java b/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/scenario/DefaultScenario.java
index d82637de9..907cea10d 100644
--- a/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/scenario/DefaultScenario.java
+++ b/conformance-tests/client-spring-http-client/src/main/java/io/modelcontextprotocol/conformance/client/scenario/DefaultScenario.java
@@ -29,8 +29,7 @@
public class DefaultScenario implements Scenario {
- private static final Logger log = LoggerFactory
- .getLogger(DefaultScenario.class);
+ private static final Logger log = LoggerFactory.getLogger(DefaultScenario.class);
private final ServletWebServerApplicationContext serverCtx;
@@ -39,8 +38,8 @@ public class DefaultScenario implements Scenario {
private McpSyncClient client;
public DefaultScenario(McpClientRegistrationRepository clientRegistrationRepository,
- ServletWebServerApplicationContext serverCtx,
- OAuth2AuthorizedClientRepository oAuth2AuthorizedClientRepository) {
+ ServletWebServerApplicationContext serverCtx,
+ OAuth2AuthorizedClientRepository oAuth2AuthorizedClientRepository) {
this.serverCtx = serverCtx;
this.authorizedClientManager = new DefaultOAuth2AuthorizedClientManager(clientRegistrationRepository,
oAuth2AuthorizedClientRepository);
diff --git a/conformance-tests/conformance-baseline.yml b/conformance-tests/conformance-baseline.yml
index 4ab144063..d2990c155 100644
--- a/conformance-tests/conformance-baseline.yml
+++ b/conformance-tests/conformance-baseline.yml
@@ -2,11 +2,6 @@
# This file lists known failing scenarios that are expected to fail until fixed.
# See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md
-server:
- # Resource subscription not implemented in SDK
- - resources-subscribe
- - resources-unsubscribe
-
client:
# SSE retry field handling not implemented
# - Client does not parse or respect retry: field timing
diff --git a/conformance-tests/pom.xml b/conformance-tests/pom.xml
index d1bef2a24..88ab7c4b0 100644
--- a/conformance-tests/pom.xml
+++ b/conformance-tests/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 1.1.0-SNAPSHOT
+ 2.0.0-SNAPSHOTconformance-testspom
@@ -16,18 +16,18 @@
https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
+ scm:git:git://github.com/modelcontextprotocol/java-sdk.git
+ scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.gittrue
-
+
client-jdk-http-clientclient-spring-http-clientserver-servlet
-
+
\ No newline at end of file
diff --git a/conformance-tests/server-servlet/README.md b/conformance-tests/server-servlet/README.md
index bd86636b6..ef327ecf6 100644
--- a/conformance-tests/server-servlet/README.md
+++ b/conformance-tests/server-servlet/README.md
@@ -4,7 +4,7 @@ This module contains a comprehensive MCP (Model Context Protocol) server impleme
## Conformance Test Results
-**Status: 37 out of 40 tests passing (92.5%)**
+**Status: 40 out of 40 tests passing (100%)**
The server has been validated against the official [MCP conformance test suite](https://github.com/modelcontextprotocol/conformance). See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for detailed results.
@@ -22,9 +22,8 @@ The server has been validated against the official [MCP conformance test suite](
- SEP-1034: Default values for all primitive types
- SEP-1330: All enum schema variants
-✅ **Resources** (4/6)
-- List, read text/binary, templates
-- ⚠️ Subscribe/unsubscribe (SDK limitation)
+✅ **Resources** (6/6)
+- List, read text/binary, templates, subscribe, unsubscribe
✅ **Prompts** (4/4)
- Simple, parameterized, embedded resources, images
@@ -191,12 +190,7 @@ curl -X POST http://localhost:8080/mcp \
## Known Limitations
-See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on:
-
-1. **Resource Subscriptions** - Not implemented in Java SDK
-2. **DNS Rebinding Protection** - Missing Host/Origin validation
-
-These are SDK-level limitations that require fixes in the core framework.
+See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on remaining client-side limitations.
## References
diff --git a/conformance-tests/server-servlet/pom.xml b/conformance-tests/server-servlet/pom.xml
index 68da42158..a80c7c4ec 100644
--- a/conformance-tests/server-servlet/pom.xml
+++ b/conformance-tests/server-servlet/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkconformance-tests
- 1.1.0-SNAPSHOT
+ 2.0.0-SNAPSHOTserver-servletjar
@@ -16,8 +16,8 @@
https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
+ scm:git:git://github.com/modelcontextprotocol/java-sdk.git
+ scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git
@@ -28,7 +28,7 @@
io.modelcontextprotocol.sdkmcp
- 1.1.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
@@ -65,6 +65,7 @@
3.1.0io.modelcontextprotocol.conformance.server.ConformanceServlet
+ false
diff --git a/docs/client.md b/docs/client.md
index 6a99928c5..1702936f0 100644
--- a/docs/client.md
+++ b/docs/client.md
@@ -408,6 +408,49 @@ Resources represent server-side data sources that clients can access using URI t
.subscribe();
```
+### Resource Subscriptions
+
+When the server advertises `resources.subscribe` support, clients can subscribe to individual resources and receive a callback whenever the server pushes a `notifications/resources/updated` notification for that URI. The SDK automatically re-reads the resource on notification and delivers the updated contents to the registered consumer.
+
+Register a consumer on the client builder, then subscribe/unsubscribe at any time:
+
+=== "Sync API"
+
+ ```java
+ McpSyncClient client = McpClient.sync(transport)
+ .resourcesUpdateConsumer(contents -> {
+ // called with the updated resource contents after each notification
+ System.out.println("Resource updated: " + contents);
+ })
+ .build();
+
+ client.initialize();
+
+ // Subscribe to a specific resource URI
+ client.subscribeResource(new McpSchema.SubscribeRequest("custom://resource"));
+
+ // ... later, stop receiving updates
+ client.unsubscribeResource(new McpSchema.UnsubscribeRequest("custom://resource"));
+ ```
+
+=== "Async API"
+
+ ```java
+ McpAsyncClient client = McpClient.async(transport)
+ .resourcesUpdateConsumer(contents -> Mono.fromRunnable(() -> {
+ System.out.println("Resource updated: " + contents);
+ }))
+ .build();
+
+ client.initialize()
+ .then(client.subscribeResource(new McpSchema.SubscribeRequest("custom://resource")))
+ .subscribe();
+
+ // ... later, stop receiving updates
+ client.unsubscribeResource(new McpSchema.UnsubscribeRequest("custom://resource"))
+ .subscribe();
+ ```
+
### Prompt System
The prompt system enables interaction with server-side prompt templates. These templates can be discovered and executed with custom parameters, allowing for dynamic text generation based on predefined patterns.
diff --git a/docs/server.md b/docs/server.md
index 0753726e2..f9f3aa683 100644
--- a/docs/server.md
+++ b/docs/server.md
@@ -33,7 +33,7 @@ The server supports both synchronous and asynchronous APIs, allowing for flexibl
McpSyncServer syncServer = McpServer.sync(transportProvider)
.serverInfo("my-server", "1.0.0")
.capabilities(ServerCapabilities.builder()
- .resources(false, true) // Enable resource support with list changes
+ .resources(false, true) // Resource support: subscribe=false, listChanged=true
.tools(true) // Enable tool support with list changes
.prompts(true) // Enable prompt support with list changes
.completions() // Enable completions support
@@ -57,7 +57,7 @@ The server supports both synchronous and asynchronous APIs, allowing for flexibl
McpAsyncServer asyncServer = McpServer.async(transportProvider)
.serverInfo("my-server", "1.0.0")
.capabilities(ServerCapabilities.builder()
- .resources(false, true) // Enable resource support with list changes
+ .resources(false, true) // Resource support: subscribe=false, listChanged=true
.tools(true) // Enable tool support with list changes
.prompts(true) // Enable prompt support with list changes
.completions() // Enable completions support
@@ -319,7 +319,7 @@ The server can be configured with various capabilities:
```java
var capabilities = ServerCapabilities.builder()
- .resources(false, true) // Resource support (subscribe, listChanged)
+ .resources(true, true) // Resource support: subscribe=true, listChanged=true
.tools(true) // Tool support with list changes notifications
.prompts(true) // Prompt support with list changes notifications
.completions() // Enable completions support
@@ -438,6 +438,42 @@ Resources provide context to AI models by exposing data such as: File contents,
);
```
+### Resource Subscriptions
+
+When the `subscribe` capability is enabled, clients can subscribe to specific resources and receive targeted `notifications/resources/updated` notifications when those resources change. Only sessions that have explicitly subscribed to a given URI receive the notification — not every connected client.
+
+Enable subscription support in the server capabilities:
+
+```java
+McpSyncServer server = McpServer.sync(transportProvider)
+ .serverInfo("my-server", "1.0.0")
+ .capabilities(ServerCapabilities.builder()
+ .resources(true, false) // subscribe=true, listChanged=false
+ .build())
+ .resources(myResourceSpec)
+ .build();
+```
+
+When a subscribed resource changes, notify only the interested sessions:
+
+=== "Sync"
+
+ ```java
+ server.notifyResourcesUpdated(
+ new McpSchema.ResourcesUpdatedNotification("custom://resource")
+ );
+ ```
+
+=== "Async"
+
+ ```java
+ server.notifyResourcesUpdated(
+ new McpSchema.ResourcesUpdatedNotification("custom://resource")
+ ).subscribe();
+ ```
+
+If no sessions are subscribed to the given URI the call completes immediately without sending any messages. Subscription state is automatically cleaned up when a client session closes.
+
### Resource Template Specification
Resource templates allow servers to expose parameterized resources using URI templates:
diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml
index fb6f3a32a..303520517 100644
--- a/mcp-bom/pom.xml
+++ b/mcp-bom/pom.xml
@@ -7,7 +7,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 1.1.0-SNAPSHOT
+ 2.0.0-SNAPSHOTmcp-bom
@@ -16,13 +16,13 @@
Java SDK MCP BOMJava SDK MCP Bill of Materials
- https://github.com/modelcontextprotocol/java-sdk
+ https://github.com/modelcontextprotocol/java-sdk
-
- https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
-
+
+ https://github.com/modelcontextprotocol/java-sdk
+ scm:git:git://github.com/modelcontextprotocol/java-sdk.git
+ scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git
+
@@ -47,6 +47,13 @@
${project.version}
+
+
+ io.modelcontextprotocol.sdk
+ mcp-json-jackson3
+ ${project.version}
+
+
io.modelcontextprotocol.sdk
diff --git a/mcp-core/pom.xml b/mcp-core/pom.xml
index 4de0fba2b..d622df0d1 100644
--- a/mcp-core/pom.xml
+++ b/mcp-core/pom.xml
@@ -6,7 +6,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 1.1.0-SNAPSHOT
+ 2.0.0-SNAPSHOTmcp-corejar
@@ -16,8 +16,8 @@
https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
+ scm:git:git://github.com/modelcontextprotocol/java-sdk.git
+ scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git
@@ -164,14 +164,14 @@
test
-
-
- com.google.code.gson
- gson
- 2.10.1
- test
-
+
+
+ com.google.code.gson
+ gson
+ 2.10.1
+ test
+
-
+
\ No newline at end of file
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/AuthServerMetadata.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/AuthServerMetadata.java
new file mode 100644
index 000000000..321943a41
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/AuthServerMetadata.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * OAuth 2.0 Authorization Server Metadata as defined by RFC 8414.
+ *
+ * Used during Enterprise Managed Authorization (SEP-990) to discover the token endpoint
+ * of the enterprise Identity Provider and the MCP authorization server.
+ *
+ * @author MCP SDK Contributors
+ * @see RFC 8414
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AuthServerMetadata {
+
+ @JsonProperty("issuer")
+ private String issuer;
+
+ @JsonProperty("token_endpoint")
+ private String tokenEndpoint;
+
+ @JsonProperty("authorization_endpoint")
+ private String authorizationEndpoint;
+
+ @JsonProperty("jwks_uri")
+ private String jwksUri;
+
+ public String getIssuer() {
+ return issuer;
+ }
+
+ public void setIssuer(String issuer) {
+ this.issuer = issuer;
+ }
+
+ public String getTokenEndpoint() {
+ return tokenEndpoint;
+ }
+
+ public void setTokenEndpoint(String tokenEndpoint) {
+ this.tokenEndpoint = tokenEndpoint;
+ }
+
+ public String getAuthorizationEndpoint() {
+ return authorizationEndpoint;
+ }
+
+ public void setAuthorizationEndpoint(String authorizationEndpoint) {
+ this.authorizationEndpoint = authorizationEndpoint;
+ }
+
+ public String getJwksUri() {
+ return jwksUri;
+ }
+
+ public void setJwksUri(String jwksUri) {
+ this.jwksUri = jwksUri;
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/DiscoverAndRequestJwtAuthGrantOptions.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/DiscoverAndRequestJwtAuthGrantOptions.java
new file mode 100644
index 000000000..22f5921b3
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/DiscoverAndRequestJwtAuthGrantOptions.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.util.Objects;
+
+/**
+ * Options for {@link EnterpriseAuth#discoverAndRequestJwtAuthorizationGrant} — extends
+ * {@link RequestJwtAuthGrantOptions} with IdP discovery support.
+ *
+ * Performs step 1 of the Enterprise Managed Authorization (SEP-990) flow by first
+ * discovering the IdP token endpoint via RFC 8414 metadata discovery, then requesting the
+ * JAG.
+ *
+ * If {@link #getIdpTokenEndpoint()} is provided it is used directly and discovery is
+ * skipped.
+ *
+ * @author MCP SDK Contributors
+ */
+public class DiscoverAndRequestJwtAuthGrantOptions extends RequestJwtAuthGrantOptions {
+
+ /**
+ * The base URL of the enterprise IdP. Used as the root URL for RFC 8414 discovery
+ * ({@code /.well-known/oauth-authorization-server} or
+ * {@code /.well-known/openid-configuration}).
+ */
+ private final String idpUrl;
+
+ private DiscoverAndRequestJwtAuthGrantOptions(Builder builder) {
+ super(builder);
+ this.idpUrl = Objects.requireNonNull(builder.idpUrl, "idpUrl must not be null");
+ }
+
+ public String getIdpUrl() {
+ return idpUrl;
+ }
+
+ /**
+ * Returns the optional pre-configured IdP token endpoint. When non-null, RFC 8414
+ * discovery is skipped and this endpoint is used directly.
+ *
+ * This is a convenience method equivalent to {@link #getTokenEndpoint()}.
+ */
+ public String getIdpTokenEndpoint() {
+ return getTokenEndpoint();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder extends RequestJwtAuthGrantOptions.Builder {
+
+ private String idpUrl;
+
+ private Builder() {
+ }
+
+ public Builder idpUrl(String idpUrl) {
+ this.idpUrl = idpUrl;
+ return this;
+ }
+
+ /**
+ * Optional override for the IdP's token endpoint. When set, RFC 8414 discovery is
+ * skipped and this endpoint is used directly.
+ *
+ * Provides static async methods for each discrete step of the two-step enterprise auth
+ * protocol:
+ *
+ *
Step 1 — JAG request: Exchange an enterprise OIDC ID token for a JWT
+ * Authorization Grant (ID-JAG) at the enterprise IdP via RFC 8693 token exchange.
+ * Methods: {@link #requestJwtAuthorizationGrant} /
+ * {@link #discoverAndRequestJwtAuthorizationGrant}.
+ *
Step 2 — access token exchange: Exchange the JAG for an OAuth 2.0 access
+ * token at the MCP authorization server via RFC 7523 JWT Bearer grant. Method:
+ * {@link #exchangeJwtBearerGrant}.
+ *
+ *
+ * For a higher-level, stateful integration that handles both steps and caches the
+ * resulting access token, use {@link EnterpriseAuthProvider} instead.
+ *
+ * All methods return {@link Mono} and require a {@link java.net.http.HttpClient} to be
+ * provided by the caller. They do not manage the lifecycle of the client.
+ *
+ * @author MCP SDK Contributors
+ * @see EnterpriseAuthProvider
+ * @see RFC 8414 — Authorization
+ * Server Metadata
+ * @see RFC 8693 — Token
+ * Exchange
+ * @see RFC 7523 — JWT Bearer
+ * Grant
+ */
+public final class EnterpriseAuth {
+
+ private static final Logger logger = LoggerFactory.getLogger(EnterpriseAuth.class);
+
+ /**
+ * Token type URI for OIDC ID tokens, used as the {@code subject_token_type} in the
+ * RFC 8693 token exchange request.
+ */
+ public static final String TOKEN_TYPE_ID_TOKEN = "urn:ietf:params:oauth:token-type:id_token";
+
+ /**
+ * Token type URI for JWT Authorization Grants (ID-JAG), used as the
+ * {@code requested_token_type} in the token exchange request and validated as the
+ * {@code issued_token_type} in the response.
+ */
+ public static final String TOKEN_TYPE_ID_JAG = "urn:ietf:params:oauth:token-type:id-jag";
+
+ /**
+ * Grant type URI for RFC 8693 token exchange requests.
+ */
+ public static final String GRANT_TYPE_TOKEN_EXCHANGE = "urn:ietf:params:oauth:grant-type:token-exchange";
+
+ /**
+ * Grant type URI for RFC 7523 JWT Bearer grant requests.
+ */
+ public static final String GRANT_TYPE_JWT_BEARER = "urn:ietf:params:oauth:grant-type:jwt-bearer";
+
+ private static final String WELL_KNOWN_OAUTH = "/.well-known/oauth-authorization-server";
+
+ private static final String WELL_KNOWN_OPENID = "/.well-known/openid-configuration";
+
+ private EnterpriseAuth() {
+ }
+
+ // -----------------------------------------------------------------------
+ // Authorization server discovery (RFC 8414)
+ // -----------------------------------------------------------------------
+
+ /**
+ * Discovers the OAuth 2.0 authorization server metadata for the given base URL using
+ * RFC 8414.
+ *
+ * First attempts to retrieve metadata from
+ * {@code {url}/.well-known/oauth-authorization-server}. If that fails (non-200
+ * response or network error), falls back to
+ * {@code {url}/.well-known/openid-configuration}.
+ * @param url the base URL of the authorization server or resource server
+ * @param httpClient the HTTP client to use for the discovery request
+ * @return a {@link Mono} emitting the parsed {@link AuthServerMetadata}, or an error
+ * of type {@link EnterpriseAuthException} if discovery fails
+ */
+ public static Mono discoverAuthServerMetadata(String url, HttpClient httpClient) {
+ String baseUrl = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
+ String oauthDiscoveryUrl = baseUrl + WELL_KNOWN_OAUTH;
+ String openIdDiscoveryUrl = baseUrl + WELL_KNOWN_OPENID;
+ logger.debug("Discovering authorization server metadata for {}", baseUrl);
+ return fetchAuthServerMetadata(oauthDiscoveryUrl, httpClient)
+ .onErrorResume(e -> fetchAuthServerMetadata(openIdDiscoveryUrl, httpClient));
+ }
+
+ private static Mono fetchAuthServerMetadata(String url, HttpClient httpClient) {
+ return Mono.fromFuture(() -> {
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .GET()
+ .header("Accept", "application/json")
+ .build();
+ return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
+ }).flatMap(response -> {
+ if (response.statusCode() != 200) {
+ return Mono.error(new EnterpriseAuthException("Failed to discover authorization server metadata from "
+ + url + ": HTTP " + response.statusCode()));
+ }
+ try {
+ McpJsonMapper mapper = McpJsonDefaults.getMapper();
+ AuthServerMetadata metadata = mapper.readValue(response.body(), AuthServerMetadata.class);
+ logger.debug("Discovered authorization server metadata from {}: issuer={}, tokenEndpoint={}", url,
+ metadata.getIssuer(), metadata.getTokenEndpoint());
+ return Mono.just(metadata);
+ }
+ catch (Exception e) {
+ return Mono
+ .error(new EnterpriseAuthException("Failed to parse authorization server metadata from " + url, e));
+ }
+ });
+ }
+
+ // -----------------------------------------------------------------------
+ // Step 1 — JAG request (RFC 8693 token exchange)
+ // -----------------------------------------------------------------------
+
+ /**
+ * Requests a JWT Authorization Grant (ID-JAG) by performing an RFC 8693 token
+ * exchange at the specified token endpoint.
+ *
+ * Exchanges the enterprise OIDC ID token for an ID-JAG that can subsequently be
+ * presented to the MCP authorization server via {@link #exchangeJwtBearerGrant}.
+ *
+ * Validates that the response {@code issued_token_type} equals
+ * {@link #TOKEN_TYPE_ID_JAG} and that {@code token_type} is {@code N_A}
+ * (case-insensitive) per RFC 8693 §2.2.1.
+ * @param options request parameters including the IdP token endpoint, ID token, and
+ * client credentials
+ * @param httpClient the HTTP client to use
+ * @return a {@link Mono} emitting the JAG (the {@code access_token} value from the
+ * exchange response), or an error of type {@link EnterpriseAuthException}
+ */
+ public static Mono requestJwtAuthorizationGrant(RequestJwtAuthGrantOptions options, HttpClient httpClient) {
+ return Mono.defer(() -> {
+ List params = new ArrayList<>();
+ params.add(encodeParam("grant_type", GRANT_TYPE_TOKEN_EXCHANGE));
+ params.add(encodeParam("subject_token", options.getIdToken()));
+ params.add(encodeParam("subject_token_type", TOKEN_TYPE_ID_TOKEN));
+ params.add(encodeParam("requested_token_type", TOKEN_TYPE_ID_JAG));
+ params.add(encodeParam("client_id", options.getClientId()));
+ if (options.getClientSecret() != null) {
+ params.add(encodeParam("client_secret", options.getClientSecret()));
+ }
+ if (options.getAudience() != null) {
+ params.add(encodeParam("audience", options.getAudience()));
+ }
+ if (options.getResource() != null) {
+ params.add(encodeParam("resource", options.getResource()));
+ }
+ if (options.getScope() != null) {
+ params.add(encodeParam("scope", options.getScope()));
+ }
+ String body = String.join("&", params);
+ logger.debug("Requesting JAG token exchange at {}", options.getTokenEndpoint());
+ HttpRequest request = HttpRequest.newBuilder(URI.create(options.getTokenEndpoint()))
+ .POST(HttpRequest.BodyPublishers.ofString(body))
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .header("Accept", "application/json")
+ .build();
+ return Mono.fromFuture(() -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ }).flatMap(response -> {
+ if (response.statusCode() != 200) {
+ return Mono.error(new EnterpriseAuthException(
+ "JAG token exchange failed: HTTP " + response.statusCode() + " - " + response.body()));
+ }
+ try {
+ McpJsonMapper mapper = McpJsonDefaults.getMapper();
+ JagTokenExchangeResponse tokenResponse = mapper.readValue(response.body(),
+ JagTokenExchangeResponse.class);
+
+ // Validate per RFC 8693 §2.2.1
+ return validateJAGTokenExchangeResponse(tokenResponse)
+ .doOnNext(token -> logger.debug("JAG token exchange successful"));
+ }
+ catch (EnterpriseAuthException e) {
+ return Mono.error(e);
+ }
+ catch (Exception e) {
+ return Mono.error(new EnterpriseAuthException("Failed to parse JAG token exchange response", e));
+ }
+ });
+ }
+
+ /**
+ * Discovers the enterprise IdP's token endpoint via RFC 8414, then requests a JAG via
+ * RFC 8693 token exchange.
+ *
+ * If {@link DiscoverAndRequestJwtAuthGrantOptions#getIdpTokenEndpoint()} is set, the
+ * discovery step is skipped and the provided endpoint is used directly.
+ * @param options request parameters including the IdP base URL (for discovery), ID
+ * token, and client credentials
+ * @param httpClient the HTTP client to use
+ * @return a {@link Mono} emitting the JAG string, or an error of type
+ * {@link EnterpriseAuthException}
+ */
+ public static Mono discoverAndRequestJwtAuthorizationGrant(DiscoverAndRequestJwtAuthGrantOptions options,
+ HttpClient httpClient) {
+ Mono tokenEndpointMono;
+ // If the caller already discovered (or otherwise knows) the IdP token endpoint,
+ // skip RFC 8414 metadata discovery and use the pre-configured value directly.
+ if (options.getIdpTokenEndpoint() != null) {
+ tokenEndpointMono = Mono.just(options.getIdpTokenEndpoint());
+ }
+ else {
+ tokenEndpointMono = discoverAuthServerMetadata(options.getIdpUrl(), httpClient).flatMap(metadata -> {
+ if (metadata.getTokenEndpoint() == null) {
+ return Mono.error(new EnterpriseAuthException("No token_endpoint in IdP metadata at "
+ + options.getIdpUrl() + ". Ensure the IdP supports RFC 8414."));
+ }
+ return Mono.just(metadata.getTokenEndpoint());
+ });
+ }
+
+ return tokenEndpointMono.flatMap(tokenEndpoint -> {
+ RequestJwtAuthGrantOptions grantOptions = RequestJwtAuthGrantOptions.builder()
+ .tokenEndpoint(tokenEndpoint)
+ .idToken(options.getIdToken())
+ .clientId(options.getClientId())
+ .clientSecret(options.getClientSecret())
+ .audience(options.getAudience())
+ .resource(options.getResource())
+ .scope(options.getScope())
+ .build();
+ return requestJwtAuthorizationGrant(grantOptions, httpClient);
+ });
+ }
+
+ // -----------------------------------------------------------------------
+ // Step 2 — JWT Bearer grant exchange (RFC 7523)
+ // -----------------------------------------------------------------------
+
+ /**
+ * Exchanges a JWT Authorization Grant (ID-JAG) for an OAuth 2.0 access token at the
+ * MCP authorization server's token endpoint using RFC 7523.
+ *
+ * The returned {@link JwtBearerAccessTokenResponse} includes the access token and, if
+ * the server provided an {@code expires_in} value, an absolute
+ * {@link JwtBearerAccessTokenResponse#getExpiresAt() expiresAt} timestamp computed
+ * from the current system time.
+ * @param options request parameters including the MCP auth server token endpoint, JAG
+ * assertion, and client credentials
+ * @param httpClient the HTTP client to use
+ * @return a {@link Mono} emitting the {@link JwtBearerAccessTokenResponse}, or an
+ * error of type {@link EnterpriseAuthException}
+ */
+ public static Mono exchangeJwtBearerGrant(ExchangeJwtBearerGrantOptions options,
+ HttpClient httpClient) {
+ return Mono.defer(() -> {
+ List params = new ArrayList<>();
+ params.add(encodeParam("grant_type", GRANT_TYPE_JWT_BEARER));
+ params.add(encodeParam("assertion", options.getAssertion()));
+ if (options.getScope() != null) {
+ params.add(encodeParam("scope", options.getScope()));
+ }
+ String body = String.join("&", params);
+ // Use client_secret_basic (RFC 6749 §2.3.1): send credentials in the
+ // Authorization header rather than the request body. This matches the
+ // token_endpoint_auth_method declared by the provider and is required by
+ // SEP-990 conformance tests.
+ String secret = options.getClientSecret() != null ? options.getClientSecret() : "";
+ String credentials = Base64.getEncoder()
+ .encodeToString((options.getClientId() + ":" + secret).getBytes(StandardCharsets.UTF_8));
+ logger.debug("Exchanging JWT bearer grant at {}", options.getTokenEndpoint());
+ HttpRequest request = HttpRequest.newBuilder(URI.create(options.getTokenEndpoint()))
+ .POST(HttpRequest.BodyPublishers.ofString(body))
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .header("Accept", "application/json")
+ .header("Authorization", "Basic " + credentials)
+ .build();
+ return Mono.fromFuture(() -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ }).flatMap(response -> {
+ if (response.statusCode() != 200) {
+ return Mono.error(new EnterpriseAuthException(
+ "JWT bearer grant exchange failed: HTTP " + response.statusCode() + " - " + response.body()));
+ }
+ try {
+ McpJsonMapper mapper = McpJsonDefaults.getMapper();
+ JwtBearerAccessTokenResponse tokenResponse = mapper.readValue(response.body(),
+ JwtBearerAccessTokenResponse.class);
+
+ if (tokenResponse.getAccessToken() == null || tokenResponse.getAccessToken().isBlank()) {
+ return Mono.error(
+ new EnterpriseAuthException("JWT bearer grant exchange response is missing access_token"));
+ }
+ // Compute absolute expiry from relative expires_in
+ if (tokenResponse.getExpiresIn() != null) {
+ tokenResponse.setExpiresAt(Instant.now().plusSeconds(tokenResponse.getExpiresIn()));
+ }
+ // RFC 7523 (JWT Bearer Grant) is a stateless grant: the client presents a
+ // signed JWT assertion directly to obtain an access token, with no
+ // authorization code or refresh token involved. If the AS returns a
+ // refresh_token anyway, it is intentionally ignored — using it would
+ // allow the client to obtain new access tokens without re-validating the
+ // enterprise identity via the IdP, bypassing IdP session and revocation
+ // policies. When the access token expires, repeat the full enterprise
+ // auth flow to obtain a fresh token.
+ logger.debug("JWT bearer grant exchange successful; expires_in={}", tokenResponse.getExpiresIn());
+ return Mono.just(tokenResponse);
+ }
+ catch (EnterpriseAuthException e) {
+ return Mono.error(e);
+ }
+ catch (Exception e) {
+ return Mono.error(new EnterpriseAuthException("Failed to parse JWT bearer grant exchange response", e));
+ }
+ });
+ }
+
+ // -----------------------------------------------------------------------
+ // Internal helpers
+ // -----------------------------------------------------------------------
+
+ /**
+ * Validates the RFC 8693 token exchange response for a JAG request.
+ * @param tokenResponse the parsed response
+ * @return a {@link Mono} emitting the {@code access_token} value, or an error of type
+ * {@link EnterpriseAuthException} if any validation check fails
+ */
+ /**
+ * Validates the RFC 8693 token exchange response for a JAG request.
+ *
+ * Validates {@code issued_token_type} and the presence of {@code access_token}.
+ * {@code token_type} is intentionally not validated: per RFC 8693 §2.2.1 it is
+ * informational when the issued token is not an access token, and per RFC 6749 §5.1
+ * it is case-insensitive — strict {@code N_A} checking would reject conformant IdPs
+ * that omit or capitalise the field differently.
+ */
+ private static Mono validateJAGTokenExchangeResponse(JagTokenExchangeResponse tokenResponse) {
+ if (!TOKEN_TYPE_ID_JAG.equalsIgnoreCase(tokenResponse.getIssuedTokenType())) {
+ return Mono.error(new EnterpriseAuthException("Unexpected issued_token_type in JAG response: "
+ + tokenResponse.getIssuedTokenType() + " (expected " + TOKEN_TYPE_ID_JAG + ")"));
+ }
+ if (tokenResponse.getAccessToken() == null || tokenResponse.getAccessToken().isBlank()) {
+ return Mono.error(new EnterpriseAuthException("JAG token exchange response is missing access_token"));
+ }
+ return Mono.just(tokenResponse.getAccessToken());
+ }
+
+ /**
+ * URL-encodes a form parameter key-value pair as {@code key=value}.
+ */
+ private static String encodeParam(String key, String value) {
+ return encode(key) + "=" + encode(value);
+ }
+
+ private static String encode(String value) {
+ return URLEncoder.encode(value, StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthAssertionContext.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthAssertionContext.java
new file mode 100644
index 000000000..6726f6d00
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthAssertionContext.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * Context passed to the assertion callback in {@link EnterpriseAuthProvider}.
+ *
+ * Contains the resource URL of the MCP server and the URL of the authorization server
+ * that was discovered for that resource. The callback uses this context to obtain a
+ * suitable assertion (e.g., an OIDC ID token) from the enterprise IdP.
+ *
+ * @author MCP SDK Contributors
+ */
+public class EnterpriseAuthAssertionContext {
+
+ private final URI resourceUrl;
+
+ private final URI authorizationServerUrl;
+
+ /**
+ * Creates a new {@link EnterpriseAuthAssertionContext}.
+ * @param resourceUrl the URL of the MCP resource being accessed (must not be
+ * {@code null})
+ * @param authorizationServerUrl the URL of the MCP authorization server discovered
+ * for the resource (must not be {@code null})
+ */
+ public EnterpriseAuthAssertionContext(URI resourceUrl, URI authorizationServerUrl) {
+ this.resourceUrl = Objects.requireNonNull(resourceUrl, "resourceUrl must not be null");
+ this.authorizationServerUrl = Objects.requireNonNull(authorizationServerUrl,
+ "authorizationServerUrl must not be null");
+ }
+
+ /**
+ * Returns the URL of the MCP resource being accessed.
+ */
+ public URI getResourceUrl() {
+ return resourceUrl;
+ }
+
+ /**
+ * Returns the URL of the MCP authorization server for the resource.
+ */
+ public URI getAuthorizationServerUrl() {
+ return authorizationServerUrl;
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthException.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthException.java
new file mode 100644
index 000000000..26d4e87dd
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthException.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+/**
+ * Exception thrown when an error occurs during the Enterprise Managed Authorization
+ * (SEP-990) flow.
+ *
+ * @author MCP SDK Contributors
+ */
+public class EnterpriseAuthException extends RuntimeException {
+
+ /**
+ * Creates a new {@code EnterpriseAuthException} with the given message.
+ * @param message the error message
+ */
+ public EnterpriseAuthException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new {@code EnterpriseAuthException} with the given message and cause.
+ * @param message the error message
+ * @param cause the underlying cause
+ */
+ public EnterpriseAuthException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProvider.java
new file mode 100644
index 000000000..d9c5e0172
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProvider.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * Layer 3 implementation of Enterprise Managed Authorization (SEP-990).
+ *
+ * Implements {@link McpAsyncHttpClientRequestCustomizer} so that it can be registered
+ * directly with any HTTP transport. On each request it:
+ *
+ *
Checks an in-memory access token cache.
+ *
If the cache is empty or the token is expired (within the
+ * {@code TOKEN_EXPIRY_BUFFER}), it performs the full enterprise auth flow:
+ *
+ *
Discovers the MCP authorization server metadata via RFC 8414.
+ *
Invokes the {@link EnterpriseAuthProviderOptions#getAssertionCallback() assertion
+ * callback} to obtain a JWT Authorization Grant (ID-JAG) from the enterprise IdP.
+ *
Exchanges the JAG for an OAuth 2.0 access token via RFC 7523 at the MCP
+ * authorization server's token endpoint.
+ *
Caches the access token.
+ *
+ *
+ *
Adds an {@code Authorization: Bearer {token}} header to the outgoing request.
+ *
+ *
+ *
Usage
+ *
+ *
{@code
+ * EnterpriseAuthProvider provider = new EnterpriseAuthProvider(
+ * EnterpriseAuthProviderOptions.builder()
+ * .clientId("my-client-id")
+ * .clientSecret("my-client-secret")
+ * .assertionCallback(ctx -> {
+ * // Step 1: exchange your enterprise ID token for a JAG
+ * return EnterpriseAuth.discoverAndRequestJwtAuthorizationGrant(
+ * DiscoverAndRequestJwtAuthGrantOptions.builder()
+ * .idpUrl(ctx.getAuthorizationServerUrl().toString())
+ * .idToken(myIdTokenSupplier.get())
+ * .clientId("idp-client-id")
+ * .clientSecret("idp-client-secret")
+ * .build(),
+ * httpClient);
+ * })
+ * .build());
+ *
+ * // Register with an HTTP transport
+ * HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(serverUrl)
+ * .httpRequestCustomizer(provider)
+ * .build();
+ * }
+ *
+ * @author MCP SDK Contributors
+ * @see EnterpriseAuth
+ * @see EnterpriseAuthProviderOptions
+ */
+public class EnterpriseAuthProvider implements McpAsyncHttpClientRequestCustomizer {
+
+ private static final Logger logger = LoggerFactory.getLogger(EnterpriseAuthProvider.class);
+
+ /**
+ * Proactive refresh buffer: treat a token as expired this many seconds before its
+ * actual expiry to avoid using a token that expires mid-flight.
+ */
+ private static final Duration TOKEN_EXPIRY_BUFFER = Duration.ofSeconds(30);
+
+ private final EnterpriseAuthProviderOptions options;
+
+ private final HttpClient httpClient;
+
+ private final AtomicReference cachedTokenRef = new AtomicReference<>();
+
+ /**
+ * Creates a new {@link EnterpriseAuthProvider} using the default {@link HttpClient}.
+ * @param options provider options including client credentials and the assertion
+ * callback (must not be {@code null})
+ */
+ public EnterpriseAuthProvider(EnterpriseAuthProviderOptions options) {
+ this(options, HttpClient.newHttpClient());
+ }
+
+ /**
+ * Creates a new {@link EnterpriseAuthProvider} with a custom {@link HttpClient}.
+ *
+ * Use this constructor when you need to configure TLS, proxies, or other HTTP client
+ * settings.
+ * @param options provider options (must not be {@code null})
+ * @param httpClient the HTTP client to use for token discovery and exchange requests
+ * (must not be {@code null})
+ */
+ public EnterpriseAuthProvider(EnterpriseAuthProviderOptions options, HttpClient httpClient) {
+ this.options = Objects.requireNonNull(options, "options must not be null");
+ this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null");
+ }
+
+ /**
+ * Injects an {@code Authorization: Bearer} header into the outgoing HTTP request,
+ * obtaining or refreshing the access token as needed.
+ */
+ @Override
+ public Publisher customize(HttpRequest.Builder builder, String method, URI endpoint,
+ String body, McpTransportContext context) {
+ return getAccessToken(endpoint).map(token -> builder.header("Authorization", "Bearer " + token));
+ }
+
+ /**
+ * Invalidates the cached access token, forcing the next request to perform a full
+ * enterprise auth flow.
+ *
+ * Useful after receiving a {@code 401 Unauthorized} response from the MCP server.
+ */
+ public void invalidateCache() {
+ logger.debug("Invalidating cached enterprise auth token");
+ cachedTokenRef.set(null);
+ }
+
+ // -----------------------------------------------------------------------
+ // Private helpers
+ // -----------------------------------------------------------------------
+
+ private Mono getAccessToken(URI endpoint) {
+ JwtBearerAccessTokenResponse cached = cachedTokenRef.get();
+ if (cached != null && !isExpiredOrNearlyExpired(cached)) {
+ logger.debug("Using cached enterprise auth token");
+ return Mono.just(cached.getAccessToken());
+ }
+ logger.debug("Cached enterprise auth token is absent or expired; fetching new token");
+ return fetchNewToken(endpoint).doOnNext(response -> {
+ cachedTokenRef.set(response);
+ logger.debug("Cached new enterprise auth token; expires_in={}",
+ response.getExpiresIn() != null ? response.getExpiresIn() + "s" : "unknown");
+ }).map(JwtBearerAccessTokenResponse::getAccessToken);
+ }
+
+ private boolean isExpiredOrNearlyExpired(JwtBearerAccessTokenResponse token) {
+ Instant expiresAt = token.getExpiresAt();
+ if (expiresAt == null) {
+ return false;
+ }
+ return Instant.now().isAfter(expiresAt.minus(TOKEN_EXPIRY_BUFFER));
+ }
+
+ private Mono fetchNewToken(URI endpoint) {
+ URI resourceBaseUri = deriveBaseUri(endpoint);
+ logger.debug("Discovering MCP authorization server for resource {}", resourceBaseUri);
+
+ return EnterpriseAuth.discoverAuthServerMetadata(resourceBaseUri.toString(), httpClient).flatMap(metadata -> {
+ if (metadata.getTokenEndpoint() == null) {
+ return Mono.error(new EnterpriseAuthException("No token_endpoint in authorization server metadata for "
+ + resourceBaseUri + ". Ensure the MCP server supports RFC 8414."));
+ }
+
+ // Resolve the authorization server URL: prefer issuer, fall back to base URI
+ URI authServerUri;
+ if (metadata.getIssuer() != null && !metadata.getIssuer().isBlank()) {
+ authServerUri = URI.create(metadata.getIssuer());
+ }
+ else {
+ authServerUri = resourceBaseUri;
+ }
+
+ EnterpriseAuthAssertionContext assertionContext = new EnterpriseAuthAssertionContext(resourceBaseUri,
+ authServerUri);
+ logger.debug("Invoking assertion callback for resourceUrl={}, authServerUrl={}", resourceBaseUri,
+ authServerUri);
+
+ return options.getAssertionCallback().apply(assertionContext).flatMap(assertion -> {
+ // Note: the ID-JAG obtained from the assertionCallback is used
+ // immediately
+ // for a single access-token exchange and is not cached. If the access
+ // token
+ // is short-lived, caching the ID-JAG at the callback level can reduce IdP
+ // round-trips, as the JAG may still be valid when the access token
+ // expires.
+ ExchangeJwtBearerGrantOptions exchangeOptions = ExchangeJwtBearerGrantOptions.builder()
+ .tokenEndpoint(metadata.getTokenEndpoint())
+ .assertion(assertion)
+ .clientId(options.getClientId())
+ .clientSecret(options.getClientSecret())
+ .scope(options.getScope())
+ .build();
+ return EnterpriseAuth.exchangeJwtBearerGrant(exchangeOptions, httpClient);
+ });
+ });
+ }
+
+ /**
+ * Extracts the scheme+host+port from the given URI, dropping any path, query, or
+ * fragment. This is the URL against which RFC 8414 discovery is performed.
+ */
+ private static URI deriveBaseUri(URI uri) {
+ int port = uri.getPort();
+ String base = uri.getScheme() + "://" + uri.getHost() + (port != -1 ? ":" + port : "");
+ return URI.create(base);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProviderOptions.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProviderOptions.java
new file mode 100644
index 000000000..2f696f1c0
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/EnterpriseAuthProviderOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Configuration options for {@link EnterpriseAuthProvider}.
+ *
+ * At minimum, {@link #clientId} and {@link #assertionCallback} are required.
+ *
+ * @author MCP SDK Contributors
+ */
+public class EnterpriseAuthProviderOptions {
+
+ /**
+ * The OAuth 2.0 client ID registered at the MCP authorization server. Required.
+ */
+ private final String clientId;
+
+ /**
+ * The OAuth 2.0 client secret. Optional for public clients.
+ */
+ private final String clientSecret;
+
+ /**
+ * The {@code scope} parameter to request when exchanging the JWT bearer grant.
+ * Optional.
+ */
+ private final String scope;
+
+ /**
+ * Callback that obtains an assertion (ID token / JAG) for the given context.
+ *
+ * The callback receives an {@link EnterpriseAuthAssertionContext} describing the MCP
+ * resource and its authorization server, and must return a {@link Mono} that emits
+ * the assertion string (e.g., an OIDC ID token from the enterprise IdP).
+ *
+ * Required.
+ */
+ private final Function> assertionCallback;
+
+ private EnterpriseAuthProviderOptions(Builder builder) {
+ this.clientId = Objects.requireNonNull(builder.clientId, "clientId must not be null");
+ this.clientSecret = builder.clientSecret;
+ this.scope = builder.scope;
+ this.assertionCallback = Objects.requireNonNull(builder.assertionCallback,
+ "assertionCallback must not be null");
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public Function> getAssertionCallback() {
+ return assertionCallback;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private String clientId;
+
+ private String clientSecret;
+
+ private String scope;
+
+ private Function> assertionCallback;
+
+ private Builder() {
+ }
+
+ public Builder clientId(String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder clientSecret(String clientSecret) {
+ this.clientSecret = clientSecret;
+ return this;
+ }
+
+ public Builder scope(String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ public Builder assertionCallback(Function> assertionCallback) {
+ this.assertionCallback = assertionCallback;
+ return this;
+ }
+
+ public EnterpriseAuthProviderOptions build() {
+ return new EnterpriseAuthProviderOptions(this);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/ExchangeJwtBearerGrantOptions.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/ExchangeJwtBearerGrantOptions.java
new file mode 100644
index 000000000..a3fae07ff
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/ExchangeJwtBearerGrantOptions.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.util.Objects;
+
+/**
+ * Options for {@link EnterpriseAuth#exchangeJwtBearerGrant} — performs step 2 of the
+ * Enterprise Managed Authorization (SEP-990) flow.
+ *
+ * Posts an RFC 7523 JWT Bearer grant exchange to the MCP authorization server's token
+ * endpoint, exchanging the JAG (JWT Authorization Grant / ID-JAG) for a standard OAuth
+ * 2.0 access token that can be used to call the MCP server.
+ *
+ * Client credentials are sent using {@code client_secret_basic} (RFC 6749 §2.3.1): the
+ * {@code client_id} and {@code client_secret} are Base64-encoded and sent in the
+ * {@code Authorization: Basic} header. This matches the
+ * {@code token_endpoint_auth_method} declared by {@code EnterpriseAuthProvider} and is
+ * required by SEP-990 conformance tests.
+ *
+ * @author MCP SDK Contributors
+ * @see RFC 7523
+ */
+public class ExchangeJwtBearerGrantOptions {
+
+ /** The full URL of the MCP authorization server's token endpoint. */
+ private final String tokenEndpoint;
+
+ /** The JWT Authorization Grant (ID-JAG) obtained from step 1. */
+ private final String assertion;
+
+ /** The OAuth 2.0 client ID registered at the MCP authorization server. */
+ private final String clientId;
+
+ /** The OAuth 2.0 client secret (may be {@code null} for public clients). */
+ private final String clientSecret;
+
+ /** The {@code scope} parameter for the token request (optional). */
+ private final String scope;
+
+ private ExchangeJwtBearerGrantOptions(Builder builder) {
+ this.tokenEndpoint = Objects.requireNonNull(builder.tokenEndpoint, "tokenEndpoint must not be null");
+ this.assertion = Objects.requireNonNull(builder.assertion, "assertion must not be null");
+ this.clientId = Objects.requireNonNull(builder.clientId, "clientId must not be null");
+ this.clientSecret = builder.clientSecret;
+ this.scope = builder.scope;
+ }
+
+ public String getTokenEndpoint() {
+ return tokenEndpoint;
+ }
+
+ public String getAssertion() {
+ return assertion;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private String tokenEndpoint;
+
+ private String assertion;
+
+ private String clientId;
+
+ private String clientSecret;
+
+ private String scope;
+
+ private Builder() {
+ }
+
+ public Builder tokenEndpoint(String tokenEndpoint) {
+ this.tokenEndpoint = tokenEndpoint;
+ return this;
+ }
+
+ public Builder assertion(String assertion) {
+ this.assertion = assertion;
+ return this;
+ }
+
+ public Builder clientId(String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder clientSecret(String clientSecret) {
+ this.clientSecret = clientSecret;
+ return this;
+ }
+
+ public Builder scope(String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ public ExchangeJwtBearerGrantOptions build() {
+ return new ExchangeJwtBearerGrantOptions(this);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/JagTokenExchangeResponse.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/JagTokenExchangeResponse.java
new file mode 100644
index 000000000..c6c7935b8
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/JagTokenExchangeResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * RFC 8693 Token Exchange response for the JAG (JWT Authorization Grant) flow.
+ *
+ * Returned by the enterprise IdP when exchanging an ID Token for a JWT Authorization
+ * Grant (ID-JAG) during Enterprise Managed Authorization (SEP-990).
+ *
+ * The key fields are:
+ *
+ *
{@code access_token} — the issued JAG (despite the name, not an OAuth access
+ * token)
+ *
{@code issued_token_type} — must be
+ * {@code urn:ietf:params:oauth:token-type:id-jag}
+ *
{@code token_type} — informational; per RFC 8693 §2.2.1 it SHOULD be {@code N_A}
+ * when the issued token is not an access token, but this is not strictly enforced as some
+ * conformant IdPs may omit or vary the casing
+ * This is the result of step 2 in the Enterprise Managed Authorization (SEP-990) flow:
+ * exchanging the JWT Authorization Grant (ID-JAG) for an access token at the MCP Server's
+ * authorization server.
+ *
+ * @author MCP SDK Contributors
+ * @see RFC 7523
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JwtBearerAccessTokenResponse {
+
+ @JsonProperty("access_token")
+ private String accessToken;
+
+ @JsonProperty("token_type")
+ private String tokenType;
+
+ @JsonProperty("expires_in")
+ private Integer expiresIn;
+
+ @JsonProperty("scope")
+ private String scope;
+
+ @JsonProperty("refresh_token")
+ private String refreshToken;
+
+ /**
+ * The absolute time at which this token expires. Computed from {@code expires_in}
+ * upon deserialization by {@link EnterpriseAuth}. Marked {@code transient} so that
+ * JSON mappers skip this field during deserialization.
+ */
+ private transient Instant expiresAt;
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
+ public String getTokenType() {
+ return tokenType;
+ }
+
+ public void setTokenType(String tokenType) {
+ this.tokenType = tokenType;
+ }
+
+ public Integer getExpiresIn() {
+ return expiresIn;
+ }
+
+ public void setExpiresIn(Integer expiresIn) {
+ this.expiresIn = expiresIn;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public void setScope(String scope) {
+ this.scope = scope;
+ }
+
+ public String getRefreshToken() {
+ return refreshToken;
+ }
+
+ public void setRefreshToken(String refreshToken) {
+ this.refreshToken = refreshToken;
+ }
+
+ public Instant getExpiresAt() {
+ return expiresAt;
+ }
+
+ public void setExpiresAt(Instant expiresAt) {
+ this.expiresAt = expiresAt;
+ }
+
+ /**
+ * Returns {@code true} if this token has expired (or has no expiry information).
+ */
+ public boolean isExpired() {
+ if (expiresAt == null) {
+ return false;
+ }
+ return Instant.now().isAfter(expiresAt);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/RequestJwtAuthGrantOptions.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/RequestJwtAuthGrantOptions.java
new file mode 100644
index 000000000..b16d3a0f8
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/auth/RequestJwtAuthGrantOptions.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.auth;
+
+import java.util.Objects;
+
+/**
+ * Options for {@link EnterpriseAuth#requestJwtAuthorizationGrant} — performs step 1 of
+ * the Enterprise Managed Authorization (SEP-990) flow using a known token endpoint.
+ *
+ * Posts an RFC 8693 token exchange request to the enterprise IdP's token endpoint and
+ * returns the JAG (JWT Authorization Grant / ID-JAG token).
+ *
+ * @author MCP SDK Contributors
+ */
+public class RequestJwtAuthGrantOptions {
+
+ /** The full URL of the enterprise IdP's token endpoint. */
+ private final String tokenEndpoint;
+
+ /** The ID token (assertion) issued by the enterprise IdP. */
+ private final String idToken;
+
+ /** The OAuth 2.0 client ID registered at the enterprise IdP. */
+ private final String clientId;
+
+ /** The OAuth 2.0 client secret (may be {@code null} for public clients). */
+ private final String clientSecret;
+
+ /** The {@code audience} parameter for the token exchange request (optional). */
+ private final String audience;
+
+ /** The {@code resource} parameter for the token exchange request (optional). */
+ private final String resource;
+
+ /** The {@code scope} parameter for the token exchange request (optional). */
+ private final String scope;
+
+ protected RequestJwtAuthGrantOptions(Builder builder) {
+ this.tokenEndpoint = builder.tokenEndpoint;
+ this.idToken = Objects.requireNonNull(builder.idToken, "idToken must not be null");
+ this.clientId = Objects.requireNonNull(builder.clientId, "clientId must not be null");
+ this.clientSecret = builder.clientSecret;
+ this.audience = builder.audience;
+ this.resource = builder.resource;
+ this.scope = builder.scope;
+ }
+
+ public String getTokenEndpoint() {
+ return tokenEndpoint;
+ }
+
+ public String getIdToken() {
+ return idToken;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getAudience() {
+ return audience;
+ }
+
+ public String getResource() {
+ return resource;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String tokenEndpoint;
+
+ private String idToken;
+
+ private String clientId;
+
+ private String clientSecret;
+
+ private String audience;
+
+ private String resource;
+
+ private String scope;
+
+ protected Builder() {
+ }
+
+ public Builder tokenEndpoint(String tokenEndpoint) {
+ this.tokenEndpoint = tokenEndpoint;
+ return this;
+ }
+
+ public Builder idToken(String idToken) {
+ this.idToken = idToken;
+ return this;
+ }
+
+ public Builder clientId(String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder clientSecret(String clientSecret) {
+ this.clientSecret = clientSecret;
+ return this;
+ }
+
+ public Builder audience(String audience) {
+ this.audience = audience;
+ return this;
+ }
+
+ public Builder resource(String resource) {
+ this.resource = resource;
+ return this;
+ }
+
+ public Builder scope(String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ public RequestJwtAuthGrantOptions build() {
+ Objects.requireNonNull(tokenEndpoint, "tokenEndpoint must not be null");
+ return new RequestJwtAuthGrantOptions(this);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
index d6b01e17f..57a27a3fd 100644
--- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2024-2025 the original author or authors.
+ * Copyright 2024-2026 the original author or authors.
*/
package io.modelcontextprotocol.client.transport;
@@ -23,6 +23,7 @@
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonDefaults;
@@ -50,6 +51,7 @@
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
+import reactor.util.retry.Retry;
/**
* An implementation of the Streamable HTTP protocol as defined by the
@@ -72,6 +74,7 @@
*
*
* @author Christian Tzolov
+ * @author Daniel Garnier-Moiroux
* @see Streamable
* HTTP transport specification
@@ -115,6 +118,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private final boolean openConnectionOnStartup;
+ private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
+
private final boolean resumableStreams;
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
@@ -132,7 +137,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
- List supportedProtocolVersions) {
+ McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List supportedProtocolVersions) {
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
@@ -140,6 +145,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h
this.endpoint = endpoint;
this.resumableStreams = resumableStreams;
this.openConnectionOnStartup = openConnectionOnStartup;
+ this.authorizationErrorHandler = authorizationErrorHandler;
this.activeSession.set(createTransportSession());
this.httpRequestCustomizer = httpRequestCustomizer;
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
@@ -239,7 +245,6 @@ public Mono closeGracefully() {
}
private Mono reconnect(McpTransportStream stream) {
-
return Mono.deferContextual(ctx -> {
if (stream != null) {
@@ -275,121 +280,120 @@ private Mono reconnect(McpTransportStream stream) {
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
})
- .flatMapMany(
- requestBuilder -> Flux.create(
- sseSink -> this.httpClient
- .sendAsync(requestBuilder.build(),
- responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
- sseSink))
- .whenComplete((response, throwable) -> {
- if (throwable != null) {
- sseSink.error(throwable);
- }
- else {
- logger.debug("SSE connection established successfully");
- }
- }))
- .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
- .flatMap(responseEvent -> {
- int statusCode = responseEvent.responseInfo().statusCode();
-
- if (statusCode >= 200 && statusCode < 300) {
-
- if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
- String data = responseEvent.sseEvent().data();
- // Per 2025-11-25 spec (SEP-1699), servers may
- // send SSE events
- // with empty data to prime the client for
- // reconnection.
- // Skip these events as they contain no JSON-RPC
- // message.
- if (data == null || data.isBlank()) {
- logger.debug("Skipping SSE event with empty data (stream primer)");
- return Flux.empty();
- }
- try {
- // We don't support batching ATM and probably
- // won't since the next version considers
- // removing it.
- McpSchema.JSONRPCMessage message = McpSchema
- .deserializeJsonRpcMessage(this.jsonMapper, data);
-
- Tuple2, Iterable> idWithMessages = Tuples
- .of(Optional.ofNullable(responseEvent.sseEvent().id()),
- List.of(message));
-
- McpTransportStream sessionStream = stream != null ? stream
- : new DefaultMcpTransportStream<>(this.resumableStreams,
- this::reconnect);
- logger.debug("Connected stream {}", sessionStream.streamId());
-
- return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
-
- }
- catch (IOException ioException) {
- return Flux.error(new McpTransportException(
- "Error parsing JSON-RPC message: " + responseEvent, ioException));
- }
- }
- else {
- logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
- return Flux.empty();
- }
- }
- else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
- logger
- .debug("The server does not support SSE streams, using request-response mode.");
+ .flatMapMany(requestBuilder -> Flux.create(sseSink -> this.httpClient
+ .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
+ .whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ sseSink.error(throwable);
+ }
+ else {
+ logger.debug("SSE connection established successfully");
+ }
+ })).flatMap(responseEvent -> {
+ int statusCode = responseEvent.responseInfo().statusCode();
+ if (statusCode == 401 || statusCode == 403) {
+ logger.debug("Authorization error in reconnect with code {}", statusCode);
+ return Mono.error(
+ new McpHttpClientTransportAuthorizationException(
+ "Authorization error connecting to SSE stream",
+ responseEvent.responseInfo()));
+ }
+
+ if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
+ return Flux.error(new McpTransportException(
+ "Unrecognized server error when connecting to SSE stream, status code: "
+ + statusCode));
+ }
+ else if (statusCode >= 200 && statusCode < 300) {
+ if (MESSAGE_EVENT_TYPE.equals(sseResponseEvent.sseEvent().event())) {
+ String data = sseResponseEvent.sseEvent().data();
+ // Per 2025-11-25 spec (SEP-1699), servers may
+ // send SSE events
+ // with empty data to prime the client for
+ // reconnection.
+ // Skip these events as they contain no JSON-RPC
+ // message.
+ if (data == null || data.isBlank()) {
+ logger.debug("Skipping SSE event with empty data (stream primer)");
return Flux.empty();
}
- else if (statusCode == NOT_FOUND) {
-
- if (transportSession != null && transportSession.sessionId().isPresent()) {
- // only if the request was sent with a session id
- // and the response is 404, we consider it a
- // session not found error.
- logger.debug("Session not found for session ID: {}",
- transportSession.sessionId().get());
- String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
- "Session not found for session ID: " + sessionIdRepresentation);
- return Flux.error(exception);
- }
- return Flux.error(
- new McpTransportException("Server Not Found. Status code:" + statusCode
- + ", response-event:" + responseEvent));
- }
- else if (statusCode == BAD_REQUEST) {
- if (transportSession != null && transportSession.sessionId().isPresent()) {
- // only if the request was sent with a session id
- // and thre response is 404, we consider it a
- // session not found error.
- String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
- "Session not found for session ID: " + sessionIdRepresentation);
- return Flux.error(exception);
- }
- return Flux.error(
- new McpTransportException("Bad Request. Status code:" + statusCode
- + ", response-event:" + responseEvent));
+ try {
+ // We don't support batching ATM and probably
+ // won't since the next version considers
+ // removing it.
+ McpSchema.JSONRPCMessage message = McpSchema
+ .deserializeJsonRpcMessage(this.jsonMapper, data);
- }
+ Tuple2, Iterable> idWithMessages = Tuples
+ .of(Optional.ofNullable(sseResponseEvent.sseEvent().id()), List.of(message));
+
+ McpTransportStream sessionStream = stream != null ? stream
+ : new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
+ logger.debug("Connected stream {}", sessionStream.streamId());
- return Flux.error(new McpTransportException(
- "Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
- }).flatMap(
- jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
- .onErrorMap(CompletionException.class, t -> t.getCause())
- .onErrorComplete(t -> {
- this.handleException(t);
- return true;
- })
- .doFinally(s -> {
- Disposable ref = disposableRef.getAndSet(null);
- if (ref != null) {
- transportSession.removeConnection(ref);
+ return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
+
+ }
+ catch (IOException ioException) {
+ return Flux.error(new McpTransportException(
+ "Error parsing JSON-RPC message: " + responseEvent, ioException));
}
- }))
+ }
+ else {
+ logger.debug("Received SSE event with type: {}", sseResponseEvent.sseEvent());
+ return Flux.empty();
+ }
+ }
+ else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
+ logger.debug("The server does not support SSE streams, using request-response mode.");
+ return Flux.empty();
+ }
+ else if (statusCode == NOT_FOUND) {
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and the response is 404, we consider it a
+ // session not found error.
+ logger.debug("Session not found for session ID: {}",
+ transportSession.sessionId().get());
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(
+ new McpTransportException("Server Not Found. Status code:" + statusCode
+ + ", response-event:" + responseEvent));
+ }
+ else if (statusCode == BAD_REQUEST) {
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and thre response is 404, we consider it a
+ // session not found error.
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(new McpTransportException(
+ "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
+ }
+ return Flux.error(new McpTransportException(
+ "Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
+ })
+ .retryWhen(authorizationErrorRetrySpec())
+ .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
+ .onErrorMap(CompletionException.class, t -> t.getCause())
+ .onErrorComplete(t -> {
+ this.handleException(t);
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ }))
.contextWrite(ctx)
.subscribe();
@@ -400,6 +404,25 @@ else if (statusCode == BAD_REQUEST) {
}
+ private Retry authorizationErrorRetrySpec() {
+ return Retry.from(companion -> companion.flatMap(retrySignal -> {
+ if (!(retrySignal.failure() instanceof McpHttpClientTransportAuthorizationException authException)) {
+ return Mono.error(retrySignal.failure());
+ }
+ if (retrySignal.totalRetriesInARow() >= this.authorizationErrorHandler.maxRetries()) {
+ return Mono.error(retrySignal.failure());
+ }
+ return Mono.deferContextual(ctx -> {
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono
+ .from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
+ .switchIfEmpty(Mono.just(false))
+ .flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
+ : Mono.error(retrySignal.failure()));
+ });
+ }));
+ }
+
private BodyHandler toSendMessageBodySubscriber(FluxSink sink) {
BodyHandler responseBodyHandler = responseInfo -> {
@@ -478,6 +501,13 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
})).flatMap(responseEvent -> {
+ int statusCode = responseEvent.responseInfo().statusCode();
+ if (statusCode == 401 || statusCode == 403) {
+ logger.debug("Authorization error in sendMessage with code {}", statusCode);
+ return Mono.error(new McpHttpClientTransportAuthorizationException(
+ "Authorization error when sending message", responseEvent.responseInfo()));
+ }
+
if (transportSession.markInitialized(
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
// Once we have a session, we try to open an async stream for
@@ -488,8 +518,6 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
- int statusCode = responseEvent.responseInfo().statusCode();
-
if (statusCode >= 200 && statusCode < 300) {
String contentType = responseEvent.responseInfo()
@@ -605,6 +633,7 @@ else if (statusCode == BAD_REQUEST) {
return Flux.error(
new RuntimeException("Failed to send message: " + responseEvent));
})
+ .retryWhen(authorizationErrorRetrySpec())
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
@@ -664,6 +693,8 @@ public static class Builder {
private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
+
/**
* Creates a new builder with the specified base URI.
* @param baseUri the base URI of the MCP server
@@ -801,6 +832,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
return this;
}
+ /**
+ * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
+ * when sending a message.
+ * @param authorizationErrorHandler the handler
+ * @return this builder
+ */
+ public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
+ this.authorizationErrorHandler = authorizationErrorHandler;
+ return this;
+ }
+
/**
* Sets the connection timeout for the HTTP client.
* @param connectTimeout the connection timeout duration
@@ -845,7 +887,7 @@ public HttpClientStreamableHttpTransport build() {
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
- httpRequestCustomizer, supportedProtocolVersions);
+ httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
}
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java
new file mode 100644
index 000000000..31e5ae95e
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2026-2026 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.net.http.HttpResponse;
+
+import io.modelcontextprotocol.spec.McpTransportException;
+
+/**
+ * Thrown when the MCP server responds with an authorization error (HTTP 401 or HTTP 403).
+ * Subclass of {@link McpTransportException} for targeted retry handling in
+ * {@link HttpClientStreamableHttpTransport}.
+ *
+ * @author Daniel Garnier-Moiroux
+ */
+public class McpHttpClientTransportAuthorizationException extends McpTransportException {
+
+ private final HttpResponse.ResponseInfo responseInfo;
+
+ public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
+ super(message);
+ this.responseInfo = responseInfo;
+ }
+
+ public HttpResponse.ResponseInfo getResponseInfo() {
+ return responseInfo;
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java
new file mode 100644
index 000000000..c98fac61d
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2026-2026 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.http.HttpResponse;
+
+import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
+import io.modelcontextprotocol.common.McpTransportContext;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Handle security-related errors in HTTP-client based transports. This class handles MCP
+ * server responses with status code 401 and 403.
+ *
+ * @see MCP
+ * Specification: Authorization
+ * @author Daniel Garnier-Moiroux
+ */
+public interface McpHttpClientAuthorizationErrorHandler {
+
+ /**
+ * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
+ * should be retried or not. If the publisher returns true, the original transport
+ * method (connect, sendMessage) will be replayed with the original arguments.
+ * Otherwise, the transport will throw an
+ * {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
+ *
+ * If the returned {@link Publisher} errors, the error will be propagated to the
+ * calling method, to be handled by the caller.
+ *
+ * The number of retries is bounded by {@link #maxRetries()}.
+ * @param responseInfo the HTTP response information
+ * @param context the MCP client transport context
+ * @return {@link Publisher} emitting true if the original request should be replayed,
+ * false otherwise.
+ */
+ Publisher handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
+
+ /**
+ * Maximum number of authorization error retries the transport will attempt. When the
+ * handler signals a retry via {@link #handle}, the transport will replay the original
+ * request at most this many times. If the authorization error persists after
+ * exhausting all retries, the transport will propagate the
+ * {@link McpHttpClientTransportAuthorizationException}.
+ *
+ * Defaults to {@code 1}.
+ * @return the maximum number of retries
+ */
+ default int maxRetries() {
+ return 1;
+ }
+
+ /**
+ * A no-op handler, used in the default use-case.
+ */
+ McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
+
+ /**
+ * Create a {@link McpHttpClientAuthorizationErrorHandler} from a synchronous handler.
+ * Will be subscribed on {@link Schedulers#boundedElastic()}. The handler may be
+ * blocking.
+ * @param handler the synchronous handler
+ * @return an async handler
+ */
+ static McpHttpClientAuthorizationErrorHandler fromSync(Sync handler) {
+ return (info, context) -> Mono.fromCallable(() -> handler.handle(info, context))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ /**
+ * Synchronous authorization error handler.
+ */
+ interface Sync {
+
+ /**
+ * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP
+ * request should be retried or not. If the return value is true, the original
+ * transport method (connect, sendMessage) will be replayed with the original
+ * arguments. Otherwise, the transport will throw an
+ * {@link McpHttpClientTransportAuthorizationException}, indicating the error
+ * status.
+ * @param responseInfo the HTTP response information
+ * @param context the MCP client transport context
+ * @return true if the original request should be replayed, false otherwise.
+ */
+ boolean handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
+
+ }
+
+ class Noop implements McpHttpClientAuthorizationErrorHandler {
+
+ @Override
+ public Publisher handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context) {
+ return Mono.just(false);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
index 32256987a..b078493ef 100644
--- a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
@@ -5,10 +5,12 @@
package io.modelcontextprotocol.server;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -25,7 +27,6 @@
import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion;
import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
-import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.PromptReference;
import io.modelcontextprotocol.spec.McpSchema.ResourceReference;
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
@@ -111,12 +112,10 @@ public class McpAsyncServer {
private final ConcurrentHashMap prompts = new ConcurrentHashMap<>();
- // FIXME: this field is deprecated and should be remvoed together with the
- // broadcasting loggingNotification.
- private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;
-
private final ConcurrentHashMap completions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap> resourceSubscriptions = new ConcurrentHashMap<>();
+
private List protocolVersions;
private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();
@@ -149,8 +148,11 @@ public class McpAsyncServer {
this.protocolVersions = mcpTransportProvider.protocolVersions();
- mcpTransportProvider.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(),
- requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
+ mcpTransportProvider.setSessionFactory(transport -> {
+ String sessionId = UUID.randomUUID().toString();
+ return new McpServerSession(sessionId, requestTimeout, transport, this::asyncInitializeRequestHandler,
+ requestHandlers, notificationHandlers, () -> this.cleanupForSession(sessionId));
+ });
}
McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
@@ -174,8 +176,9 @@ public class McpAsyncServer {
this.protocolVersions = mcpTransportProvider.protocolVersions();
- mcpTransportProvider.setSessionFactory(new DefaultMcpStreamableServerSessionFactory(requestTimeout,
- this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
+ mcpTransportProvider.setSessionFactory(
+ new DefaultMcpStreamableServerSessionFactory(requestTimeout, this::asyncInitializeRequestHandler,
+ requestHandlers, notificationHandlers, sessionId -> this.cleanupForSession(sessionId)));
}
private Map prepareNotificationHandlers(McpServerFeatures.Async features) {
@@ -215,6 +218,10 @@ private Map> prepareRequestHandlers() {
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
+ if (Boolean.TRUE.equals(this.serverCapabilities.resources().subscribe())) {
+ requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeRequestHandler());
+ requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeRequestHandler());
+ }
}
// Add prompts API handlers if provider exists
@@ -685,12 +692,73 @@ public Mono notifyResourcesListChanged() {
}
/**
- * Notifies clients that the resources have updated.
- * @return A Mono that completes when all clients have been notified
+ * Notifies only the sessions that have subscribed to the updated resource URI.
+ * @param resourcesUpdatedNotification the notification containing the updated
+ * resource URI
+ * @return A Mono that completes when all subscribed sessions have been notified
*/
public Mono notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
- return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
- resourcesUpdatedNotification);
+ return Mono.defer(() -> {
+ String uri = resourcesUpdatedNotification.uri();
+ Set subscribedSessions = this.resourceSubscriptions.get(uri);
+ if (subscribedSessions == null || subscribedSessions.isEmpty()) {
+ logger.debug("No sessions subscribed to resource URI: {}", uri);
+ return Mono.empty();
+ }
+ return Flux.fromIterable(subscribedSessions)
+ .flatMap(sessionId -> this.mcpTransportProvider
+ .notifyClient(sessionId, McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
+ resourcesUpdatedNotification)
+ .doOnError(e -> logger.error("Failed to notify session {} of resource update for {}", sessionId,
+ uri, e))
+ .onErrorComplete())
+ .then();
+ });
+ }
+
+ private Mono cleanupForSession(String sessionId) {
+ return Mono.fromRunnable(() -> {
+ removeSessionSubscriptions(sessionId);
+ });
+ }
+
+ private void removeSessionSubscriptions(String sessionId) {
+ this.resourceSubscriptions.forEach((uri, sessions) -> sessions.remove(sessionId));
+ this.resourceSubscriptions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
+ }
+
+ private McpRequestHandler