diff --git a/boms/extras/pom.xml b/boms/extras/pom.xml index c06575595..809a3a6a9 100644 --- a/boms/extras/pom.xml +++ b/boms/extras/pom.xml @@ -34,6 +34,11 @@ a2a-java-extras-common ${project.version} + + ${project.groupId} + a2a-java-sdk-http-client-vertx + ${project.version} + ${project.groupId} a2a-java-extras-task-store-database-jpa diff --git a/boms/extras/src/it/extras-usage-test/pom.xml b/boms/extras/src/it/extras-usage-test/pom.xml index 5beeb50ea..cd8b897d9 100644 --- a/boms/extras/src/it/extras-usage-test/pom.xml +++ b/boms/extras/src/it/extras-usage-test/pom.xml @@ -44,6 +44,10 @@ io.github.a2asdk a2a-java-extras-common + + io.github.a2asdk + a2a-java-sdk-http-client-vertx + io.github.a2asdk a2a-java-extras-task-store-database-jpa diff --git a/client/base/pom.xml b/client/base/pom.xml index 05b772931..bd16914a6 100644 --- a/client/base/pom.xml +++ b/client/base/pom.xml @@ -22,6 +22,11 @@ ${project.groupId} a2a-java-sdk-http-client + + ${project.groupId} + a2a-java-sdk-http-client-vertx + provided + ${project.groupId} a2a-java-sdk-client-transport-spi diff --git a/client/base/src/main/java/io/a2a/A2A.java b/client/base/src/main/java/io/a2a/A2A.java index 02f212656..b04b85df4 100644 --- a/client/base/src/main/java/io/a2a/A2A.java +++ b/client/base/src/main/java/io/a2a/A2A.java @@ -6,7 +6,7 @@ import io.a2a.client.http.A2ACardResolver; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientJSONError; import io.a2a.spec.AgentCard; @@ -286,7 +286,7 @@ private static Message toMessage(List> parts, Message.Role role, String * @see AgentCard */ public static AgentCard getAgentCard(String agentUrl) throws A2AClientError, A2AClientJSONError { - return getAgentCard(new JdkA2AHttpClient(), agentUrl); + return getAgentCard(A2AHttpClientFactory.create(), agentUrl); } /** @@ -357,7 +357,7 @@ public static AgentCard getAgentCard(A2AHttpClient httpClient, String agentUrl) * @throws io.a2a.spec.A2AClientJSONError if the response body cannot be decoded as JSON or validated against the AgentCard schema */ public static AgentCard getAgentCard(String agentUrl, String relativeCardPath, Map authHeaders) throws A2AClientError, A2AClientJSONError { - return getAgentCard(new JdkA2AHttpClient(), agentUrl, relativeCardPath, authHeaders); + return getAgentCard(A2AHttpClientFactory.create(), agentUrl, relativeCardPath, authHeaders); } /** diff --git a/client/base/src/test/java/io/a2a/client/ClientBuilderTest.java b/client/base/src/test/java/io/a2a/client/ClientBuilderTest.java index 8b9e4bba0..2c95c2619 100644 --- a/client/base/src/test/java/io/a2a/client/ClientBuilderTest.java +++ b/client/base/src/test/java/io/a2a/client/ClientBuilderTest.java @@ -6,7 +6,7 @@ import java.util.List; import io.a2a.client.config.ClientConfig; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.grpc.GrpcTransport; import io.a2a.client.transport.grpc.GrpcTransportConfigBuilder; import io.a2a.client.transport.jsonrpc.JSONRPCTransport; @@ -89,7 +89,7 @@ public void shouldCreateClient_differentConfigurations() throws A2AClientExcepti Client client = Client .builder(card) .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfigBuilder()) - .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig(new JdkA2AHttpClient())) + .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig(A2AHttpClientFactory.create())) .build(); Assertions.assertNotNull(client); diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java index 69e291988..8c948c4a8 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java @@ -23,8 +23,8 @@ import com.google.protobuf.MessageOrBuilder; import io.a2a.client.http.A2ACardResolver; import io.a2a.client.http.A2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.http.A2AHttpResponse; -import io.a2a.client.http.JdkA2AHttpClient; import io.a2a.client.transport.jsonrpc.sse.SSEEventListener; import io.a2a.client.transport.spi.ClientTransport; import io.a2a.client.transport.spi.interceptors.ClientCallContext; @@ -84,7 +84,7 @@ public JSONRPCTransport(AgentCard agentCard) { public JSONRPCTransport(@Nullable A2AHttpClient httpClient, @Nullable AgentCard agentCard, AgentInterface agentInterface, @Nullable List interceptors) { - this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient; + this.httpClient = httpClient == null ? A2AHttpClientFactory.create() : httpClient; this.agentCard = agentCard; this.agentInterface = agentInterface; this.interceptors = interceptors; diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java index 9cd5fae5a..7f73b4c2d 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java @@ -1,7 +1,7 @@ package io.a2a.client.transport.jsonrpc; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; import org.jspecify.annotations.Nullable; @@ -11,7 +11,7 @@ * This builder provides a fluent API for configuring the JSON-RPC transport protocol. * All configuration options are optional - if not specified, sensible defaults are used: *
    - *
  • HTTP client: {@link JdkA2AHttpClient} (JDK's built-in HTTP client)
  • + *
  • HTTP client: Auto-selected via {@link A2AHttpClientFactory} (prefers Vert.x, falls back to JDK)
  • *
  • Interceptors: None
  • *
*

@@ -78,7 +78,7 @@ public class JSONRPCTransportConfigBuilder extends ClientTransportConfigBuilder< *

  • Custom header handling
  • * *

    - * If not specified, the default {@link JdkA2AHttpClient} is used. + * If not specified, a client is auto-selected via {@link A2AHttpClientFactory}. *

    * Example: *

    {@code
    @@ -101,16 +101,16 @@ public JSONRPCTransportConfigBuilder httpClient(A2AHttpClient httpClient) {
         /**
          * Build the JSON-RPC transport configuration.
          * 

    - * If no HTTP client was configured, the default {@link JdkA2AHttpClient} is used. + * If no HTTP client was configured, one is auto-selected via {@link A2AHttpClientFactory}. * Any configured interceptors are transferred to the configuration. * * @return the configured JSON-RPC transport configuration */ @Override public JSONRPCTransportConfig build() { - // No HTTP client provided, fallback to the default one (JDK-based implementation) + // No HTTP client provided, use factory to get best available implementation if (httpClient == null) { - httpClient = new JdkA2AHttpClient(); + httpClient = A2AHttpClientFactory.create(); } JSONRPCTransportConfig config = new JSONRPCTransportConfig(httpClient); diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java index 5e9266a50..2dc1a9733 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java @@ -1,6 +1,6 @@ package io.a2a.client.transport.jsonrpc; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportProvider; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; @@ -14,7 +14,7 @@ public class JSONRPCTransportProvider implements ClientTransportProvider interceptors) { - this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient; + this.httpClient = httpClient == null ? A2AHttpClientFactory.create() : httpClient; this.agentCard = agentCard; this.agentInterface = agentInterface; this.interceptors = interceptors; diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java index 855de0ca6..257836199 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java @@ -1,7 +1,7 @@ package io.a2a.client.transport.rest; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; import org.jspecify.annotations.Nullable; @@ -11,7 +11,7 @@ * This builder provides a fluent API for configuring the REST transport protocol. * All configuration options are optional - if not specified, sensible defaults are used: *

      - *
    • HTTP client: {@link JdkA2AHttpClient} (JDK's built-in HTTP client)
    • + *
    • HTTP client: Auto-selected via {@link A2AHttpClientFactory} (prefers Vert.x, falls back to JDK)
    • *
    • Interceptors: None
    • *
    *

    @@ -78,7 +78,7 @@ public class RestTransportConfigBuilder extends ClientTransportConfigBuilderCustom header handling * *

    - * If not specified, the default {@link JdkA2AHttpClient} is used. + * If not specified, a client is auto-selected via {@link A2AHttpClientFactory}. *

    * Example: *

    {@code
    @@ -101,16 +101,16 @@ public RestTransportConfigBuilder httpClient(A2AHttpClient httpClient) {
         /**
          * Build the REST transport configuration.
          * 

    - * If no HTTP client was configured, the default {@link JdkA2AHttpClient} is used. + * If no HTTP client was configured, one is auto-selected via {@link A2AHttpClientFactory}. * Any configured interceptors are transferred to the configuration. * * @return the configured REST transport configuration */ @Override public RestTransportConfig build() { - // No HTTP client provided, fallback to the default one (JDK-based implementation) + // No HTTP client provided, use factory to get best available implementation if (httpClient == null) { - httpClient = new JdkA2AHttpClient(); + httpClient = A2AHttpClientFactory.create(); } RestTransportConfig config = new RestTransportConfig(httpClient); diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java index 5b5af20c9..b6e6d7a47 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java @@ -1,6 +1,6 @@ package io.a2a.client.transport.rest; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportProvider; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; @@ -18,9 +18,9 @@ public String getTransportProtocol() { public RestTransport create(RestTransportConfig clientTransportConfig, AgentCard agentCard, AgentInterface agentInterface) throws A2AClientException { RestTransportConfig transportConfig = clientTransportConfig; if (transportConfig == null) { - transportConfig = new RestTransportConfig(new JdkA2AHttpClient()); + transportConfig = new RestTransportConfig(A2AHttpClientFactory.create()); } - return new RestTransport(clientTransportConfig.getHttpClient(), agentCard, agentInterface, transportConfig.getInterceptors()); + return new RestTransport(transportConfig.getHttpClient(), agentCard, agentInterface, transportConfig.getInterceptors()); } @Override diff --git a/extras/http-client-vertx/pom.xml b/extras/http-client-vertx/pom.xml new file mode 100644 index 000000000..59bbf1141 --- /dev/null +++ b/extras/http-client-vertx/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 1.0.0.Alpha1-SNAPSHOT + ../../pom.xml + + a2a-java-sdk-http-client-vertx + + jar + + Java SDK A2A HTTP Client - Vert.x Implementation + Vert.x implementation for A2A HTTP Client + + + + ${project.groupId} + a2a-java-sdk-http-client + + + io.vertx + vertx-web-client + provided + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.mock-server + mockserver-netty + test + + + + diff --git a/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java new file mode 100644 index 000000000..b6afce027 --- /dev/null +++ b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClient.java @@ -0,0 +1,554 @@ +package io.a2a.client.http; + +import static java.net.HttpURLConnection.HTTP_FORBIDDEN; +import static java.net.HttpURLConnection.HTTP_MULT_CHOICE; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import org.jspecify.annotations.Nullable; + +import io.a2a.common.A2AErrorMessages; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.ext.web.codec.BodyCodec; +import jakarta.enterprise.context.spi.CreationalContext; +import jakarta.enterprise.inject.spi.Bean; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.inject.spi.CDI; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Vert.x WebClient-based implementation of {@link A2AHttpClient}. + * + *

    + * This implementation uses Vert.x's reactive HTTP client to execute requests. + * For synchronous methods ({@link GetBuilder#get()}, {@link PostBuilder#post()}, {@link DeleteBuilder#delete()}), + * the implementation blocks the calling thread until the asynchronous operation completes. + * For SSE streaming methods, the implementation returns immediately with a + * {@link CompletableFuture} and streams events asynchronously via callbacks. + * + *

    Lifecycle Management

    + *

    + * This client implements {@link AutoCloseable} and should be closed when no longer needed: + *

    {@code
    + * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
    + *     A2AHttpResponse response = client.createGet()
    + *         .url("https://example.com/api")
    + *         .get();
    + *     // Use response
    + * }
    + * }
    + * + *

    + * If constructed with the no-args constructor, the client creates and owns a + * {@link Vertx} instance which will be closed when {@link #close()} is called. + * If constructed with an external {@link Vertx} instance, only the WebClient is + * closed, leaving the Vertx instance management to the caller. + * + *

    Thread Safety

    + *

    + * This client is thread-safe. Multiple threads can create and execute requests + * concurrently. However, individual builder instances are NOT thread-safe and should + * not be shared across threads. + * + *

    HTTP/2 Support

    + *

    + * Vert.x WebClient automatically negotiates HTTP/2 when supported by the server + * via ALPN. No explicit configuration is required. + * + *

    Usage Examples

    + * + *

    Simple GET Request

    + *
    {@code
    + * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
    + *     A2AHttpResponse response = client.createGet()
    + *         .url("https://api.example.com/data")
    + *         .addHeader("Authorization", "Bearer token")
    + *         .get();
    + *
    + *     if (response.success()) {
    + *         System.out.println(response.body());
    + *     }
    + * }
    + * }
    + * + *

    POST Request with JSON Body

    + *
    {@code
    + * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
    + *     A2AHttpResponse response = client.createPost()
    + *         .url("https://api.example.com/submit")
    + *         .addHeader("Content-Type", "application/json")
    + *         .body("{\"key\":\"value\"}")
    + *         .post();
    + *
    + *     System.out.println("Status: " + response.status());
    + * }
    + * }
    + * + *

    Async SSE Streaming

    + *
    {@code
    + * try (VertxA2AHttpClient client = new VertxA2AHttpClient()) {
    + *     CompletableFuture future = client.createGet()
    + *         .url("https://api.example.com/stream")
    + *         .getAsyncSSE(
    + *             message -> System.out.println("Received: " + message),
    + *             error -> error.printStackTrace(),
    + *             () -> System.out.println("Stream complete")
    + *         );
    + *
    + *     // Do other work while streaming...
    + *     future.join(); // Wait for completion if needed
    + * }
    + * }
    + */ +public class VertxA2AHttpClient implements A2AHttpClient, AutoCloseable { + + private final Vertx vertx; + private final WebClient webClient; + private boolean ownsVertx; + private static final Logger log = Logger.getLogger(VertxA2AHttpClient.class.getName()); + + /** + * Creates a new VertxA2AHttpClient with an internally managed Vert.x instance. + * + *

    + * The client creates a new {@link Vertx} instance and {@link WebClient} configured + * with HTTP keep-alive and automatic redirect following. When {@link #close()} is called, + * both the WebClient and Vertx instance are closed. + * + *

    + * Important: Always call {@link #close()} when done with this client + * to prevent resource leaks. + * + * @see #VertxA2AHttpClient(Vertx) for using an externally managed Vertx instance + */ + public VertxA2AHttpClient() { + this.vertx = createVertx(); + WebClientOptions options = new WebClientOptions() + .setFollowRedirects(true) + .setKeepAlive(true); + this.webClient = WebClient.create(vertx, options); + log.fine("Vert.x client is ready."); + } + + private Vertx createVertx() { + try { + BeanManager beanManager = CDI.current().getBeanManager(); + Set> beans = beanManager.getBeans(Vertx.class); + if (beans != null && !beans.isEmpty()) { + this.ownsVertx = false; + Bean bean = beans.iterator().next(); + CreationalContext context = beanManager.createCreationalContext(bean); + return (Vertx) beanManager.getReference(bean, Vertx.class, context); + } + } catch (Exception ex) { + log.log(Level.FINE, "Error loading vertx from CDI error details", ex); + } + this.ownsVertx = true; + return Vertx.vertx(); + } + + /** + * Creates a new VertxA2AHttpClient using an externally managed Vert.x instance. + * + *

    + * The client creates a {@link WebClient} using the provided {@link Vertx} instance. + * When {@link #close()} is called, only the WebClient is closed; the Vertx instance + * remains open and must be managed by the caller. + * + *

    + * This constructor is useful in environments where Vert.x is already managed, + * such as Quarkus applications. + * + * @param vertx the Vert.x instance to use; must not be null + * @throws NullPointerException if vertx is null + */ + public VertxA2AHttpClient(Vertx vertx) { + if (vertx == null) { + throw new NullPointerException("vertx must not be null"); + } + this.vertx = vertx; + this.ownsVertx = false; + WebClientOptions options = new WebClientOptions() + .setFollowRedirects(true) + .setKeepAlive(true); + this.webClient = WebClient.create(vertx, options); + log.fine("Vert.x client is ready."); + } + + /** + * Closes this HTTP client and releases associated resources. + * + *

    + * This method always closes the WebClient. If the client was created with the + * no-args constructor (and thus owns the Vert.x instance), the Vertx instance is + * also closed. Otherwise, the Vertx instance is left open for the caller to manage. + */ + @Override + public void close() { + webClient.close(); + if (ownsVertx) { + vertx.close(); + } + } + + @Override + public GetBuilder createGet() { + return new VertxGetBuilder(); + } + + @Override + public PostBuilder createPost() { + return new VertxPostBuilder(); + } + + @Override + public DeleteBuilder createDelete() { + return new VertxDeleteBuilder(); + } + + private abstract class VertxBuilder> implements Builder { + + protected String url = ""; + protected Map headers = new HashMap<>(); + + @Override + public T url(String url) { + this.url = url; + return self(); + } + + @Override + public T addHeader(String name, String value) { + headers.put(name, value); + return self(); + } + + @Override + public T addHeaders(Map headers) { + if (headers != null && !headers.isEmpty()) { + for (Map.Entry entry : headers.entrySet()) { + addHeader(entry.getKey(), entry.getValue()); + } + } + return self(); + } + + @SuppressWarnings("unchecked") + T self() { + return (T) this; + } + } + + /** + * Common method to execute synchronous HTTP requests (GET, POST, DELETE). + * + * @param request the HTTP request configured with method and URL + * @param headers custom headers to add to the request + * @param bodyBuffer optional body buffer for POST requests (null for GET/DELETE) + * @return the HTTP response + * @throws IOException if the request fails or returns 401/403 + * @throws InterruptedException if the thread is interrupted while waiting + */ + private A2AHttpResponse executeSyncRequest( + HttpRequest request, + Map headers, + @Nullable Buffer bodyBuffer) throws IOException, InterruptedException { + + // Add headers + for (Map.Entry entry : headers.entrySet()) { + request.putHeader(entry.getKey(), entry.getValue()); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + // Send with or without body + if (bodyBuffer != null) { + request.sendBuffer(bodyBuffer, ar -> handleResponse(ar, responseRef, errorRef, latch)); + } else { + request.send(ar -> handleResponse(ar, responseRef, errorRef, latch)); + } + + latch.await(); + + if (errorRef.get() != null) { + Throwable error = errorRef.get(); + if (error instanceof IOException) { + throw (IOException) error; + } + if (error instanceof InterruptedException) { + throw (InterruptedException) error; + } + throw new IOException("Request failed", error); + } + A2AHttpResponse finalResponse = responseRef.get(); + if(finalResponse == null) { + throw new IllegalStateException("No response from http request"); + } + return finalResponse; + } + + /** + * Handles the HTTP response callback, checking for auth errors and populating response/error refs. + */ + private void handleResponse( + io.vertx.core.AsyncResult> ar, + AtomicReference responseRef, + AtomicReference errorRef, + CountDownLatch latch) { + + if (ar.succeeded()) { + HttpResponse response = ar.result(); + int status = response.statusCode(); + + // Check for authentication/authorization errors + switch (status) { + case HTTP_UNAUTHORIZED -> errorRef.set(new IOException(A2AErrorMessages.AUTHENTICATION_FAILED)); + case HTTP_FORBIDDEN -> errorRef.set(new IOException(A2AErrorMessages.AUTHORIZATION_FAILED)); + default -> { + String body = response.bodyAsString(); + responseRef.set(new VertxHttpResponse(status, body != null ? body : "")); + } + } + } else { + errorRef.set(ar.cause()); + } + latch.countDown(); + } + + /** + * Common method to execute async SSE requests (GET or POST). + * + * @param baseRequest the base HTTP request (HttpRequest<Buffer>) configured with method and URL + * @param headers custom headers to add to the request + * @param bodyBuffer optional body buffer for POST requests (null for GET) + * @param messageConsumer callback for each SSE message received + * @param errorConsumer callback for errors + * @param completeRunnable callback when stream completes successfully + * @return CompletableFuture that completes when the stream ends + */ + private CompletableFuture executeAsyncSSE( + HttpRequest baseRequest, + Map headers, + @Nullable Buffer bodyBuffer, + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) { + + CompletableFuture future = new CompletableFuture<>(); + AtomicBoolean successOccurred = new AtomicBoolean(false); + AtomicBoolean streamEnded = new AtomicBoolean(false); + AtomicBoolean futureCompleted = new AtomicBoolean(false); + + HttpRequest request = baseRequest + .putHeader(ACCEPT, EVENT_STREAM) + .as(BodyCodec.sseStream(stream -> { + stream.handler(event -> { + String data = event.data(); + if (data != null) { + data = data.trim(); + if (!data.isEmpty()) { + messageConsumer.accept(data); + } + } + }); + + stream.endHandler(v -> { + streamEnded.set(true); + // Only complete if we've validated success and haven't completed yet + if (successOccurred.get() && futureCompleted.compareAndSet(false, true)) { + completeRunnable.run(); + future.complete(null); + } + }); + + stream.exceptionHandler(error -> { + if (futureCompleted.compareAndSet(false, true)) { + errorConsumer.accept(error); + future.complete(null); + } + }); + })); + + // Add custom headers + for (Map.Entry entry : headers.entrySet()) { + request.putHeader(entry.getKey(), entry.getValue()); + } + + // Send with or without body + var sendFuture = (bodyBuffer != null) ? request.sendBuffer(bodyBuffer) : request.send(); + + sendFuture + .onSuccess(response -> { + // Validate status code manually since .expecting() doesn't work with SSE streams + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + // Error - don't set successOccurred, just report error + if (futureCompleted.compareAndSet(false, true)) { + // Use same error messages as sync requests for consistency + IOException error = switch (statusCode) { + case HTTP_UNAUTHORIZED -> new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); + case HTTP_FORBIDDEN -> new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); + default -> new IOException("HTTP " + statusCode + ": " + response.bodyAsString()); + }; + errorConsumer.accept(error); + future.complete(null); + } + } else { + // Success - mark as successful + successOccurred.set(true); + // If stream already ended, complete now + if (streamEnded.get() && futureCompleted.compareAndSet(false, true)) { + completeRunnable.run(); + future.complete(null); + } + } + }) + .onFailure(cause -> { + if (futureCompleted.compareAndSet(false, true)) { + errorConsumer.accept(cause); + future.complete(null); + } + }); + + return future; + } + + private class VertxGetBuilder extends VertxBuilder implements A2AHttpClient.GetBuilder { + + /** + * {@inheritDoc} + * + *

    + * Implementation Note: This method blocks the calling thread until + * the asynchronous HTTP request completes. The underlying Vert.x operation executes + * asynchronously on the Vert.x event loop. + * + * @throws IOException if the request fails, including: + *

      + *
    • Network errors (connection refused, timeout, etc.)
    • + *
    • HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
    • + *
    • HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
    • + *
    + * @throws InterruptedException if the thread is interrupted while waiting + */ + @Override + public A2AHttpResponse get() throws IOException, InterruptedException { + return executeSyncRequest(webClient.getAbs(url), headers, null); + } + + @Override + public CompletableFuture getAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) throws IOException, InterruptedException { + + HttpRequest request = webClient.getAbs(url); + return executeAsyncSSE(request, headers, null, messageConsumer, errorConsumer, completeRunnable); + } + } + + private class VertxPostBuilder extends VertxBuilder implements A2AHttpClient.PostBuilder { + + private String body = ""; + + @Override + public PostBuilder body(String body) { + this.body = body; + return self(); + } + + /** + * {@inheritDoc} + * + *

    + * Implementation Note: This method blocks the calling thread until + * the asynchronous HTTP request completes. The underlying Vert.x operation executes + * asynchronously on the Vert.x event loop. + * + * @throws IOException if the request fails, including: + *

      + *
    • Network errors (connection refused, timeout, etc.)
    • + *
    • HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
    • + *
    • HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
    • + *
    + * @throws InterruptedException if the thread is interrupted while waiting + */ + @Override + public A2AHttpResponse post() throws IOException, InterruptedException { + Buffer bodyBuffer = Buffer.buffer(body, StandardCharsets.UTF_8.name()); + return executeSyncRequest(webClient.postAbs(url), headers, bodyBuffer); + } + + @Override + public CompletableFuture postAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) throws IOException, InterruptedException { + + HttpRequest request = webClient.postAbs(url); + Buffer bodyBuffer = Buffer.buffer(body, StandardCharsets.UTF_8.name()); + return executeAsyncSSE(request, headers, bodyBuffer, messageConsumer, errorConsumer, completeRunnable); + } + } + + private class VertxDeleteBuilder extends VertxBuilder implements A2AHttpClient.DeleteBuilder { + + /** + * {@inheritDoc} + * + *

    + * Implementation Note: This method blocks the calling thread until + * the asynchronous HTTP request completes. The underlying Vert.x operation executes + * asynchronously on the Vert.x event loop. + * + * @throws IOException if the request fails, including: + *

      + *
    • Network errors (connection refused, timeout, etc.)
    • + *
    • HTTP 401 Unauthorized - with message from {@link A2AErrorMessages#AUTHENTICATION_FAILED}
    • + *
    • HTTP 403 Forbidden - with message from {@link A2AErrorMessages#AUTHORIZATION_FAILED}
    • + *
    + * @throws InterruptedException if the thread is interrupted while waiting + */ + @Override + public A2AHttpResponse delete() throws IOException, InterruptedException { + return executeSyncRequest(webClient.deleteAbs(url), headers, null); + } + } + + private record VertxHttpResponse(int status, String body) implements A2AHttpResponse { + + @Override + public int status() { + return status; + } + + @Override + public boolean success() { + return status >= HTTP_OK && status < HTTP_MULT_CHOICE; + } + + @Override + public String body() { + return body; + } + } +} diff --git a/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java new file mode 100644 index 000000000..e802e1df1 --- /dev/null +++ b/extras/http-client-vertx/src/main/java/io/a2a/client/http/VertxA2AHttpClientProvider.java @@ -0,0 +1,59 @@ +package io.a2a.client.http; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Service provider for {@link VertxA2AHttpClient}. + * + *

    + * This provider has a higher priority (100) than the JDK implementation and will be + * preferred when the Vert.x dependencies are available on the classpath. + * + *

    + * If Vert.x classes are not available at runtime, this provider will check for their + * presence and throw an {@link IllegalStateException} when attempting to create a client. + * The ServiceLoader mechanism will skip this provider and fall back to the JDK implementation. + */ +public final class VertxA2AHttpClientProvider implements A2AHttpClientProvider { + + private static final boolean VERTX_AVAILABLE = isVertxAvailable(); + private static final Logger log = Logger.getLogger(VertxA2AHttpClientProvider.class.getName()); + + private static boolean isVertxAvailable() { + try { + Class.forName("io.vertx.core.Vertx"); + Class.forName("io.vertx.ext.web.client.WebClient"); + return true; + } catch (ClassNotFoundException ex) { + Logger.getLogger(VertxA2AHttpClientProvider.class.getName()).log(Level.FINE, "Vert.x classes are not available on the classpath. Falling back to other providers.", ex); + return false; + } + } + + @Override + public A2AHttpClient create() { + if (!VERTX_AVAILABLE) { + throw new IllegalStateException( + "Vert.x classes are not available on the classpath. " + + "Add io.vertx:vertx-web-client dependency or use the JDK HTTP client implementation."); + } + + try { + Class clientClass = Class.forName("io.a2a.client.http.VertxA2AHttpClient"); + return (A2AHttpClient) clientClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to create VertxA2AHttpClient instance", e); + } + } + + @Override + public int priority() { + return VERTX_AVAILABLE ? 100 : -1; // Higher priority when available, negative when not + } + + @Override + public String name() { + return "vertx"; + } +} diff --git a/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider new file mode 100644 index 000000000..9d6a67c3b --- /dev/null +++ b/extras/http-client-vertx/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider @@ -0,0 +1 @@ +io.a2a.client.http.VertxA2AHttpClientProvider diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java new file mode 100644 index 000000000..2ac3eac94 --- /dev/null +++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientFactoryTest.java @@ -0,0 +1,66 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +public class VertxA2AHttpClientFactoryTest { + + @Test + public void testCreateReturnsVertxClient() { + // When both JDK and Vertx are on classpath, Vertx should be preferred due to higher priority + A2AHttpClient client = A2AHttpClientFactory.create(); + assertNotNull(client); + assertInstanceOf(VertxA2AHttpClient.class, client, + "Factory should return VertxA2AHttpClient when Vertx is available"); + // Clean up + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable) client).close(); + } catch (Exception e) { + fail("Failed to close client: " + e.getMessage()); + } + } + } + + @Test + public void testCreateWithVertxProviderName() { + A2AHttpClient client = A2AHttpClientFactory.create("vertx"); + assertNotNull(client); + assertInstanceOf(VertxA2AHttpClient.class, client, + "Factory should return VertxA2AHttpClient when 'vertx' provider is requested"); + // Clean up + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable) client).close(); + } catch (Exception e) { + fail("Failed to close client: " + e.getMessage()); + } + } + } + + @Test + public void testVertxClientIsUsable() { + A2AHttpClient client = A2AHttpClientFactory.create("vertx"); + assertNotNull(client); + + // Verify we can create builders + A2AHttpClient.GetBuilder getBuilder = client.createGet(); + assertNotNull(getBuilder, "Should be able to create GET builder"); + + A2AHttpClient.PostBuilder postBuilder = client.createPost(); + assertNotNull(postBuilder, "Should be able to create POST builder"); + + A2AHttpClient.DeleteBuilder deleteBuilder = client.createDelete(); + assertNotNull(deleteBuilder, "Should be able to create DELETE builder"); + + // Clean up + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable) client).close(); + } catch (Exception e) { + fail("Failed to close client: " + e.getMessage()); + } + } + } +} diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java new file mode 100644 index 000000000..f945aecde --- /dev/null +++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientIntegrationTest.java @@ -0,0 +1,212 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import io.a2a.common.A2AErrorMessages; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; + +public class VertxA2AHttpClientIntegrationTest { + + private ClientAndServer mockServer; + private VertxA2AHttpClient client; + + @BeforeEach + public void setup() { + mockServer = ClientAndServer.startClientAndServer(0); // Use random port + client = new VertxA2AHttpClient(); + } + + @AfterEach + public void teardown() { + if (client != null) { + client.close(); + } + if (mockServer != null) { + mockServer.stop(); + } + } + + private String getBaseUrl() { + return "http://localhost:" + mockServer.getPort(); + } + + @Test + public void testGetRequestSuccess() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(200).withBody("success")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(200, response.status()); + assertTrue(response.success()); + assertEquals("success", response.body()); + } + + @Test + public void testPostRequestSuccess() throws Exception { + mockServer + .when(request() + .withMethod("POST") + .withPath("/test") + .withBody("{\"key\":\"value\"}")) + .respond(response().withStatusCode(201).withBody("created")); + + A2AHttpResponse response = client.createPost() + .url(getBaseUrl() + "/test") + .body("{\"key\":\"value\"}") + .post(); + + assertEquals(201, response.status()); + assertTrue(response.success()); + assertEquals("created", response.body()); + } + + @Test + public void testDeleteRequestSuccess() throws Exception { + mockServer + .when(request().withMethod("DELETE").withPath("/test")) + .respond(response().withStatusCode(204)); + + A2AHttpResponse response = client.createDelete() + .url(getBaseUrl() + "/test") + .delete(); + + assertEquals(204, response.status()); + assertTrue(response.success()); + } + + @Test + public void test401AuthenticationErrorOnGet() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(java.io.IOException.class, () -> { + client.createGet() + .url(getBaseUrl() + "/test") + .get(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void test403AuthorizationErrorOnGet() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(403)); + + Exception exception = assertThrows(java.io.IOException.class, () -> { + client.createGet() + .url(getBaseUrl() + "/test") + .get(); + }); + + assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage()); + } + + @Test + public void test401AuthenticationErrorOnPost() throws Exception { + mockServer + .when(request().withMethod("POST").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(java.io.IOException.class, () -> { + client.createPost() + .url(getBaseUrl() + "/test") + .body("{}") + .post(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void test403AuthorizationErrorOnPost() throws Exception { + mockServer + .when(request().withMethod("POST").withPath("/test")) + .respond(response().withStatusCode(403)); + + Exception exception = assertThrows(java.io.IOException.class, () -> { + client.createPost() + .url(getBaseUrl() + "/test") + .body("{}") + .post(); + }); + + assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage()); + } + + @Test + public void test401AuthenticationErrorOnDelete() throws Exception { + mockServer + .when(request().withMethod("DELETE").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(java.io.IOException.class, () -> { + client.createDelete() + .url(getBaseUrl() + "/test") + .delete(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void testHeaderPropagation() throws Exception { + mockServer + .when(request() + .withMethod("GET") + .withPath("/test") + .withHeader("Authorization", "Bearer token") + .withHeader("X-Custom-Header", "custom-value")) + .respond(response().withStatusCode(200).withBody("ok")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .addHeader("Authorization", "Bearer token") + .addHeader("X-Custom-Header", "custom-value") + .get(); + + assertEquals(200, response.status()); + assertEquals("ok", response.body()); + } + + @Test + public void testNonSuccessStatusCode() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(500).withBody("Internal Server Error")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(500, response.status()); + assertFalse(response.success()); + assertEquals("Internal Server Error", response.body()); + } + + @Test + public void test404NotFound() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(404).withBody("Not Found")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(404, response.status()); + assertFalse(response.success()); + assertEquals("Not Found", response.body()); + } +} diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java new file mode 100644 index 000000000..64b1dec54 --- /dev/null +++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientSSETest.java @@ -0,0 +1,253 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import io.a2a.common.A2AErrorMessages; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class VertxA2AHttpClientSSETest { + + private ClientAndServer mockServer; + private VertxA2AHttpClient client; + + @BeforeEach + public void setup() { + mockServer = ClientAndServer.startClientAndServer(0); // Use random port + client = new VertxA2AHttpClient(); + } + + @AfterEach + public void teardown() { + if (client != null) { + client.close(); + } + if (mockServer != null) { + mockServer.stop(); + } + } + + private String getBaseUrl() { + return "http://localhost:" + mockServer.getPort(); + } + + @Test + public void testGetAsyncSSE() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: event1\n\ndata: event2\n\ndata: event3\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called"); + assertNull(error.get(), "Expected no errors"); + assertFalse(events.isEmpty(), "Expected to receive events"); + assertTrue(events.contains("event1"), "Expected event1"); + assertTrue(events.contains("event2"), "Expected event2"); + assertTrue(events.contains("event3"), "Expected event3"); + } + + @Test + public void testPostAsyncSSE() throws Exception { + mockServer + .when(request() + .withMethod("POST") + .withPath("/sse") + .withBody("{\"subscribe\":true}")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: message1\n\ndata: message2\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + CompletableFuture future = client.createPost() + .url(getBaseUrl() + "/sse") + .body("{\"subscribe\":true}") + .postAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called"); + assertNull(error.get(), "Expected no errors"); + assertFalse(events.isEmpty(), "Expected to receive events"); + assertTrue(events.contains("message1"), "Expected message1"); + assertTrue(events.contains("message2"), "Expected message2"); + } + + @Test + public void testSSEDataPrefixStripping() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: content here\n\ndata:no space\n\ndata: extra spaces \n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertTrue(events.contains("content here"), "Should have stripped 'data: ' prefix"); + assertTrue(events.contains("no space"), "Should handle 'data:' without space"); + assertTrue(events.contains("extra spaces"), "Should trim whitespace"); + } + + @Test + public void testSSEAuthenticationError() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response().withStatusCode(401)); + + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean completed = new AtomicBoolean(false); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + msg -> {}, + e -> { + error.set(e); + errorLatch.countDown(); + }, + () -> completed.set(true) + ); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called"); + assertNotNull(error.get(), "Expected an error"); + assertTrue(error.get() instanceof IOException, "Expected IOException"); + assertTrue(error.get().getMessage().contains("Authentication failed"), + "Expected authentication error message but got: " + error.get().getMessage()); + assertFalse(completed.get(), "Should not call completion handler on error"); + } + + @Test + public void testSSEAuthorizationError() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response().withStatusCode(403)); + + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean completed = new AtomicBoolean(false); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + msg -> {}, + e -> { + error.set(e); + errorLatch.countDown(); + }, + () -> completed.set(true) + ); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called"); + assertNotNull(error.get(), "Expected an error"); + assertTrue(error.get() instanceof IOException, "Expected IOException"); + assertTrue(error.get().getMessage().contains("Authorization failed"), + "Expected authorization error message but got: " + error.get().getMessage()); + assertFalse(completed.get(), "Should not call completion handler on error"); + } + + @Test + public void testSSEEmptyLinesIgnored() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: first\n\n\n\ndata: second\n\ndata: \n\ndata: third\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(3, events.size(), "Should have received 3 non-empty events"); + assertTrue(events.contains("first")); + assertTrue(events.contains("second")); + assertTrue(events.contains("third")); + } + + @Test + public void testSSEHeaderPropagation() throws Exception { + mockServer + .when(request() + .withMethod("GET") + .withPath("/sse") + .withHeader("Accept", "text/event-stream") + .withHeader("Authorization", "Bearer token")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: authenticated\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + CompletableFuture future = client.createGet() + .url(getBaseUrl() + "/sse") + .addHeader("Authorization", "Bearer token") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertTrue(events.contains("authenticated")); + } +} diff --git a/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java new file mode 100644 index 000000000..55e9afd94 --- /dev/null +++ b/extras/http-client-vertx/src/test/java/io/a2a/client/http/VertxA2AHttpClientTest.java @@ -0,0 +1,94 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; + +import io.vertx.core.Vertx; +import org.junit.jupiter.api.Test; + +public class VertxA2AHttpClientTest { + + @Test + public void testNoArgsConstructor() { + VertxA2AHttpClient client = new VertxA2AHttpClient(); + assertNotNull(client); + client.close(); + } + + @Test + public void testVertxParameterConstructor() { + Vertx vertx = Vertx.vertx(); + VertxA2AHttpClient client = new VertxA2AHttpClient(vertx); + assertNotNull(client); + client.close(); + vertx.close(); + } + + @Test + public void testVertxParameterConstructorNullThrows() { + assertThrows(NullPointerException.class, () -> { + new VertxA2AHttpClient(null); + }); + } + + @Test + public void testCreateGet() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.GetBuilder builder = client.createGet(); + assertNotNull(builder); + } + } + + @Test + public void testCreatePost() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.PostBuilder builder = client.createPost(); + assertNotNull(builder); + } + } + + @Test + public void testCreateDelete() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.DeleteBuilder builder = client.createDelete(); + assertNotNull(builder); + } + } + + @Test + public void testBuilderUrlSetting() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.GetBuilder builder = client.createGet(); + A2AHttpClient.GetBuilder result = builder.url("https://example.com"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } + } + + @Test + public void testBuilderHeaderSetting() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.GetBuilder builder = client.createGet(); + A2AHttpClient.GetBuilder result = builder.addHeader("Accept", "application/json"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } + } + + @Test + public void testBuilderMethodChaining() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.GetBuilder builder = client.createGet() + .url("https://example.com") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Bearer token"); + assertNotNull(builder); + } + } + + @Test + public void testPostBuilderBody() { + try (VertxA2AHttpClient client = new VertxA2AHttpClient()) { + A2AHttpClient.PostBuilder builder = client.createPost(); + A2AHttpClient.PostBuilder result = builder.body("{\"key\":\"value\"}"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } + } +} diff --git a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java index 6d2e5f55f..b4847cd40 100644 --- a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java +++ b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java @@ -27,18 +27,18 @@ public class A2ACardResolver { private static final String DEFAULT_AGENT_CARD_PATH = "/.well-known/agent-card.json"; /** - * Get the agent card for an A2A agent. The {@code JdkA2AHttpClient} will be used to fetch the agent card. + * Get the agent card for an A2A agent. An HTTP client will be auto-selected via {@link A2AHttpClientFactory}. * * @param baseUrl the base URL for the agent whose agent card we want to retrieve, must not be null * @throws A2AClientError if the URL for the agent is invalid * @throws IllegalArgumentException if baseUrl is null */ public A2ACardResolver(String baseUrl) throws A2AClientError { - this(new JdkA2AHttpClient(), baseUrl, null, null); + this(A2AHttpClientFactory.create(), baseUrl, null, null); } /** - * Get the agent card for an A2A agent. The {@code JdkA2AHttpClient} will be used to fetch the agent card. + * Get the agent card for an A2A agent. An HTTP client will be auto-selected via {@link A2AHttpClientFactory}. * * @param baseUrl the base URL for the agent whose agent card we want to retrieve, must not be null * @param tenant the tenant path to use when fetching the agent card, may be null for no tenant @@ -46,7 +46,7 @@ public A2ACardResolver(String baseUrl) throws A2AClientError { * @throws IllegalArgumentException if baseUrl is null */ public A2ACardResolver(String baseUrl, @Nullable String tenant) throws A2AClientError { - this(new JdkA2AHttpClient(), baseUrl, tenant, null); + this(A2AHttpClientFactory.create(), baseUrl, tenant, null); } /** diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java index 8d7f2d0f9..9e4f5f705 100644 --- a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java @@ -9,6 +9,8 @@ public interface A2AHttpClient { String CONTENT_TYPE= "Content-Type"; String APPLICATION_JSON= "application/json"; + String ACCEPT = "Accept"; + String EVENT_STREAM = "text/event-stream"; GetBuilder createGet(); diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java new file mode 100644 index 000000000..05bac3112 --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java @@ -0,0 +1,89 @@ +package io.a2a.client.http; + +import java.util.Comparator; +import java.util.ServiceLoader; +import java.util.stream.StreamSupport; + +/** + * Factory for creating {@link A2AHttpClient} instances using the ServiceLoader mechanism. + * + *

    + * This factory discovers available {@link A2AHttpClientProvider} implementations at runtime + * and selects the one with the highest priority. If no providers are found, it falls back + * to creating a {@link JdkA2AHttpClient}. + * + *

    Usage

    + *
    {@code
    + * // Get the default client (highest priority provider)
    + * A2AHttpClient client = A2AHttpClientFactory.create();
    + *
    + * // Use with try-with-resources if the client implements AutoCloseable
    + * try (A2AHttpClient client = A2AHttpClientFactory.create()) {
    + *     A2AHttpResponse response = client.createGet()
    + *         .url("https://example.com")
    + *         .get();
    + * }
    + * }
    + * + *

    Priority System

    + *

    + * Providers are selected based on their priority value (higher is better): + *

      + *
    • JdkA2AHttpClient: priority 0 (fallback)
    • + *
    • VertxA2AHttpClient: priority 100 (preferred when available)
    • + *
    + * + *

    Custom Providers

    + *

    + * To add a custom provider, implement {@link A2AHttpClientProvider} and register it + * in {@code META-INF/services/io.a2a.client.http.A2AHttpClientProvider}. + */ +public final class A2AHttpClientFactory { + + private A2AHttpClientFactory() { + // Utility class + } + + /** + * Creates a new A2AHttpClient instance using the highest priority provider available. + * + *

    + * This method uses the ServiceLoader mechanism to discover providers at runtime. + * If no providers are found, it falls back to creating a {@link JdkA2AHttpClient}. + * + * @return a new A2AHttpClient instance + */ + public static A2AHttpClient create() { + ServiceLoader loader = ServiceLoader.load(A2AHttpClientProvider.class); + + return StreamSupport.stream(loader.spliterator(), false) + .max(Comparator.comparingInt(A2AHttpClientProvider::priority)) + .map(A2AHttpClientProvider::create) + .orElseGet(JdkA2AHttpClient::new); + } + + /** + * Creates a new A2AHttpClient instance using a specific provider by name. + * + *

    + * This method is useful for testing or when you need to force a specific implementation. + * + * @param providerName the name of the provider to use + * @return a new A2AHttpClient instance from the specified provider + * @throws IllegalArgumentException if no provider with the given name is found + */ + public static A2AHttpClient create(String providerName) { + if (providerName == null || providerName.isEmpty()) { + throw new IllegalArgumentException("Provider name must not be null or empty"); + } + + ServiceLoader loader = ServiceLoader.load(A2AHttpClientProvider.class); + + return StreamSupport.stream(loader.spliterator(), false) + .filter(provider -> providerName.equals(provider.name())) + .findFirst() + .map(A2AHttpClientProvider::create) + .orElseThrow(() -> new IllegalArgumentException( + "No A2AHttpClientProvider found with name: " + providerName)); + } +} diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java new file mode 100644 index 000000000..8c33c66e6 --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java @@ -0,0 +1,48 @@ +package io.a2a.client.http; + +/** + * Service provider interface for creating {@link A2AHttpClient} instances. + * + *

    + * Implementations of this interface can be registered via the Java ServiceLoader + * mechanism. The {@link A2AHttpClientFactory} will discover and use the highest + * priority provider available. + * + *

    + * To register a provider, create a file named + * {@code META-INF/services/io.a2a.client.http.A2AHttpClientProvider} containing + * the fully qualified class name of your provider implementation. + */ +public interface A2AHttpClientProvider { + + /** + * Creates a new instance of an A2AHttpClient. + * + * @return a new A2AHttpClient instance + */ + A2AHttpClient create(); + + /** + * Returns the priority of this provider. Higher priority providers are + * preferred over lower priority ones. + * + *

    + * Default priorities: + *

      + *
    • JdkA2AHttpClient: 0 (fallback)
    • + *
    • VertxA2AHttpClient: 100 (preferred when available)
    • + *
    + * + * @return the priority value (higher is better) + */ + default int priority() { + return 0; + } + + /** + * Returns the name of this provider for logging and debugging purposes. + * + * @return the provider name + */ + String name(); +} diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java index 9b8003741..d5bc68651 100644 --- a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java +++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java @@ -199,7 +199,7 @@ private class JdkGetBuilder extends JdkBuilder implements A2AHttpCli private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException { HttpRequest.Builder builder = super.createRequestBuilder().GET(); if (SSE) { - builder.header("Accept", "text/event-stream"); + builder.header(ACCEPT, EVENT_STREAM); } return builder; } @@ -250,7 +250,7 @@ private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException HttpRequest.Builder builder = super.createRequestBuilder() .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8)); if (SSE) { - builder.header("Accept", "text/event-stream"); + builder.header(ACCEPT, EVENT_STREAM); } return builder; } diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java new file mode 100644 index 000000000..1bf388286 --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java @@ -0,0 +1,27 @@ +package io.a2a.client.http; + +/** + * Service provider for {@link JdkA2AHttpClient}. + * + *

    + * This provider has the lowest priority (0) and serves as the fallback implementation + * when no other providers are available. The JDK HTTP client is always available as it + * uses only standard Java libraries. + */ +public final class JdkA2AHttpClientProvider implements A2AHttpClientProvider { + + @Override + public A2AHttpClient create() { + return new JdkA2AHttpClient(); + } + + @Override + public int priority() { + return 0; // Lowest priority - fallback + } + + @Override + public String name() { + return "jdk"; + } +} diff --git a/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider new file mode 100644 index 000000000..78dbb361e --- /dev/null +++ b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider @@ -0,0 +1 @@ +io.a2a.client.http.JdkA2AHttpClientProvider diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java new file mode 100644 index 000000000..72de52230 --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryTest.java @@ -0,0 +1,88 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +public class A2AHttpClientFactoryTest { + + @Test + public void testCreateReturnsNonNull() { + A2AHttpClient client = A2AHttpClientFactory.create(); + assertNotNull(client, "Factory should return a non-null client"); + } + + @Test + public void testCreateReturnsJdkClient() { + // When Vertx is not on classpath, JDK client should be used + A2AHttpClient client = A2AHttpClientFactory.create(); + assertNotNull(client); + assertInstanceOf(JdkA2AHttpClient.class, client, + "Factory should return JdkA2AHttpClient when Vertx is not available"); + } + + @Test + public void testCreateWithJdkProviderName() { + A2AHttpClient client = A2AHttpClientFactory.create("jdk"); + assertNotNull(client); + assertInstanceOf(JdkA2AHttpClient.class, client, + "Factory should return JdkA2AHttpClient when 'jdk' provider is requested"); + } + + @Test + public void testCreateWithVertxProviderNameThrows() { + // Vertx provider is not available in the core module + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> A2AHttpClientFactory.create("vertx"), + "Factory should throw IllegalArgumentException when vertx provider is not found" + ); + assertTrue(exception.getMessage().contains("vertx"), + "Exception message should mention the provider name"); + } + + @Test + public void testCreateWithInvalidProviderNameThrows() { + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> A2AHttpClientFactory.create("nonexistent"), + "Factory should throw IllegalArgumentException for unknown provider" + ); + assertTrue(exception.getMessage().contains("nonexistent"), + "Exception message should mention the provider name"); + } + + @Test + public void testCreateWithNullProviderNameThrows() { + assertThrows( + IllegalArgumentException.class, + () -> A2AHttpClientFactory.create(null), + "Factory should throw IllegalArgumentException for null provider name" + ); + } + + @Test + public void testCreateWithEmptyProviderNameThrows() { + assertThrows( + IllegalArgumentException.class, + () -> A2AHttpClientFactory.create(""), + "Factory should throw IllegalArgumentException for empty provider name" + ); + } + + @Test + public void testCreatedClientIsUsable() { + A2AHttpClient client = A2AHttpClientFactory.create(); + assertNotNull(client); + + // Verify we can create builders + A2AHttpClient.GetBuilder getBuilder = client.createGet(); + assertNotNull(getBuilder, "Should be able to create GET builder"); + + A2AHttpClient.PostBuilder postBuilder = client.createPost(); + assertNotNull(postBuilder, "Should be able to create POST builder"); + + A2AHttpClient.DeleteBuilder deleteBuilder = client.createDelete(); + assertNotNull(deleteBuilder, "Should be able to create DELETE builder"); + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java new file mode 100644 index 000000000..86da8c8cf --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientFactoryUsageExample.java @@ -0,0 +1,122 @@ +package io.a2a.client.http; + +import java.io.IOException; + +/** + * Example demonstrating how to use {@link A2AHttpClientFactory} to obtain HTTP client instances. + * + *

    + * This class shows various usage patterns for the factory-based approach to creating + * A2AHttpClient instances. + */ +public class A2AHttpClientFactoryUsageExample { + + /** + * Example 1: Basic usage with automatic selection of best available client. + */ + public void basicUsage() throws IOException, InterruptedException { + // The factory automatically selects the best available implementation: + // - VertxA2AHttpClient (priority 100) if Vert.x is on the classpath + // - JdkA2AHttpClient (priority 0) as fallback + A2AHttpClient client = A2AHttpClientFactory.create(); + + try { + A2AHttpResponse response = client.createGet() + .url("https://api.example.com/data") + .addHeader("Accept", "application/json") + .get(); + + if (response.success()) { + System.out.println("Response: " + response.body()); + } + } finally { + // Close if the client supports AutoCloseable (Vertx does, JDK doesn't) + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable) client).close(); + } catch (Exception e) { + // Handle close exception + } + } + } + } + + /** + * Example 2: Try-with-resources pattern (recommended for AutoCloseable clients). + */ + public void tryWithResourcesUsage() throws Exception { + A2AHttpClient client = A2AHttpClientFactory.create(); + + // Only use try-with-resources if the client is AutoCloseable + if (client instanceof AutoCloseable) { + try (AutoCloseable closeableClient = (AutoCloseable) client) { + A2AHttpResponse response = client.createPost() + .url("https://api.example.com/submit") + .addHeader("Content-Type", "application/json") + .body("{\"key\":\"value\"}") + .post(); + + System.out.println("Status: " + response.status()); + } + } else { + // Non-closeable client, use normally + A2AHttpResponse response = client.createPost() + .url("https://api.example.com/submit") + .addHeader("Content-Type", "application/json") + .body("{\"key\":\"value\"}") + .post(); + + System.out.println("Status: " + response.status()); + } + } + + /** + * Example 3: Explicitly selecting a specific implementation. + */ + public void specificProviderUsage() throws IOException, InterruptedException { + // Force the use of JDK client even if Vert.x is available + A2AHttpClient jdkClient = A2AHttpClientFactory.create("jdk"); + A2AHttpResponse response = jdkClient.createGet() + .url("https://api.example.com/data") + .get(); + + System.out.println("Using JDK client: " + response.status()); + + // Or explicitly use Vert.x client + try (AutoCloseable vertxClient = (AutoCloseable) A2AHttpClientFactory.create("vertx")) { + A2AHttpResponse vertxResponse = ((A2AHttpClient) vertxClient).createGet() + .url("https://api.example.com/data") + .get(); + + System.out.println("Using Vert.x client: " + vertxResponse.status()); + } catch (Exception e) { + // Handle exceptions + } + } + + /** + * Example 4: Defensive programming - handling unknown implementations. + */ + public void defensiveUsage() throws IOException, InterruptedException { + A2AHttpClient client = null; + try { + client = A2AHttpClientFactory.create(); + + A2AHttpResponse response = client.createGet() + .url("https://api.example.com/data") + .get(); + + System.out.println("Response: " + response.body()); + + } finally { + // Safely close if possible + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable) client).close(); + } catch (Exception e) { + System.err.println("Warning: Failed to close client: " + e.getMessage()); + } + } + } + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java new file mode 100644 index 000000000..b2e222517 --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/A2AHttpClientProviderTest.java @@ -0,0 +1,28 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +public class A2AHttpClientProviderTest { + + @Test + public void testJdkProviderCreatesClient() { + JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider(); + A2AHttpClient client = provider.create(); + assertNotNull(client); + assertInstanceOf(JdkA2AHttpClient.class, client); + } + + @Test + public void testJdkProviderPriority() { + JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider(); + assertEquals(0, provider.priority(), "JDK provider should have priority 0"); + } + + @Test + public void testJdkProviderName() { + JdkA2AHttpClientProvider provider = new JdkA2AHttpClientProvider(); + assertEquals("jdk", provider.name(), "JDK provider name should be 'jdk'"); + } +} diff --git a/pom.xml b/pom.xml index 643b56e53..9a712913a 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ 1.7.1 4.33.1 0.6.1 - 3.30.1 + 3.30.6 5.5.1 2.0.17 1.5.18 @@ -119,6 +119,11 @@ a2a-java-sdk-http-client ${project.version} + + ${project.groupId} + a2a-java-sdk-http-client-vertx + ${project.version} + ${project.groupId} a2a-java-sdk-jsonrpc-common @@ -510,6 +515,7 @@ extras/task-store-database-jpa extras/push-notification-config-store-database-jpa extras/queue-manager-replicated + extras/http-client-vertx http-client jsonrpc-common integrations/microprofile-config diff --git a/reference/rest/pom.xml b/reference/rest/pom.xml index b284e140d..6c45cb659 100644 --- a/reference/rest/pom.xml +++ b/reference/rest/pom.xml @@ -35,6 +35,11 @@ a2a-java-sdk-jsonrpc-common ${project.version} + + ${project.groupId} + a2a-java-sdk-http-client-vertx + test + ${project.groupId} a2a-java-sdk-client-transport-rest @@ -107,7 +112,7 @@ maven-surefire-plugin - 3.5.3 + 3.5.4 org.jboss.logmanager.LogManager diff --git a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/QuarkusA2ARestTest.java b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/QuarkusA2ARestTest.java index 5790fc6c5..61576088a 100644 --- a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/QuarkusA2ARestTest.java +++ b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/QuarkusA2ARestTest.java @@ -6,17 +6,23 @@ import java.net.http.HttpResponse; import io.a2a.client.ClientBuilder; +import io.a2a.client.http.VertxA2AHttpClient; import io.a2a.client.transport.rest.RestTransport; import io.a2a.client.transport.rest.RestTransportConfigBuilder; import io.a2a.server.apps.common.AbstractA2AServerTest; import io.a2a.spec.TransportProtocol; import io.quarkus.test.junit.QuarkusTest; +import io.vertx.core.Vertx; +import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @QuarkusTest public class QuarkusA2ARestTest extends AbstractA2AServerTest { + @Inject + Vertx vertx; + public QuarkusA2ARestTest() { super(8081); } @@ -33,7 +39,7 @@ protected String getTransportUrl() { @Override protected void configureTransport(ClientBuilder builder) { - builder.withTransport(RestTransport.class, new RestTransportConfigBuilder()); + builder.withTransport(RestTransport.class, new RestTransportConfigBuilder().httpClient(new VertxA2AHttpClient(vertx))); } @Test diff --git a/server-common/pom.xml b/server-common/pom.xml index 1f8633c77..fb9a3fdfa 100644 --- a/server-common/pom.xml +++ b/server-common/pom.xml @@ -34,6 +34,11 @@ ${project.groupId} a2a-java-sdk-http-client + + ${project.groupId} + a2a-java-sdk-http-client-vertx + provided + ${project.groupId} a2a-java-sdk-client-transport-jsonrpc diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java index a59767d8f..3b434a005 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java @@ -13,7 +13,7 @@ import jakarta.inject.Inject; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.jsonrpc.common.json.JsonUtil; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigResult; @@ -48,7 +48,7 @@ protected BasePushNotificationSender() { @Inject public BasePushNotificationSender(PushNotificationConfigStore configStore) { - this.httpClient = new JdkA2AHttpClient(); + this.httpClient = A2AHttpClientFactory.create(); this.configStore = configStore; }