@@ -78,7 +78,7 @@ public class JSONRPCTransportConfigBuilder extends ClientTransportConfigBuilder< *
- * If not specified, the default {@link JdkA2AHttpClient} is used. + * If not specified, a client is auto-selected via {@link A2AHttpClientFactory}. *
* Example: *
{@code
@@ -101,16 +101,16 @@ public JSONRPCTransportConfigBuilder httpClient(A2AHttpClient httpClient) {
/**
* Build the JSON-RPC transport configuration.
*
- * If no HTTP client was configured, the default {@link JdkA2AHttpClient} is used.
+ * If no HTTP client was configured, one is auto-selected via {@link A2AHttpClientFactory}.
* Any configured interceptors are transferred to the configuration.
*
* @return the configured JSON-RPC transport configuration
*/
@Override
public JSONRPCTransportConfig build() {
- // No HTTP client provided, fallback to the default one (JDK-based implementation)
+ // No HTTP client provided, use factory to get best available implementation
if (httpClient == null) {
- httpClient = new JdkA2AHttpClient();
+ httpClient = A2AHttpClientFactory.create();
}
JSONRPCTransportConfig config = new JSONRPCTransportConfig(httpClient);
diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java
index 5e9266a50..2dc1a9733 100644
--- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java
+++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java
@@ -1,6 +1,6 @@
package io.a2a.client.transport.jsonrpc;
-import io.a2a.client.http.JdkA2AHttpClient;
+import io.a2a.client.http.A2AHttpClientFactory;
import io.a2a.client.transport.spi.ClientTransportProvider;
import io.a2a.spec.A2AClientException;
import io.a2a.spec.AgentCard;
@@ -14,7 +14,7 @@ public class JSONRPCTransportProvider implements ClientTransportProvider interceptors) {
- this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient;
+ this.httpClient = httpClient == null ? A2AHttpClientFactory.create() : httpClient;
this.agentCard = agentCard;
this.agentInterface = agentInterface;
this.interceptors = interceptors;
diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java
index 855de0ca6..257836199 100644
--- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java
+++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java
@@ -1,7 +1,7 @@
package io.a2a.client.transport.rest;
import io.a2a.client.http.A2AHttpClient;
-import io.a2a.client.http.JdkA2AHttpClient;
+import io.a2a.client.http.A2AHttpClientFactory;
import io.a2a.client.transport.spi.ClientTransportConfigBuilder;
import org.jspecify.annotations.Nullable;
@@ -11,7 +11,7 @@
* This builder provides a fluent API for configuring the REST transport protocol.
* All configuration options are optional - if not specified, sensible defaults are used:
*
- * - HTTP client: {@link JdkA2AHttpClient} (JDK's built-in HTTP client)
+ * - HTTP client: Auto-selected via {@link A2AHttpClientFactory} (prefers Vert.x, falls back to JDK)
* - Interceptors: None
*
*
@@ -78,7 +78,7 @@ public class RestTransportConfigBuilder extends ClientTransportConfigBuilderCustom header handling
*
*
- * If not specified, the default {@link JdkA2AHttpClient} is used.
+ * If not specified, a client is auto-selected via {@link A2AHttpClientFactory}.
*
* Example:
*
{@code
@@ -101,16 +101,16 @@ public RestTransportConfigBuilder httpClient(A2AHttpClient httpClient) {
/**
* Build the REST transport configuration.
*
- * If no HTTP client was configured, the default {@link JdkA2AHttpClient} is used.
+ * If no HTTP client was configured, one is auto-selected via {@link A2AHttpClientFactory}.
* Any configured interceptors are transferred to the configuration.
*
* @return the configured REST transport configuration
*/
@Override
public RestTransportConfig build() {
- // No HTTP client provided, fallback to the default one (JDK-based implementation)
+ // No HTTP client provided, use factory to get best available implementation
if (httpClient == null) {
- httpClient = new JdkA2AHttpClient();
+ httpClient = A2AHttpClientFactory.create();
}
RestTransportConfig config = new RestTransportConfig(httpClient);
diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java
index 5b5af20c9..b6e6d7a47 100644
--- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java
+++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java
@@ -1,6 +1,6 @@
package io.a2a.client.transport.rest;
-import io.a2a.client.http.JdkA2AHttpClient;
+import io.a2a.client.http.A2AHttpClientFactory;
import io.a2a.client.transport.spi.ClientTransportProvider;
import io.a2a.spec.A2AClientException;
import io.a2a.spec.AgentCard;
@@ -18,9 +18,9 @@ public String getTransportProtocol() {
public RestTransport create(RestTransportConfig clientTransportConfig, AgentCard agentCard, AgentInterface agentInterface) throws A2AClientException {
RestTransportConfig transportConfig = clientTransportConfig;
if (transportConfig == null) {
- transportConfig = new RestTransportConfig(new JdkA2AHttpClient());
+ transportConfig = new RestTransportConfig(A2AHttpClientFactory.create());
}
- return new RestTransport(clientTransportConfig.getHttpClient(), agentCard, agentInterface, transportConfig.getInterceptors());
+ return new RestTransport(transportConfig.getHttpClient(), agentCard, agentInterface, transportConfig.getInterceptors());
}
@Override
diff --git a/extras/http-client-vertx/pom.xml b/extras/http-client-vertx/pom.xml
new file mode 100644
index 000000000..59bbf1141
--- /dev/null
+++ b/extras/http-client-vertx/pom.xml
@@ -0,0 +1,44 @@
+
+
+ 4.0.0
+
+
+ io.github.a2asdk
+ a2a-java-sdk-parent
+ 1.0.0.Alpha1-SNAPSHOT
+ ../../pom.xml
+
+ a2a-java-sdk-http-client-vertx
+
+ jar
+
+ Java SDK A2A HTTP Client - Vert.x Implementation
+ Vert.x implementation for A2A HTTP Client
+
+
+
+ ${project.groupId}
+ a2a-java-sdk-http-client
+
+
+ io.vertx
+ vertx-web-client
+ provided
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.mock-server
+ mockserver-netty
+ test
+
+
+
+
diff --git a/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java
new file mode 100644
index 000000000..b6afce027
--- /dev/null
+++ b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java
@@ -0,0 +1,554 @@
+package io.a2a.client.http;
+
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_MULT_CHOICE;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.jspecify.annotations.Nullable;
+
+import io.a2a.common.A2AErrorMessages;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+import io.vertx.ext.web.codec.BodyCodec;
+import jakarta.enterprise.context.spi.CreationalContext;
+import jakarta.enterprise.inject.spi.Bean;
+import jakarta.enterprise.inject.spi.BeanManager;
+import jakarta.enterprise.inject.spi.CDI;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Vert.x WebClient-based implementation of {@link A2AHttpClient}.
+ *
+ *
+ * This implementation uses Vert.x's reactive HTTP client to execute requests.
+ * For synchronous methods ({@link GetBuilder#get()}, {@link PostBuilder#post()}, {@link DeleteBuilder#delete()}),
+ * the implementation blocks the calling thread until the asynchronous operation completes.
+ * For SSE streaming methods, the implementation returns immediately with a
+ * {@link CompletableFuture} and streams events asynchronously via callbacks.
+ *
+ *
Lifecycle Management
+ *
+ * This client implements {@link AutoCloseable} and should be closed when no longer needed:
+ *
{@code
+ * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ * A2AHttpResponse response = client.createGet()
+ * .url("https://example.com/api")
+ * .get();
+ * // Use response
+ * }
+ * }
+ *
+ *
+ * If constructed with the no-args constructor, the client creates and owns a
+ * {@link Vertx} instance which will be closed when {@link #close()} is called.
+ * If constructed with an external {@link Vertx} instance, only the WebClient is
+ * closed, leaving the Vertx instance management to the caller.
+ *
+ *
Thread Safety
+ *
+ * This client is thread-safe. Multiple threads can create and execute requests
+ * concurrently. However, individual builder instances are NOT thread-safe and should
+ * not be shared across threads.
+ *
+ *
HTTP/2 Support
+ *
+ * Vert.x WebClient automatically negotiates HTTP/2 when supported by the server
+ * via ALPN. No explicit configuration is required.
+ *
+ *
Usage Examples
+ *
+ * Simple GET Request
+ * {@code
+ * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ * A2AHttpResponse response = client.createGet()
+ * .url("https://api.example.com/data")
+ * .addHeader("Authorization", "Bearer token")
+ * .get();
+ *
+ * if (response.success()) {
+ * System.out.println(response.body());
+ * }
+ * }
+ * }
+ *
+ * POST Request with JSON Body
+ * {@code
+ * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ * A2AHttpResponse response = client.createPost()
+ * .url("https://api.example.com/submit")
+ * .addHeader("Content-Type", "application/json")
+ * .body("{\"key\":\"value\"}")
+ * .post();
+ *
+ * System.out.println("Status: " + response.status());
+ * }
+ * }
+ *
+ * Async SSE Streaming
+ * {@code
+ * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ * CompletableFuture future = client.createGet()
+ * .url("https://api.example.com/stream")
+ * .getAsyncSSE(
+ * message -> System.out.println("Received: " + message),
+ * error -> error.printStackTrace(),
+ * () -> System.out.println("Stream complete")
+ * );
+ *
+ * // Do other work while streaming...
+ * future.join(); // Wait for completion if needed
+ * }
+ * }
+ */
+public class VertxA2AHttpClient implements A2AHttpClient, AutoCloseable {
+
+ private final Vertx vertx;
+ private final WebClient webClient;
+ private boolean ownsVertx;
+ private static final Logger log = Logger.getLogger(VertxA2AHttpClient.class.getName());
+
+ /**
+ * Creates a new VertxA2AHttpClient with an internally managed Vert.x instance.
+ *
+ *
+ * The client creates a new {@link Vertx} instance and {@link WebClient} configured
+ * with HTTP keep-alive and automatic redirect following. When {@link #close()} is called,
+ * both the WebClient and Vertx instance are closed.
+ *
+ *
+ * Important: Always call {@link #close()} when done with this client
+ * to prevent resource leaks.
+ *
+ * @see #VertxA2AHttpClient(Vertx) for using an externally managed Vertx instance
+ */
+ public VertxA2AHttpClient() {
+ this.vertx = createVertx();
+ WebClientOptions options = new WebClientOptions()
+ .setFollowRedirects(true)
+ .setKeepAlive(true);
+ this.webClient = WebClient.create(vertx, options);
+ log.fine("Vert.x client is ready.");
+ }
+
+ private Vertx createVertx() {
+ try {
+ BeanManager beanManager = CDI.current().getBeanManager();
+ Set> beans = beanManager.getBeans(Vertx.class);
+ if (beans != null && !beans.isEmpty()) {
+ this.ownsVertx = false;
+ Bean> bean = beans.iterator().next();
+ CreationalContext> context = beanManager.createCreationalContext(bean);
+ return (Vertx) beanManager.getReference(bean, Vertx.class, context);
+ }
+ } catch (Exception ex) {
+ log.log(Level.FINE, "Error loading vertx from CDI error details", ex);
+ }
+ this.ownsVertx = true;
+ return Vertx.vertx();
+ }
+
+ /**
+ * Creates a new VertxA2AHttpClient using an externally managed Vert.x instance.
+ *
+ *
+ * The client creates a {@link WebClient} using the provided {@link Vertx} instance.
+ * When {@link #close()} is called, only the WebClient is closed; the Vertx instance
+ * remains open and must be managed by the caller.
+ *
+ *
+ * This constructor is useful in environments where Vert.x is already managed,
+ * such as Quarkus applications.
+ *
+ * @param vertx the Vert.x instance to use; must not be null
+ * @throws NullPointerException if vertx is null
+ */
+ public VertxA2AHttpClient(Vertx vertx) {
+ if (vertx == null) {
+ throw new NullPointerException("vertx must not be null");
+ }
+ this.vertx = vertx;
+ this.ownsVertx = false;
+ WebClientOptions options = new WebClientOptions()
+ .setFollowRedirects(true)
+ .setKeepAlive(true);
+ this.webClient = WebClient.create(vertx, options);
+ log.fine("Vert.x client is ready.");
+ }
+
+ /**
+ * Closes this HTTP client and releases associated resources.
+ *
+ *
+ * This method always closes the WebClient. If the client was created with the
+ * no-args constructor (and thus owns the Vert.x instance), the Vertx instance is
+ * also closed. Otherwise, the Vertx instance is left open for the caller to manage.
+ */
+ @Override
+ public void close() {
+ webClient.close();
+ if (ownsVertx) {
+ vertx.close();
+ }
+ }
+
+ @Override
+ public GetBuilder createGet() {
+ return new VertxGetBuilder();
+ }
+
+ @Override
+ public PostBuilder createPost() {
+ return new VertxPostBuilder();
+ }
+
+ @Override
+ public DeleteBuilder createDelete() {
+ return new VertxDeleteBuilder();
+ }
+
+ private abstract class VertxBuilder> implements Builder {
+
+ protected String url = "";
+ protected Map headers = new HashMap<>();
+
+ @Override
+ public T url(String url) {
+ this.url = url;
+ return self();
+ }
+
+ @Override
+ public T addHeader(String name, String value) {
+ headers.put(name, value);
+ return self();
+ }
+
+ @Override
+ public T addHeaders(Map headers) {
+ if (headers != null && !headers.isEmpty()) {
+ for (Map.Entry entry : headers.entrySet()) {
+ addHeader(entry.getKey(), entry.getValue());
+ }
+ }
+ return self();
+ }
+
+ @SuppressWarnings("unchecked")
+ T self() {
+ return (T) this;
+ }
+ }
+
+ /**
+ * Common method to execute synchronous HTTP requests (GET, POST, DELETE).
+ *
+ * @param request the HTTP request configured with method and URL
+ * @param headers custom headers to add to the request
+ * @param bodyBuffer optional body buffer for POST requests (null for GET/DELETE)
+ * @return the HTTP response
+ * @throws IOException if the request fails or returns 401/403
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ private A2AHttpResponse executeSyncRequest(
+ HttpRequest request,
+ Map headers,
+ @Nullable Buffer bodyBuffer) throws IOException, InterruptedException {
+
+ // Add headers
+ for (Map.Entry entry : headers.entrySet()) {
+ request.putHeader(entry.getKey(), entry.getValue());
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference responseRef = new AtomicReference<>();
+ AtomicReference errorRef = new AtomicReference<>();
+
+ // Send with or without body
+ if (bodyBuffer != null) {
+ request.sendBuffer(bodyBuffer, ar -> handleResponse(ar, responseRef, errorRef, latch));
+ } else {
+ request.send(ar -> handleResponse(ar, responseRef, errorRef, latch));
+ }
+
+ latch.await();
+
+ if (errorRef.get() != null) {
+ Throwable error = errorRef.get();
+ if (error instanceof IOException) {
+ throw (IOException) error;
+ }
+ if (error instanceof InterruptedException) {
+ throw (InterruptedException) error;
+ }
+ throw new IOException("Request failed", error);
+ }
+ A2AHttpResponse finalResponse = responseRef.get();
+ if(finalResponse == null) {
+ throw new IllegalStateException("No response from http request");
+ }
+ return finalResponse;
+ }
+
+ /**
+ * Handles the HTTP response callback, checking for auth errors and populating response/error refs.
+ */
+ private void handleResponse(
+ io.vertx.core.AsyncResult> ar,
+ AtomicReference responseRef,
+ AtomicReference errorRef,
+ CountDownLatch latch) {
+
+ if (ar.succeeded()) {
+ HttpResponse response = ar.result();
+ int status = response.statusCode();
+
+ // Check for authentication/authorization errors
+ switch (status) {
+ case HTTP_UNAUTHORIZED -> errorRef.set(new IOException(A2AErrorMessages.AUTHENTICATION_FAILED));
+ case HTTP_FORBIDDEN -> errorRef.set(new IOException(A2AErrorMessages.AUTHORIZATION_FAILED));
+ default -> {
+ String body = response.bodyAsString();
+ responseRef.set(new VertxHttpResponse(status, body != null ? body : ""));
+ }
+ }
+ } else {
+ errorRef.set(ar.cause());
+ }
+ latch.countDown();
+ }
+
+ /**
+ * Common method to execute async SSE requests (GET or POST).
+ *
+ * @param baseRequest the base HTTP request (HttpRequest<Buffer>) configured with method and URL
+ * @param headers custom headers to add to the request
+ * @param bodyBuffer optional body buffer for POST requests (null for GET)
+ * @param messageConsumer callback for each SSE message received
+ * @param errorConsumer callback for errors
+ * @param completeRunnable callback when stream completes successfully
+ * @return CompletableFuture that completes when the stream ends
+ */
+ private CompletableFuture executeAsyncSSE(
+ HttpRequest baseRequest,
+ Map headers,
+ @Nullable Buffer bodyBuffer,
+ Consumer messageConsumer,
+ Consumer errorConsumer,
+ Runnable completeRunnable) {
+
+ CompletableFuture future = new CompletableFuture<>();
+ AtomicBoolean successOccurred = new AtomicBoolean(false);
+ AtomicBoolean streamEnded = new AtomicBoolean(false);
+ AtomicBoolean futureCompleted = new AtomicBoolean(false);
+
+ HttpRequest request = baseRequest
+ .putHeader(ACCEPT, EVENT_STREAM)
+ .as(BodyCodec.sseStream(stream -> {
+ stream.handler(event -> {
+ String data = event.data();
+ if (data != null) {
+ data = data.trim();
+ if (!data.isEmpty()) {
+ messageConsumer.accept(data);
+ }
+ }
+ });
+
+ stream.endHandler(v -> {
+ streamEnded.set(true);
+ // Only complete if we've validated success and haven't completed yet
+ if (successOccurred.get() && futureCompleted.compareAndSet(false, true)) {
+ completeRunnable.run();
+ future.complete(null);
+ }
+ });
+
+ stream.exceptionHandler(error -> {
+ if (futureCompleted.compareAndSet(false, true)) {
+ errorConsumer.accept(error);
+ future.complete(null);
+ }
+ });
+ }));
+
+ // Add custom headers
+ for (Map.Entry entry : headers.entrySet()) {
+ request.putHeader(entry.getKey(), entry.getValue());
+ }
+
+ // Send with or without body
+ var sendFuture = (bodyBuffer != null) ? request.sendBuffer(bodyBuffer) : request.send();
+
+ sendFuture
+ .onSuccess(response -> {
+ // Validate status code manually since .expecting() doesn't work with SSE streams
+ int statusCode = response.statusCode();
+ if (statusCode < 200 || statusCode >= 300) {
+ // Error - don't set successOccurred, just report error
+ if (futureCompleted.compareAndSet(false, true)) {
+ // Use same error messages as sync requests for consistency
+ IOException error = switch (statusCode) {
+ case HTTP_UNAUTHORIZED -> new IOException(A2AErrorMessages.AUTHENTICATION_FAILED);
+ case HTTP_FORBIDDEN -> new IOException(A2AErrorMessages.AUTHORIZATION_FAILED);
+ default -> new IOException("HTTP " + statusCode + ": " + response.bodyAsString());
+ };
+ errorConsumer.accept(error);
+ future.complete(null);
+ }
+ } else {
+ // Success - mark as successful
+ successOccurred.set(true);
+ // If stream already ended, complete now
+ if (streamEnded.get() && futureCompleted.compareAndSet(false, true)) {
+ completeRunnable.run();
+ future.complete(null);
+ }
+ }
+ })
+ .onFailure(cause -> {
+ if (futureCompleted.compareAndSet(false, true)) {
+ errorConsumer.accept(cause);
+ future.complete(null);
+ }
+ });
+
+ return future;
+ }
+
+ private class VertxGetBuilder extends VertxBuilder implements A2AHttpClient.GetBuilder {
+
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * Implementation Note: This method blocks the calling thread until
+ * the asynchronous HTTP request completes. The underlying Vert.x operation executes
+ * asynchronously on the Vert.x event loop.
+ *
+ * @throws IOException if the request fails, including:
+ *
+ * - Network errors (connection refused, timeout, etc.)
+ * - HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
+ * - HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ @Override
+ public A2AHttpResponse get() throws IOException, InterruptedException {
+ return executeSyncRequest(webClient.getAbs(url), headers, null);
+ }
+
+ @Override
+ public CompletableFuture getAsyncSSE(
+ Consumer messageConsumer,
+ Consumer errorConsumer,
+ Runnable completeRunnable) throws IOException, InterruptedException {
+
+ HttpRequest request = webClient.getAbs(url);
+ return executeAsyncSSE(request, headers, null, messageConsumer, errorConsumer, completeRunnable);
+ }
+ }
+
+ private class VertxPostBuilder extends VertxBuilder implements A2AHttpClient.PostBuilder {
+
+ private String body = "";
+
+ @Override
+ public PostBuilder body(String body) {
+ this.body = body;
+ return self();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * Implementation Note: This method blocks the calling thread until
+ * the asynchronous HTTP request completes. The underlying Vert.x operation executes
+ * asynchronously on the Vert.x event loop.
+ *
+ * @throws IOException if the request fails, including:
+ *
+ * - Network errors (connection refused, timeout, etc.)
+ * - HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
+ * - HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ @Override
+ public A2AHttpResponse post() throws IOException, InterruptedException {
+ Buffer bodyBuffer = Buffer.buffer(body, StandardCharsets.UTF_8.name());
+ return executeSyncRequest(webClient.postAbs(url), headers, bodyBuffer);
+ }
+
+ @Override
+ public CompletableFuture postAsyncSSE(
+ Consumer messageConsumer,
+ Consumer errorConsumer,
+ Runnable completeRunnable) throws IOException, InterruptedException {
+
+ HttpRequest request = webClient.postAbs(url);
+ Buffer bodyBuffer = Buffer.buffer(body, StandardCharsets.UTF_8.name());
+ return executeAsyncSSE(request, headers, bodyBuffer, messageConsumer, errorConsumer, completeRunnable);
+ }
+ }
+
+ private class VertxDeleteBuilder extends VertxBuilder implements A2AHttpClient.DeleteBuilder {
+
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * Implementation Note: This method blocks the calling thread until
+ * the asynchronous HTTP request completes. The underlying Vert.x operation executes
+ * asynchronously on the Vert.x event loop.
+ *
+ * @throws IOException if the request fails, including:
+ *
+ * - Network errors (connection refused, timeout, etc.)
+ * - HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
+ * - HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ @Override
+ public A2AHttpResponse delete() throws IOException, InterruptedException {
+ return executeSyncRequest(webClient.deleteAbs(url), headers, null);
+ }
+ }
+
+ private record VertxHttpResponse(int status, String body) implements A2AHttpResponse {
+
+ @Override
+ public int status() {
+ return status;
+ }
+
+ @Override
+ public boolean success() {
+ return status >= HTTP_OK && status < HTTP_MULT_CHOICE;
+ }
+
+ @Override
+ public String body() {
+ return body;
+ }
+ }
+}
diff --git a/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java
new file mode 100644
index 000000000..e802e1df1
--- /dev/null
+++ b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java
@@ -0,0 +1,59 @@
+package io.a2a.client.http;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Service provider for {@link VertxA2AHttpClient}.
+ *
+ *
+ * This provider has a higher priority (100) than the JDK implementation and will be
+ * preferred when the Vert.x dependencies are available on the classpath.
+ *
+ *
+ * If Vert.x classes are not available at runtime, this provider will check for their
+ * presence and throw an {@link IllegalStateException} when attempting to create a client.
+ * The ServiceLoader mechanism will skip this provider and fall back to the JDK implementation.
+ */
+public final class VertxA2AHttpClientProvider implements A2AHttpClientProvider {
+
+ private static final boolean VERTX_AVAILABLE = isVertxAvailable();
+ private static final Logger log = Logger.getLogger(VertxA2AHttpClientProvider.class.getName());
+
+ private static boolean isVertxAvailable() {
+ try {
+ Class.forName("io.vertx.core.Vertx");
+ Class.forName("io.vertx.ext.web.client.WebClient");
+ return true;
+ } catch (ClassNotFoundException ex) {
+ Logger.getLogger(VertxA2AHttpClientProvider.class.getName()).log(Level.FINE, "Vert.x classes are not available on the classpath. Falling back to other providers.", ex);
+ return false;
+ }
+ }
+
+ @Override
+ public A2AHttpClient create() {
+ if (!VERTX_AVAILABLE) {
+ throw new IllegalStateException(
+ "Vert.x classes are not available on the classpath. "
+ + "Add io.vertx:vertx-web-client dependency or use the JDK HTTP client implementation.");
+ }
+
+ try {
+ Class> clientClass = Class.forName("io.a2a.client.http.VertxA2AHttpClient");
+ return (A2AHttpClient) clientClass.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to create VertxA2AHttpClient instance", e);
+ }
+ }
+
+ @Override
+ public int priority() {
+ return VERTX_AVAILABLE ? 100 : -1; // Higher priority when available, negative when not
+ }
+
+ @Override
+ public String name() {
+ return "vertx";
+ }
+}
diff --git a/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider
new file mode 100644
index 000000000..9d6a67c3b
--- /dev/null
+++ b/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider
@@ -0,0 +1 @@
+io.a2a.client.http.VertxA2AHttpClientProvider
diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java
new file mode 100644
index 000000000..2ac3eac94
--- /dev/null
+++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java
@@ -0,0 +1,66 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class VertxA2AHttpClientFactoryTest {
+
+ @Test
+ public void testCreateReturnsVertxClient() {
+ // When both JDK and Vertx are on classpath, Vertx should be preferred due to higher priority
+ A2AHttpClient client = A2AHttpClientFactory.create();
+ assertNotNull(client);
+ assertInstanceOf(VertxA2AHttpClient.class, client,
+ "Factory should return VertxA2AHttpClient when Vertx is available");
+ // Clean up
+ if (client instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) client).close();
+ } catch (Exception e) {
+ fail("Failed to close client: " + e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testCreateWithVertxProviderName() {
+ A2AHttpClient client = A2AHttpClientFactory.create("vertx");
+ assertNotNull(client);
+ assertInstanceOf(VertxA2AHttpClient.class, client,
+ "Factory should return VertxA2AHttpClient when 'vertx' provider is requested");
+ // Clean up
+ if (client instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) client).close();
+ } catch (Exception e) {
+ fail("Failed to close client: " + e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testVertxClientIsUsable() {
+ A2AHttpClient client = A2AHttpClientFactory.create("vertx");
+ assertNotNull(client);
+
+ // Verify we can create builders
+ A2AHttpClient.GetBuilder getBuilder = client.createGet();
+ assertNotNull(getBuilder, "Should be able to create GET builder");
+
+ A2AHttpClient.PostBuilder postBuilder = client.createPost();
+ assertNotNull(postBuilder, "Should be able to create POST builder");
+
+ A2AHttpClient.DeleteBuilder deleteBuilder = client.createDelete();
+ assertNotNull(deleteBuilder, "Should be able to create DELETE builder");
+
+ // Clean up
+ if (client instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) client).close();
+ } catch (Exception e) {
+ fail("Failed to close client: " + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java
new file mode 100644
index 000000000..f945aecde
--- /dev/null
+++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java
@@ -0,0 +1,212 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import io.a2a.common.A2AErrorMessages;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+
+public class VertxA2AHttpClientIntegrationTest {
+
+ private ClientAndServer mockServer;
+ private VertxA2AHttpClient client;
+
+ @BeforeEach
+ public void setup() {
+ mockServer = ClientAndServer.startClientAndServer(0); // Use random port
+ client = new VertxA2AHttpClient();
+ }
+
+ @AfterEach
+ public void teardown() {
+ if (client != null) {
+ client.close();
+ }
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+
+ private String getBaseUrl() {
+ return "http://localhost:" + mockServer.getPort();
+ }
+
+ @Test
+ public void testGetRequestSuccess() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/test"))
+ .respond(response().withStatusCode(200).withBody("success"));
+
+ A2AHttpResponse response = client.createGet()
+ .url(getBaseUrl() + "/test")
+ .get();
+
+ assertEquals(200, response.status());
+ assertTrue(response.success());
+ assertEquals("success", response.body());
+ }
+
+ @Test
+ public void testPostRequestSuccess() throws Exception {
+ mockServer
+ .when(request()
+ .withMethod("POST")
+ .withPath("/test")
+ .withBody("{\"key\":\"value\"}"))
+ .respond(response().withStatusCode(201).withBody("created"));
+
+ A2AHttpResponse response = client.createPost()
+ .url(getBaseUrl() + "/test")
+ .body("{\"key\":\"value\"}")
+ .post();
+
+ assertEquals(201, response.status());
+ assertTrue(response.success());
+ assertEquals("created", response.body());
+ }
+
+ @Test
+ public void testDeleteRequestSuccess() throws Exception {
+ mockServer
+ .when(request().withMethod("DELETE").withPath("/test"))
+ .respond(response().withStatusCode(204));
+
+ A2AHttpResponse response = client.createDelete()
+ .url(getBaseUrl() + "/test")
+ .delete();
+
+ assertEquals(204, response.status());
+ assertTrue(response.success());
+ }
+
+ @Test
+ public void test401AuthenticationErrorOnGet() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/test"))
+ .respond(response().withStatusCode(401));
+
+ Exception exception = assertThrows(java.io.IOException.class, () -> {
+ client.createGet()
+ .url(getBaseUrl() + "/test")
+ .get();
+ });
+
+ assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage());
+ }
+
+ @Test
+ public void test403AuthorizationErrorOnGet() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/test"))
+ .respond(response().withStatusCode(403));
+
+ Exception exception = assertThrows(java.io.IOException.class, () -> {
+ client.createGet()
+ .url(getBaseUrl() + "/test")
+ .get();
+ });
+
+ assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage());
+ }
+
+ @Test
+ public void test401AuthenticationErrorOnPost() throws Exception {
+ mockServer
+ .when(request().withMethod("POST").withPath("/test"))
+ .respond(response().withStatusCode(401));
+
+ Exception exception = assertThrows(java.io.IOException.class, () -> {
+ client.createPost()
+ .url(getBaseUrl() + "/test")
+ .body("{}")
+ .post();
+ });
+
+ assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage());
+ }
+
+ @Test
+ public void test403AuthorizationErrorOnPost() throws Exception {
+ mockServer
+ .when(request().withMethod("POST").withPath("/test"))
+ .respond(response().withStatusCode(403));
+
+ Exception exception = assertThrows(java.io.IOException.class, () -> {
+ client.createPost()
+ .url(getBaseUrl() + "/test")
+ .body("{}")
+ .post();
+ });
+
+ assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage());
+ }
+
+ @Test
+ public void test401AuthenticationErrorOnDelete() throws Exception {
+ mockServer
+ .when(request().withMethod("DELETE").withPath("/test"))
+ .respond(response().withStatusCode(401));
+
+ Exception exception = assertThrows(java.io.IOException.class, () -> {
+ client.createDelete()
+ .url(getBaseUrl() + "/test")
+ .delete();
+ });
+
+ assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage());
+ }
+
+ @Test
+ public void testHeaderPropagation() throws Exception {
+ mockServer
+ .when(request()
+ .withMethod("GET")
+ .withPath("/test")
+ .withHeader("Authorization", "Bearer token")
+ .withHeader("X-Custom-Header", "custom-value"))
+ .respond(response().withStatusCode(200).withBody("ok"));
+
+ A2AHttpResponse response = client.createGet()
+ .url(getBaseUrl() + "/test")
+ .addHeader("Authorization", "Bearer token")
+ .addHeader("X-Custom-Header", "custom-value")
+ .get();
+
+ assertEquals(200, response.status());
+ assertEquals("ok", response.body());
+ }
+
+ @Test
+ public void testNonSuccessStatusCode() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/test"))
+ .respond(response().withStatusCode(500).withBody("Internal Server Error"));
+
+ A2AHttpResponse response = client.createGet()
+ .url(getBaseUrl() + "/test")
+ .get();
+
+ assertEquals(500, response.status());
+ assertFalse(response.success());
+ assertEquals("Internal Server Error", response.body());
+ }
+
+ @Test
+ public void test404NotFound() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/test"))
+ .respond(response().withStatusCode(404).withBody("Not Found"));
+
+ A2AHttpResponse response = client.createGet()
+ .url(getBaseUrl() + "/test")
+ .get();
+
+ assertEquals(404, response.status());
+ assertFalse(response.success());
+ assertEquals("Not Found", response.body());
+ }
+}
diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java
new file mode 100644
index 000000000..64b1dec54
--- /dev/null
+++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java
@@ -0,0 +1,253 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import io.a2a.common.A2AErrorMessages;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class VertxA2AHttpClientSSETest {
+
+ private ClientAndServer mockServer;
+ private VertxA2AHttpClient client;
+
+ @BeforeEach
+ public void setup() {
+ mockServer = ClientAndServer.startClientAndServer(0); // Use random port
+ client = new VertxA2AHttpClient();
+ }
+
+ @AfterEach
+ public void teardown() {
+ if (client != null) {
+ client.close();
+ }
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+
+ private String getBaseUrl() {
+ return "http://localhost:" + mockServer.getPort();
+ }
+
+ @Test
+ public void testGetAsyncSSE() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/sse"))
+ .respond(response()
+ .withStatusCode(200)
+ .withHeader("Content-Type", "text/event-stream")
+ .withBody("data: event1\n\ndata: event2\n\ndata: event3\n\n"));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ List events = new ArrayList<>();
+ AtomicReference error = new AtomicReference<>();
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .getAsyncSSE(
+ events::add,
+ error::set,
+ latch::countDown
+ );
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called");
+ assertNull(error.get(), "Expected no errors");
+ assertFalse(events.isEmpty(), "Expected to receive events");
+ assertTrue(events.contains("event1"), "Expected event1");
+ assertTrue(events.contains("event2"), "Expected event2");
+ assertTrue(events.contains("event3"), "Expected event3");
+ }
+
+ @Test
+ public void testPostAsyncSSE() throws Exception {
+ mockServer
+ .when(request()
+ .withMethod("POST")
+ .withPath("/sse")
+ .withBody("{\"subscribe\":true}"))
+ .respond(response()
+ .withStatusCode(200)
+ .withHeader("Content-Type", "text/event-stream")
+ .withBody("data: message1\n\ndata: message2\n\n"));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ List events = new ArrayList<>();
+ AtomicReference error = new AtomicReference<>();
+
+ CompletableFuture future = client.createPost()
+ .url(getBaseUrl() + "/sse")
+ .body("{\"subscribe\":true}")
+ .postAsyncSSE(
+ events::add,
+ error::set,
+ latch::countDown
+ );
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called");
+ assertNull(error.get(), "Expected no errors");
+ assertFalse(events.isEmpty(), "Expected to receive events");
+ assertTrue(events.contains("message1"), "Expected message1");
+ assertTrue(events.contains("message2"), "Expected message2");
+ }
+
+ @Test
+ public void testSSEDataPrefixStripping() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/sse"))
+ .respond(response()
+ .withStatusCode(200)
+ .withHeader("Content-Type", "text/event-stream")
+ .withBody("data: content here\n\ndata:no space\n\ndata: extra spaces \n\n"));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ List events = new ArrayList<>();
+ AtomicReference error = new AtomicReference<>();
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .getAsyncSSE(
+ events::add,
+ error::set,
+ latch::countDown
+ );
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertNull(error.get());
+ assertTrue(events.contains("content here"), "Should have stripped 'data: ' prefix");
+ assertTrue(events.contains("no space"), "Should handle 'data:' without space");
+ assertTrue(events.contains("extra spaces"), "Should trim whitespace");
+ }
+
+ @Test
+ public void testSSEAuthenticationError() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/sse"))
+ .respond(response().withStatusCode(401));
+
+ CountDownLatch errorLatch = new CountDownLatch(1);
+ AtomicReference error = new AtomicReference<>();
+ AtomicBoolean completed = new AtomicBoolean(false);
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .getAsyncSSE(
+ msg -> {},
+ e -> {
+ error.set(e);
+ errorLatch.countDown();
+ },
+ () -> completed.set(true)
+ );
+
+ assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called");
+ assertNotNull(error.get(), "Expected an error");
+ assertTrue(error.get() instanceof IOException, "Expected IOException");
+ assertTrue(error.get().getMessage().contains("Authentication failed"),
+ "Expected authentication error message but got: " + error.get().getMessage());
+ assertFalse(completed.get(), "Should not call completion handler on error");
+ }
+
+ @Test
+ public void testSSEAuthorizationError() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/sse"))
+ .respond(response().withStatusCode(403));
+
+ CountDownLatch errorLatch = new CountDownLatch(1);
+ AtomicReference error = new AtomicReference<>();
+ AtomicBoolean completed = new AtomicBoolean(false);
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .getAsyncSSE(
+ msg -> {},
+ e -> {
+ error.set(e);
+ errorLatch.countDown();
+ },
+ () -> completed.set(true)
+ );
+
+ assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called");
+ assertNotNull(error.get(), "Expected an error");
+ assertTrue(error.get() instanceof IOException, "Expected IOException");
+ assertTrue(error.get().getMessage().contains("Authorization failed"),
+ "Expected authorization error message but got: " + error.get().getMessage());
+ assertFalse(completed.get(), "Should not call completion handler on error");
+ }
+
+ @Test
+ public void testSSEEmptyLinesIgnored() throws Exception {
+ mockServer
+ .when(request().withMethod("GET").withPath("/sse"))
+ .respond(response()
+ .withStatusCode(200)
+ .withHeader("Content-Type", "text/event-stream")
+ .withBody("data: first\n\n\n\ndata: second\n\ndata: \n\ndata: third\n\n"));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ List events = new ArrayList<>();
+ AtomicReference error = new AtomicReference<>();
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .getAsyncSSE(
+ events::add,
+ error::set,
+ latch::countDown
+ );
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertNull(error.get());
+ assertEquals(3, events.size(), "Should have received 3 non-empty events");
+ assertTrue(events.contains("first"));
+ assertTrue(events.contains("second"));
+ assertTrue(events.contains("third"));
+ }
+
+ @Test
+ public void testSSEHeaderPropagation() throws Exception {
+ mockServer
+ .when(request()
+ .withMethod("GET")
+ .withPath("/sse")
+ .withHeader("Accept", "text/event-stream")
+ .withHeader("Authorization", "Bearer token"))
+ .respond(response()
+ .withStatusCode(200)
+ .withHeader("Content-Type", "text/event-stream")
+ .withBody("data: authenticated\n\n"));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ List events = new ArrayList<>();
+ AtomicReference error = new AtomicReference<>();
+
+ CompletableFuture future = client.createGet()
+ .url(getBaseUrl() + "/sse")
+ .addHeader("Authorization", "Bearer token")
+ .getAsyncSSE(
+ events::add,
+ error::set,
+ latch::countDown
+ );
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertNull(error.get());
+ assertTrue(events.contains("authenticated"));
+ }
+}
diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java
new file mode 100644
index 000000000..55e9afd94
--- /dev/null
+++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java
@@ -0,0 +1,94 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import io.vertx.core.Vertx;
+import org.junit.jupiter.api.Test;
+
+public class VertxA2AHttpClientTest {
+
+ @Test
+ public void testNoArgsConstructor() {
+ VertxA2AHttpClient client = new VertxA2AHttpClient();
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ public void testVertxParameterConstructor() {
+ Vertx vertx = Vertx.vertx();
+ VertxA2AHttpClient client = new VertxA2AHttpClient(vertx);
+ assertNotNull(client);
+ client.close();
+ vertx.close();
+ }
+
+ @Test
+ public void testVertxParameterConstructorNullThrows() {
+ assertThrows(NullPointerException.class, () -> {
+ new VertxA2AHttpClient(null);
+ });
+ }
+
+ @Test
+ public void testCreateGet() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.GetBuilder builder = client.createGet();
+ assertNotNull(builder);
+ }
+ }
+
+ @Test
+ public void testCreatePost() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.PostBuilder builder = client.createPost();
+ assertNotNull(builder);
+ }
+ }
+
+ @Test
+ public void testCreateDelete() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.DeleteBuilder builder = client.createDelete();
+ assertNotNull(builder);
+ }
+ }
+
+ @Test
+ public void testBuilderUrlSetting() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.GetBuilder builder = client.createGet();
+ A2AHttpClient.GetBuilder result = builder.url("https://example.com");
+ assertSame(builder, result, "Builder should return itself for method chaining");
+ }
+ }
+
+ @Test
+ public void testBuilderHeaderSetting() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.GetBuilder builder = client.createGet();
+ A2AHttpClient.GetBuilder result = builder.addHeader("Accept", "application/json");
+ assertSame(builder, result, "Builder should return itself for method chaining");
+ }
+ }
+
+ @Test
+ public void testBuilderMethodChaining() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.GetBuilder builder = client.createGet()
+ .url("https://example.com")
+ .addHeader("Accept", "application/json")
+ .addHeader("Authorization", "Bearer token");
+ assertNotNull(builder);
+ }
+ }
+
+ @Test
+ public void testPostBuilderBody() {
+ try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
+ A2AHttpClient.PostBuilder builder = client.createPost();
+ A2AHttpClient.PostBuilder result = builder.body("{\"key\":\"value\"}");
+ assertSame(builder, result, "Builder should return itself for method chaining");
+ }
+ }
+}
diff --git a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java
index 6d2e5f55f..b4847cd40 100644
--- a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java
+++ b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java
@@ -27,18 +27,18 @@ public class A2ACardResolver {
private static final String DEFAULT_AGENT_CARD_PATH = "/.well-known/agent-card.json";
/**
- * Get the agent card for an A2A agent. The {@code JdkA2AHttpClient} will be used to fetch the agent card.
+ * Get the agent card for an A2A agent. An HTTP client will be auto-selected via {@link A2AHttpClientFactory}.
*
* @param baseUrl the base URL for the agent whose agent card we want to retrieve, must not be null
* @throws A2AClientError if the URL for the agent is invalid
* @throws IllegalArgumentException if baseUrl is null
*/
public A2ACardResolver(String baseUrl) throws A2AClientError {
- this(new JdkA2AHttpClient(), baseUrl, null, null);
+ this(A2AHttpClientFactory.create(), baseUrl, null, null);
}
/**
- * Get the agent card for an A2A agent. The {@code JdkA2AHttpClient} will be used to fetch the agent card.
+ * Get the agent card for an A2A agent. An HTTP client will be auto-selected via {@link A2AHttpClientFactory}.
*
* @param baseUrl the base URL for the agent whose agent card we want to retrieve, must not be null
* @param tenant the tenant path to use when fetching the agent card, may be null for no tenant
@@ -46,7 +46,7 @@ public A2ACardResolver(String baseUrl) throws A2AClientError {
* @throws IllegalArgumentException if baseUrl is null
*/
public A2ACardResolver(String baseUrl, @Nullable String tenant) throws A2AClientError {
- this(new JdkA2AHttpClient(), baseUrl, tenant, null);
+ this(A2AHttpClientFactory.create(), baseUrl, tenant, null);
}
/**
diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java
index 8d7f2d0f9..9e4f5f705 100644
--- a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java
+++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java
@@ -9,6 +9,8 @@ public interface A2AHttpClient {
String CONTENT_TYPE= "Content-Type";
String APPLICATION_JSON= "application/json";
+ String ACCEPT = "Accept";
+ String EVENT_STREAM = "text/event-stream";
GetBuilder createGet();
diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java
new file mode 100644
index 000000000..05bac3112
--- /dev/null
+++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java
@@ -0,0 +1,89 @@
+package io.a2a.client.http;
+
+import java.util.Comparator;
+import java.util.ServiceLoader;
+import java.util.stream.StreamSupport;
+
+/**
+ * Factory for creating {@link A2AHttpClient} instances using the ServiceLoader mechanism.
+ *
+ *
+ * This factory discovers available {@link A2AHttpClientProvider} implementations at runtime
+ * and selects the one with the highest priority. If no providers are found, it falls back
+ * to creating a {@link JdkA2AHttpClient}.
+ *
+ *
Usage
+ * {@code
+ * // Get the default client (highest priority provider)
+ * A2AHttpClient client = A2AHttpClientFactory.create();
+ *
+ * // Use with try-with-resources if the client implements AutoCloseable
+ * try (A2AHttpClient client = A2AHttpClientFactory.create()) {
+ * A2AHttpResponse response = client.createGet()
+ * .url("https://example.com")
+ * .get();
+ * }
+ * }
+ *
+ * Priority System
+ *
+ * Providers are selected based on their priority value (higher is better):
+ *
+ * - JdkA2AHttpClient: priority 0 (fallback)
+ * - VertxA2AHttpClient: priority 100 (preferred when available)
+ *
+ *
+ * Custom Providers
+ *
+ * To add a custom provider, implement {@link A2AHttpClientProvider} and register it
+ * in {@code META-INF/services/io.a2a.client.http.A2AHttpClientProvider}.
+ */
+public final class A2AHttpClientFactory {
+
+ private A2AHttpClientFactory() {
+ // Utility class
+ }
+
+ /**
+ * Creates a new A2AHttpClient instance using the highest priority provider available.
+ *
+ *
+ * This method uses the ServiceLoader mechanism to discover providers at runtime.
+ * If no providers are found, it falls back to creating a {@link JdkA2AHttpClient}.
+ *
+ * @return a new A2AHttpClient instance
+ */
+ public static A2AHttpClient create() {
+ ServiceLoader loader = ServiceLoader.load(A2AHttpClientProvider.class);
+
+ return StreamSupport.stream(loader.spliterator(), false)
+ .max(Comparator.comparingInt(A2AHttpClientProvider::priority))
+ .map(A2AHttpClientProvider::create)
+ .orElseGet(JdkA2AHttpClient::new);
+ }
+
+ /**
+ * Creates a new A2AHttpClient instance using a specific provider by name.
+ *
+ *
+ * This method is useful for testing or when you need to force a specific implementation.
+ *
+ * @param providerName the name of the provider to use
+ * @return a new A2AHttpClient instance from the specified provider
+ * @throws IllegalArgumentException if no provider with the given name is found
+ */
+ public static A2AHttpClient create(String providerName) {
+ if (providerName == null || providerName.isEmpty()) {
+ throw new IllegalArgumentException("Provider name must not be null or empty");
+ }
+
+ ServiceLoader loader = ServiceLoader.load(A2AHttpClientProvider.class);
+
+ return StreamSupport.stream(loader.spliterator(), false)
+ .filter(provider -> providerName.equals(provider.name()))
+ .findFirst()
+ .map(A2AHttpClientProvider::create)
+ .orElseThrow(() -> new IllegalArgumentException(
+ "No A2AHttpClientProvider found with name: " + providerName));
+ }
+}
diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java
new file mode 100644
index 000000000..8c33c66e6
--- /dev/null
+++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java
@@ -0,0 +1,48 @@
+package io.a2a.client.http;
+
+/**
+ * Service provider interface for creating {@link A2AHttpClient} instances.
+ *
+ *
+ * Implementations of this interface can be registered via the Java ServiceLoader
+ * mechanism. The {@link A2AHttpClientFactory} will discover and use the highest
+ * priority provider available.
+ *
+ *
+ * To register a provider, create a file named
+ * {@code META-INF/services/io.a2a.client.http.A2AHttpClientProvider} containing
+ * the fully qualified class name of your provider implementation.
+ */
+public interface A2AHttpClientProvider {
+
+ /**
+ * Creates a new instance of an A2AHttpClient.
+ *
+ * @return a new A2AHttpClient instance
+ */
+ A2AHttpClient create();
+
+ /**
+ * Returns the priority of this provider. Higher priority providers are
+ * preferred over lower priority ones.
+ *
+ *
+ * Default priorities:
+ *
+ * - JdkA2AHttpClient: 0 (fallback)
+ * - VertxA2AHttpClient: 100 (preferred when available)
+ *
+ *
+ * @return the priority value (higher is better)
+ */
+ default int priority() {
+ return 0;
+ }
+
+ /**
+ * Returns the name of this provider for logging and debugging purposes.
+ *
+ * @return the provider name
+ */
+ String name();
+}
diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java
index 9b8003741..d5bc68651 100644
--- a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java
+++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java
@@ -199,7 +199,7 @@ private class JdkGetBuilder extends JdkBuilder implements A2AHttpCli
private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException {
HttpRequest.Builder builder = super.createRequestBuilder().GET();
if (SSE) {
- builder.header("Accept", "text/event-stream");
+ builder.header(ACCEPT, EVENT_STREAM);
}
return builder;
}
@@ -250,7 +250,7 @@ private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException
HttpRequest.Builder builder = super.createRequestBuilder()
.POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8));
if (SSE) {
- builder.header("Accept", "text/event-stream");
+ builder.header(ACCEPT, EVENT_STREAM);
}
return builder;
}
diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java
new file mode 100644
index 000000000..1bf388286
--- /dev/null
+++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java
@@ -0,0 +1,27 @@
+package io.a2a.client.http;
+
+/**
+ * Service provider for {@link JdkA2AHttpClient}.
+ *
+ *
+ * This provider has the lowest priority (0) and serves as the fallback implementation
+ * when no other providers are available. The JDK HTTP client is always available as it
+ * uses only standard Java libraries.
+ */
+public final class JdkA2AHttpClientProvider implements A2AHttpClientProvider {
+
+ @Override
+ public A2AHttpClient create() {
+ return new JdkA2AHttpClient();
+ }
+
+ @Override
+ public int priority() {
+ return 0; // Lowest priority - fallback
+ }
+
+ @Override
+ public String name() {
+ return "jdk";
+ }
+}
diff --git a/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider
new file mode 100644
index 000000000..78dbb361e
--- /dev/null
+++ b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider
@@ -0,0 +1 @@
+io.a2a.client.http.JdkA2AHttpClientProvider
diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java
new file mode 100644
index 000000000..72de52230
--- /dev/null
+++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java
@@ -0,0 +1,88 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class A2AHttpClientFactoryTest {
+
+ @Test
+ public void testCreateReturnsNonNull() {
+ A2AHttpClient client = A2AHttpClientFactory.create();
+ assertNotNull(client, "Factory should return a non-null client");
+ }
+
+ @Test
+ public void testCreateReturnsJdkClient() {
+ // When Vertx is not on classpath, JDK client should be used
+ A2AHttpClient client = A2AHttpClientFactory.create();
+ assertNotNull(client);
+ assertInstanceOf(JdkA2AHttpClient.class, client,
+ "Factory should return JdkA2AHttpClient when Vertx is not available");
+ }
+
+ @Test
+ public void testCreateWithJdkProviderName() {
+ A2AHttpClient client = A2AHttpClientFactory.create("jdk");
+ assertNotNull(client);
+ assertInstanceOf(JdkA2AHttpClient.class, client,
+ "Factory should return JdkA2AHttpClient when 'jdk' provider is requested");
+ }
+
+ @Test
+ public void testCreateWithVertxProviderNameThrows() {
+ // Vertx provider is not available in the core module
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> A2AHttpClientFactory.create("vertx"),
+ "Factory should throw IllegalArgumentException when vertx provider is not found"
+ );
+ assertTrue(exception.getMessage().contains("vertx"),
+ "Exception message should mention the provider name");
+ }
+
+ @Test
+ public void testCreateWithInvalidProviderNameThrows() {
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> A2AHttpClientFactory.create("nonexistent"),
+ "Factory should throw IllegalArgumentException for unknown provider"
+ );
+ assertTrue(exception.getMessage().contains("nonexistent"),
+ "Exception message should mention the provider name");
+ }
+
+ @Test
+ public void testCreateWithNullProviderNameThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> A2AHttpClientFactory.create(null),
+ "Factory should throw IllegalArgumentException for null provider name"
+ );
+ }
+
+ @Test
+ public void testCreateWithEmptyProviderNameThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> A2AHttpClientFactory.create(""),
+ "Factory should throw IllegalArgumentException for empty provider name"
+ );
+ }
+
+ @Test
+ public void testCreatedClientIsUsable() {
+ A2AHttpClient client = A2AHttpClientFactory.create();
+ assertNotNull(client);
+
+ // Verify we can create builders
+ A2AHttpClient.GetBuilder getBuilder = client.createGet();
+ assertNotNull(getBuilder, "Should be able to create GET builder");
+
+ A2AHttpClient.PostBuilder postBuilder = client.createPost();
+ assertNotNull(postBuilder, "Should be able to create POST builder");
+
+ A2AHttpClient.DeleteBuilder deleteBuilder = client.createDelete();
+ assertNotNull(deleteBuilder, "Should be able to create DELETE builder");
+ }
+}
diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java
new file mode 100644
index 000000000..86da8c8cf
--- /dev/null
+++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java
@@ -0,0 +1,122 @@
+package io.a2a.client.http;
+
+import java.io.IOException;
+
+/**
+ * Example demonstrating how to use {@link A2AHttpClientFactory} to obtain HTTP client instances.
+ *
+ *
+ * This class shows various usage patterns for the factory-based approach to creating
+ * A2AHttpClient instances.
+ */
+public class A2AHttpClientFactoryUsageExample {
+
+ /**
+ * Example 1: Basic usage with automatic selection of best available client.
+ */
+ public void basicUsage() throws IOException, InterruptedException {
+ // The factory automatically selects the best available implementation:
+ // - VertxA2AHttpClient (priority 100) if Vert.x is on the classpath
+ // - JdkA2AHttpClient (priority 0) as fallback
+ A2AHttpClient client = A2AHttpClientFactory.create();
+
+ try {
+ A2AHttpResponse response = client.createGet()
+ .url("https://api.example.com/data")
+ .addHeader("Accept", "application/json")
+ .get();
+
+ if (response.success()) {
+ System.out.println("Response: " + response.body());
+ }
+ } finally {
+ // Close if the client supports AutoCloseable (Vertx does, JDK doesn't)
+ if (client instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) client).close();
+ } catch (Exception e) {
+ // Handle close exception
+ }
+ }
+ }
+ }
+
+ /**
+ * Example 2: Try-with-resources pattern (recommended for AutoCloseable clients).
+ */
+ public void tryWithResourcesUsage() throws Exception {
+ A2AHttpClient client = A2AHttpClientFactory.create();
+
+ // Only use try-with-resources if the client is AutoCloseable
+ if (client instanceof AutoCloseable) {
+ try (AutoCloseable closeableClient = (AutoCloseable) client) {
+ A2AHttpResponse response = client.createPost()
+ .url("https://api.example.com/submit")
+ .addHeader("Content-Type", "application/json")
+ .body("{\"key\":\"value\"}")
+ .post();
+
+ System.out.println("Status: " + response.status());
+ }
+ } else {
+ // Non-closeable client, use normally
+ A2AHttpResponse response = client.createPost()
+ .url("https://api.example.com/submit")
+ .addHeader("Content-Type", "application/json")
+ .body("{\"key\":\"value\"}")
+ .post();
+
+ System.out.println("Status: " + response.status());
+ }
+ }
+
+ /**
+ * Example 3: Explicitly selecting a specific implementation.
+ */
+ public void specificProviderUsage() throws IOException, InterruptedException {
+ // Force the use of JDK client even if Vert.x is available
+ A2AHttpClient jdkClient = A2AHttpClientFactory.create("jdk");
+ A2AHttpResponse response = jdkClient.createGet()
+ .url("https://api.example.com/data")
+ .get();
+
+ System.out.println("Using JDK client: " + response.status());
+
+ // Or explicitly use Vert.x client
+ try (AutoCloseable vertxClient = (AutoCloseable) A2AHttpClientFactory.create("vertx")) {
+ A2AHttpResponse vertxResponse = ((A2AHttpClient) vertxClient).createGet()
+ .url("https://api.example.com/data")
+ .get();
+
+ System.out.println("Using Vert.x client: " + vertxResponse.status());
+ } catch (Exception e) {
+ // Handle exceptions
+ }
+ }
+
+ /**
+ * Example 4: Defensive programming - handling unknown implementations.
+ */
+ public void defensiveUsage() throws IOException, InterruptedException {
+ A2AHttpClient client = null;
+ try {
+ client = A2AHttpClientFactory.create();
+
+ A2AHttpResponse response = client.createGet()
+ .url("https://api.example.com/data")
+ .get();
+
+ System.out.println("Response: " + response.body());
+
+ } finally {
+ // Safely close if possible
+ if (client instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) client).close();
+ } catch (Exception e) {
+ System.err.println("Warning: Failed to close client: " + e.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java
new file mode 100644
index 000000000..b2e222517
--- /dev/null
+++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java
@@ -0,0 +1,28 @@
+package io.a2a.client.http;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class A2AHttpClientProviderTest {
+
+ @Test
+ public void testJdkProviderCreatesClient() {
+ JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider();
+ A2AHttpClient client = provider.create();
+ assertNotNull(client);
+ assertInstanceOf(JdkA2AHttpClient.class, client);
+ }
+
+ @Test
+ public void testJdkProviderPriority() {
+ JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider();
+ assertEquals(0, provider.priority(), "JDK provider should have priority 0");
+ }
+
+ @Test
+ public void testJdkProviderName() {
+ JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider();
+ assertEquals("jdk", provider.name(), "JDK provider name should be 'jdk'");
+ }
+}
diff --git a/pom.xml b/pom.xml
index 643b56e53..9a712913a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
1.7.1
4.33.1
0.6.1
- 3.30.1
+ 3.30.6
5.5.1
2.0.17
1.5.18
@@ -119,6 +119,11 @@
a2a-java-sdk-http-client
${project.version}