diff --git a/docs/ITK_BASELINE.md b/docs/ITK_BASELINE.md new file mode 100644 index 0000000..e6924f0 --- /dev/null +++ b/docs/ITK_BASELINE.md @@ -0,0 +1,159 @@ +# A2A Elixir — ITK Interoperability Baseline + +**Status:** Showcase / instrument PR. **No changes to the A2A Elixir SDK (`lib/`).** +This PR adds a test-harness that drives the *unmodified* SDK against the official +A2A **Interoperability Test Kit (ITK)** and documents — with reproducible +evidence — exactly what works today over JSON-RPC and where the gaps are. + +The goal is a measuring stick, not a fix: establish an honest baseline so future +v1.0-compliance work is gap-driven and regression-gated. + +--- + +## TL;DR + +| Capability | Status | Evidence | +|---|---|---| +| v0.3-shaped Agent Card | ✅ Works | `preferredTransport: JSONRPC`, `protocolVersion: 0.3.0` served at `/jsonrpc/.well-known/agent-card.json` | +| ITK Instruction proto decode (`return_response`, `steps`, `call_agent`) | ✅ Works | `steps_concat` → `[a -> b (jsonrpc)]\ntraversal-completed:jsonrpc` | +| Agent interpreter + Task construction | ✅ Works | 404 unit tests green against pristine SDK | +| `message/send` (non-streaming) round-trip | ✅ Works at wire level | completed Task, `status.message.text = traversal-completed:jsonrpc` | +| ITK end-to-end traversal (Python a2a-sdk 0.3.24 client) | ❌ Fails | enum + streaming-event shape mismatch (below) | + +**Bottom line:** the harness, proto codec, and agent logic are sound. The SDK's +**JSON wire encoding** does not match what the A2A Python v0.3 client +(`a2a-sdk 0.3.24`) accepts — so a real ITK traversal does not yet pass. That +gap is documented here as the entry point for v1.0 work; it is **not** fixed in +this PR. + +--- + +## What this PR adds (harness only) + +``` +test/support/itk/instruction.ex # ITK Instruction protobuf codec (decode/encode) +test/support/itk/agent.ex # A2A.Test.ITK.Agent — JSON-RPC handler / interpreter +test/itk/server.exs # standalone Bandit server (v0.3 card + JSON-RPC + SSE) +test/itk/instruction_test.exs # codec unit tests +test/itk/agent_test.exs # interpreter unit tests +test/itk/fixtures/*.bin # encoded Instruction fixtures +``` + +Plus, in the ITK working copy (not part of the SDK repo): an `elixir_v03` agent +definition + baseline driver (`itk_baseline_elixir.py`) that boots the Elixir +server and runs ITK euler traversals against `python_v03`. + +**No `lib/` changes.** Verified: `git diff --stat origin/main -- lib/` is empty. + +--- + +## The v0.3 vs v1.0 axis (what "compliance" means here) + +ITK has **no per-agent v0.3/v1.0 switch**. The version axis is *which SDK build* +is registered (`python_v03`/`go_v03` vs `python_v10`/`go_v10`). The Instruction +proto is identical across versions; the differences are in **transport surface +and JSON wire shape**: + +- **Agent Card shape:** v0.3 clients want `preferredTransport` + + `additionalInterfaces`; v1.0 emits `supportedInterfaces`. The Elixir SDK's + `A2A.JSON.encode_agent_card` emits the v1.0 `supportedInterfaces` shape, which + the v0.3 Python `ClientFactory` cannot consume — so the harness server + hand-builds a v0.3 card. (Run the server with `--cardVersion v1` to serve the + SDK's native v1.0 card and observe the v0.3 client reject it.) + +- **Enum wire format (the core gap):** `a2a-sdk 0.3.24` (the reference v0.3 + JSON-RPC client) uses **lowercase JSON enums** — `Role.agent = "agent"`, + `TaskState.completed = "completed"`. The Elixir SDK's `A2A.JSON` emits + **proto-style** enums — `"ROLE_AGENT"`, `"TASK_STATE_COMPLETED"` (these belong + on the gRPC/proto transport, not JSON-RPC). The v0.3 client's pydantic + validation rejects them. + +- **Streaming event shape:** the ITK client's `send_message` uses + `message/stream` (SSE) as its **primary** path and requires + `Content-Type: text/event-stream`. The harness server now responds with SSE, + but the first event (a Task snapshot) is validated against the streaming-event + union (`TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent`), which requires + `taskId` and `kind` fields the snapshot does not carry in the expected form. + +--- + +## Baseline results (reproducible) + +### Capability probe — raw JSON-RPC (what works) + +``` +# 1. Agent card +preferredTransport: JSONRPC | protocolVersion: 0.3.0 | url: http://127.0.0.1:PORT/jsonrpc/ + +# 2. message/send (return_response) +task.status.state: TASK_STATE_COMPLETED # <- proto-style enum (gap) +reply text: traversal-completed:jsonrpc # <- interpreter correct +role(raw): ROLE_AGENT # <- proto-style enum (gap) + +# 3. message/send (steps_concat — multi-step interpret) +reply text: '[a -> b (jsonrpc)]\ntraversal-completed:jsonrpc' +``` + +The interpreter and proto codec produce the correct traversal tokens; the +**enum casing** is the visible gap even before involving the streaming client. + +### ITK traversal — Python a2a-sdk 0.3.24 client (the gaps) + +``` +==== ITK ELIXIR v0.3 BASELINE RESULTS ==== +elixir-single-jsonrpc: FAIL +elixir-py03-AB-jsonrpc: FAIL +elixir-py03-AB-streaming: FAIL +``` + +Pydantic validation errors from the client (verbatim): + +``` +status.message.role -> Input should be 'agent' or 'user' [got 'ROLE_AGENT'] +status.state -> Input should be 'completed'... [got 'TASK_STATE_COMPLETED'] +TaskStatusUpdateEvent.taskId -> Field required +TaskArtifactUpdateEvent.kind -> Input should be 'artifact-update' [got 'task'] +``` + +--- + +## Prioritized gap list (seeds future v1.0 work — NOT in this PR) + +1. **JSON-RPC enum encoding.** `A2A.JSON` should emit lowercase v0.3 JSON enums + (`agent`, `completed`, …) on the JSON-RPC transport; reserve proto-style + enums (`ROLE_AGENT`, `TASK_STATE_COMPLETED`) for gRPC. This is the single + highest-leverage fix — it blocks every traversal. +2. **Streaming event envelopes.** Emit proper `TaskStatusUpdateEvent` / + `TaskArtifactUpdateEvent` SSE events (with `taskId`, `kind`) rather than a + raw Task snapshot, so the v0.3 streaming client can parse the stream. +3. **Agent Card v0.3 emission.** Optionally let `encode_agent_card` emit the + v0.3 `preferredTransport`/`additionalInterfaces` shape so a hand-built card + is unnecessary. +4. **gRPC / REST transports.** Required for full ITK multi-transport multi-hop; + deferred (REST scaffolding parked outside this PR). + +--- + +## Reproduce + +```bash +# Unit tests (harness, against pristine SDK) +cd a2a-elixir +mix test # 404 tests + 2 doctests, 0 failures + +# Standalone server (MUST be MIX_ENV=test — test/support is test-only) +MIX_ENV=test mix run test/itk/server.exs --httpPort 10130 +# GET /jsonrpc/.well-known/agent-card.json -> v0.3 card +# POST /jsonrpc/ (message/send, FilePart=Instruction proto) -> completed Task +# --cardVersion v1 serves the SDK's native v1.0 card (to show the v0.3 gap) + +# Full ITK baseline driver (boots the Elixir agent, runs euler traversals) +cd /path/to/a2a-samples/itk +uv run --no-sources python itk_baseline_elixir.py +``` + +## Notes + +- The standalone server **must** run under `MIX_ENV=test`: the harness modules + live in `test/support/`, which is only on `elixirc_paths` in the test env. +- Reference client: `a2a-sdk 0.3.24` (Python). diff --git a/test/itk/agent_test.exs b/test/itk/agent_test.exs new file mode 100644 index 0000000..3ed3a72 --- /dev/null +++ b/test/itk/agent_test.exs @@ -0,0 +1,105 @@ +defmodule A2A.Test.ITK.AgentTest do + use ExUnit.Case, async: true + + alias A2A.Test.ITK.Agent + alias A2A.Test.ITK.Instruction + + describe "interpret/2 (pure interpretation, downstream stubbed)" do + test "return_response yields its text" do + assert Agent.interpret({:return_response, "hello"}) == "hello" + end + + test "steps concatenates fragments with newlines, skipping empties" do + inst = + {:steps, + [ + {:return_response, "[a -> b (jsonrpc)]"}, + {:return_response, ""}, + {:return_response, "traversal-completed:jsonrpc"} + ], 1} + + assert Agent.interpret(inst) == + "[a -> b (jsonrpc)]\ntraversal-completed:jsonrpc" + end + + test "call_agent invokes the supplied call_fun and uses its result" do + stub = fn %{agent_card_uri: uri} -> "downstream:#{uri}" end + + inst = + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:1/jsonrpc", + instruction: {:return_response, "leaf"}, + streaming: false + }} + + assert Agent.interpret(inst, stub) == "downstream:http://127.0.0.1:1/jsonrpc" + end + + test "nested A->B->A traversal shape concatenates trace + downstream text" do + # B's local fragment + B's downstream (the leaf) — stub returns leaf text. + stub = fn %{instruction: leaf} -> Agent.interpret(leaf, fn _ -> "" end) end + + root = + {:steps, + [ + {:return_response, "[A -> B (jsonrpc)]"}, + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:1/jsonrpc", + instruction: {:return_response, "traversal-completed:jsonrpc"}, + streaming: false + }} + ], 1} + + assert Agent.interpret(root, stub) == + "[A -> B (jsonrpc)]\ntraversal-completed:jsonrpc" + end + end + + describe "call_downstream/1 transport guard" do + test "non-jsonrpc transport returns a visible error fragment" do + result = + Agent.call_downstream(%{ + transport: "grpc", + agent_card_uri: "http://127.0.0.1:1/grpc", + instruction: {:return_response, "x"}, + streaming: false + }) + + assert result =~ "unsupported transport" + assert result =~ "grpc" + end + end + + describe "handle_send/3 (end-to-end over a decoded FilePart, no network)" do + test "decodes a return_response instruction and completes with status.message" do + inst = {:return_response, "traversal-completed:jsonrpc"} + message = build_instruction_message(inst) + + assert {:ok, task} = Agent.handle_send(message, %{}, %{}) + assert task.status.state == :completed + assert A2A.Message.text(task.status.message) == "traversal-completed:jsonrpc" + end + + test "errors when no instruction FilePart is present" do + message = A2A.Message.new_user("just text, no proto") + assert {:error, error} = Agent.handle_send(message, %{}, %{}) + assert error.code == -32602 + end + end + + defp build_instruction_message(inst) do + bytes = Instruction.encode(inst) + + file = + A2A.FileContent.from_bytes(bytes, + name: "instruction.bin", + mime_type: "application/x-protobuf" + ) + + A2A.Message.new_user([A2A.Part.File.new(file)]) + end +end diff --git a/test/itk/fixtures/call_agent_hop.bin b/test/itk/fixtures/call_agent_hop.bin new file mode 100644 index 0000000..ffb1cb2 --- /dev/null +++ b/test/itk/fixtures/call_agent_hop.bin @@ -0,0 +1,7 @@ +g + +[A -> B (jsonrpc)] +K +I +jsonrpchttp://127.0.0.1:1234/jsonrpc +traversal-completed:jsonrpc \ No newline at end of file diff --git a/test/itk/fixtures/return_response.bin b/test/itk/fixtures/return_response.bin new file mode 100644 index 0000000..aedc44b --- /dev/null +++ b/test/itk/fixtures/return_response.bin @@ -0,0 +1,2 @@ + +traversal-completed:jsonrpc \ No newline at end of file diff --git a/test/itk/fixtures/steps_concat.bin b/test/itk/fixtures/steps_concat.bin new file mode 100644 index 0000000..29359cf --- /dev/null +++ b/test/itk/fixtures/steps_concat.bin @@ -0,0 +1,5 @@ +; + +[a -> b (jsonrpc)] + +traversal-completed:jsonrpc \ No newline at end of file diff --git a/test/itk/instruction_test.exs b/test/itk/instruction_test.exs new file mode 100644 index 0000000..c9533e7 --- /dev/null +++ b/test/itk/instruction_test.exs @@ -0,0 +1,131 @@ +defmodule A2A.Test.ITK.InstructionTest do + use ExUnit.Case, async: true + + alias A2A.Test.ITK.Instruction + + describe "round-trip encode/decode" do + test "return_response" do + inst = {:return_response, "traversal-completed:jsonrpc"} + assert {:ok, ^inst} = Instruction.decode(Instruction.encode(inst)) + end + + test "steps with concat generator" do + inst = + {:steps, + [ + {:return_response, "[a -> b (jsonrpc)]"}, + {:return_response, "tail"} + ], 1} + + assert {:ok, decoded} = Instruction.decode(Instruction.encode(inst)) + assert decoded == inst + end + + test "steps with unspecified generator (0) omitted on wire" do + inst = {:steps, [{:return_response, "x"}], 0} + assert {:ok, ^inst} = Instruction.decode(Instruction.encode(inst)) + end + + test "call_agent with nested instruction" do + inst = + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:1234/jsonrpc", + instruction: {:return_response, "leaf"}, + streaming: true + }} + + assert {:ok, decoded} = Instruction.decode(Instruction.encode(inst)) + assert decoded == inst + end + + test "deeply nested A->B->A traversal shape" do + leaf = {:return_response, "traversal-completed:jsonrpc"} + + hop_b = + {:steps, + [ + {:return_response, "[B -> A (jsonrpc)]"}, + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:2/jsonrpc", + instruction: leaf, + streaming: false + }} + ], 1} + + root = + {:steps, + [ + {:return_response, "[A -> B (jsonrpc)]"}, + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:1/jsonrpc", + instruction: hop_b, + streaming: false + }} + ], 1} + + assert {:ok, decoded} = Instruction.decode(Instruction.encode(root)) + assert decoded == root + end + end + + describe "decodes Python-generated fixtures" do + @fixture_dir Path.join([__DIR__, "fixtures"]) + + test "decodes captured python_v03 instruction.bin fixtures, if present" do + case File.ls(@fixture_dir) do + {:ok, files} -> + bins = Enum.filter(files, &String.ends_with?(&1, ".bin")) + + if bins == [] do + # No fixtures captured in this environment; skip silently. + assert true + else + for f <- bins do + raw = File.read!(Path.join(@fixture_dir, f)) + assert {:ok, _inst} = Instruction.decode(raw) + end + + assert_fixture("return_response.bin", {:return_response, "traversal-completed:jsonrpc"}) + + assert_fixture( + "steps_concat.bin", + {:steps, + [ + {:return_response, "[a -> b (jsonrpc)]"}, + {:return_response, "traversal-completed:jsonrpc"} + ], 1} + ) + + assert_fixture( + "call_agent_hop.bin", + {:steps, + [ + {:return_response, "[A -> B (jsonrpc)]"}, + {:call_agent, + %{ + transport: "jsonrpc", + agent_card_uri: "http://127.0.0.1:1234/jsonrpc", + instruction: {:return_response, "traversal-completed:jsonrpc"}, + streaming: false + }} + ], 1} + ) + end + + {:error, _} -> + assert true + end + end + + defp assert_fixture(name, expected) do + raw = File.read!(Path.join(@fixture_dir, name)) + assert {:ok, ^expected} = Instruction.decode(raw) + end + end +end diff --git a/test/itk/server.exs b/test/itk/server.exs new file mode 100644 index 0000000..c4e3144 --- /dev/null +++ b/test/itk/server.exs @@ -0,0 +1,257 @@ +# Boots a Bandit server exposing the test-harness ITK Elixir agent over A2A +# JSON-RPC, shaped so the Python A2A ITK runner (a2a-samples/itk) can drive it. +# +# Usage: +# mix run test/itk/server.exs --httpPort 10110 +# A2A_ITK_HTTP_PORT=10110 mix run test/itk/server.exs +# +# Mirrors test/tck/server.exs, but: +# * Serves a v0.3-shaped Agent Card (preferredTransport / additionalInterfaces / +# url ending in /jsonrpc/) at /jsonrpc/.well-known/agent-card.json. The Elixir +# SDK's own card encoder emits the v1.0-style `supportedInterfaces` field which +# the Python v0.3 ClientFactory does not understand, so we hand-build the v0.3 +# card here (test-harness-only; lib/ is untouched). This card-shape difference +# is part of the documented v0.3-vs-v1.0 axis (see docs/ITK_BASELINE.md). +# * Routes JSON-RPC POSTs through A2A.JSONRPC.handle/3 with the test-harness +# handler A2A.Test.ITK.Agent, which decodes the ITK Instruction proto and +# returns a completed task whose status.message carries the response text. +# +# --grpcPort is accepted and ignored (gRPC is intentionally out of scope). +# +# IMPORTANT: run this under MIX_ENV=test so the harness support modules +# (A2A.Test.ITK.Instruction / .Agent under test/support/) are compiled and +# available — they are only on elixirc_paths in :test (see mix.exs). The ITK +# harness launcher sets MIX_ENV=test: +# MIX_ENV=test mix run test/itk/server.exs --httpPort 10110 +if Code.ensure_loaded?(A2A.Test.ITK.Agent) == false do + IO.puts(:stderr, "FATAL: A2A.Test.ITK.Agent not loaded — run with MIX_ENV=test") + System.halt(1) +end + +defmodule A2A.Test.ITK.CardAgent do + @moduledoc false + # Minimal GenServer agent; only used so a real A2A agent identity exists. + # The actual JSON-RPC handling is done by A2A.Test.ITK.Agent. + use A2A.Agent, + name: "itk-elixir-v03-agent", + description: "A2A Elixir ITK agent (JSON-RPC only)", + version: "0.3.0", + skills: [ + %{ + id: "itk_proto_skill", + name: "ITK Proto Skill", + description: "Handles raw byte Instruction protos over JSON-RPC.", + tags: ["proto", "itk", "jsonrpc"] + } + ] + + @impl A2A.Agent + def handle_message(_message, _context) do + {:reply, [A2A.Part.Text.new("itk-elixir-agent")]} + end + + @impl A2A.Agent + def handle_cancel(_context), do: :ok +end + +defmodule A2A.Test.ITK.Router do + @moduledoc false + use Plug.Router + + plug(:match) + plug(:dispatch) + + # v0.3-shaped agent card. The Python v0.3 ClientFactory fetches this relative + # to the agent_card_uri (".../jsonrpc/.well-known/agent-card.json"). + defp agent_card_json do + host = Application.get_env(:a2a_itk, :host) + port = Application.get_env(:a2a_itk, :http_port) + grpc_port = Application.get_env(:a2a_itk, :grpc_port) + + %{ + "name" => "ITK Elixir v03 Agent", + "description" => "A2A Elixir SDK agent for ITK conformance (JSON-RPC only).", + "url" => "http://#{host}:#{port}/jsonrpc/", + "version" => "0.3.0", + "protocolVersion" => "0.3.0", + "preferredTransport" => "JSONRPC", + "defaultInputModes" => ["text"], + "defaultOutputModes" => ["text"], + "capabilities" => %{"streaming" => true}, + "skills" => [ + %{ + "id" => "itk_proto_skill", + "name" => "ITK Proto Skill", + "description" => "Handles raw byte Instruction protos over JSON-RPC.", + "tags" => ["proto", "itk", "jsonrpc"] + } + ], + # gRPC interface advertised for card-shape parity with the python_v03 card, + # but the Elixir agent does NOT serve gRPC. Traversals must use jsonrpc. + "additionalInterfaces" => [ + %{"transport" => "GRPC", "url" => "#{host}:#{grpc_port}"} + ] + } + end + + # Native v1.0-style card emitted by the SDK's own encoder + # (A2A.JSON.encode_agent_card). This produces the `supportedInterfaces` + # shape and OMITS `preferredTransport`/`additionalInterfaces`. Used only to + # demonstrate the documented v0.3-vs-v1.0 card-shape gap (see + # docs/ITK_BASELINE.md). Selected via `--cardVersion v1`. + defp agent_card_v10_json do + host = Application.get_env(:a2a_itk, :host) + port = Application.get_env(:a2a_itk, :http_port) + + A2A.get_agent_card(A2A.Test.ITK.CardAgent, + base_url: "http://#{host}:#{port}/jsonrpc/", + capabilities: %{streaming: true}, + default_input_modes: ["text"], + default_output_modes: ["text"], + protocol_version: "0.3.0" + ) + end + + defp send_card(conn) do + card = + case Application.get_env(:a2a_itk, :card_version) do + "v1" -> agent_card_v10_json() + _ -> agent_card_json() + end + + conn + |> put_resp_content_type("application/json") + |> send_resp(200, Jason.encode!(card)) + end + + defp handle_rpc(conn) do + {:ok, body, conn} = Plug.Conn.read_body(conn, length: 10_000_000) + + case Jason.decode(body) do + {:ok, decoded} -> + case A2A.JSONRPC.handle(decoded, A2A.Test.ITK.Agent, %{}) do + {:reply, response} -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, Jason.encode!(response)) + + {:stream, _method, params, id} -> + # The ITK Python client's `send_message` uses the streaming method + # (message/stream) as its PRIMARY path and REQUIRES a + # `text/event-stream` (SSE) response. We compute the result via the + # same handler used for message/send, then emit it as SSE: a Task + # snapshot event followed by a final StatusUpdate event (final:true). + # This mirrors A2A.Plug.SSE's wire format without requiring the agent + # to implement A2A.stream/3. + stream_send_as_sse(conn, params, id) + end + + {:error, _} -> + send_resp(conn, 400, "invalid json") + end + end + + # Emit a non-streaming handler result as an SSE stream so the ITK Python + # client (which calls message/stream and expects text/event-stream) is + # satisfied. Events mirror A2A.Plug.SSE: each `data:` line wraps a JSON-RPC + # success envelope around an encoded A2A object. We send the completed Task + # snapshot, then a final StatusUpdate (final: true). + defp stream_send_as_sse(conn, params, id) do + # A2A.JSONRPC.handle pre-decodes params["message"] into an %A2A.Message{} + # struct before emitting the {:stream, ...} tuple (see A2A.Plug usage), so + # we use it directly rather than re-decoding. + message = params["message"] + + case A2A.Test.ITK.Agent.handle_send(message, params, %{}) do + {:ok, %A2A.Task{} = task} -> + conn = + conn + |> put_resp_header("content-type", "text/event-stream") + |> put_resp_header("cache-control", "no-cache") + |> send_chunked(200) + + {:ok, task_encoded} = A2A.JSON.encode(task) + conn = sse_event(conn, id, task_encoded) + + final = + A2A.Event.StatusUpdate.new( + task.id, + A2A.Task.Status.new(:completed, task.status.message), + context_id: task.context_id, + final: true + ) + + {:ok, final_encoded} = A2A.JSON.encode(final) + sse_event(conn, id, final_encoded) + + {:error, error} -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, Jason.encode!(%{"jsonrpc" => "2.0", "id" => id, "error" => error})) + end + end + + defp sse_event(conn, id, encoded_result) do + payload = %{"jsonrpc" => "2.0", "id" => id, "result" => encoded_result} + + case chunk(conn, "data: #{Jason.encode!(payload)}\n\n") do + {:ok, conn} -> conn + {:error, :closed} -> conn + end + end + + # Card at both the /jsonrpc-mounted path and the bare well-known path. + get "/jsonrpc/.well-known/agent-card.json", do: send_card(conn) + get "/.well-known/agent-card.json", do: send_card(conn) + + # JSON-RPC endpoint. Plug normalizes the trailing slash, so a single route + # matches both /jsonrpc and /jsonrpc/. + post "/jsonrpc", do: handle_rpc(conn) + + match _ do + send_resp(conn, 404, "not found") + end +end + +# --- argument / env parsing ------------------------------------------------- + +{parsed, _rest, _invalid} = + OptionParser.parse(System.argv(), + strict: [httpPort: :integer, grpcPort: :integer, cardVersion: :string], + aliases: [] + ) + +http_port = + parsed[:httpPort] || + (System.get_env("A2A_ITK_HTTP_PORT") && String.to_integer(System.get_env("A2A_ITK_HTTP_PORT"))) || + 10_110 + +grpc_port = + parsed[:grpcPort] || + (System.get_env("A2A_ITK_GRPC_PORT") && String.to_integer(System.get_env("A2A_ITK_GRPC_PORT"))) || + 11_110 + +host = System.get_env("A2A_ITK_HOST") || "127.0.0.1" + +card_version = parsed[:cardVersion] || System.get_env("A2A_ITK_CARD_VERSION") || "v03" + +Application.put_env(:a2a_itk, :host, host) +Application.put_env(:a2a_itk, :http_port, http_port) +Application.put_env(:a2a_itk, :grpc_port, grpc_port) +Application.put_env(:a2a_itk, :card_version, card_version) + +{:ok, _} = A2A.Test.ITK.CardAgent.start_link() + +{:ok, _} = + Bandit.start_link( + plug: A2A.Test.ITK.Router, + port: http_port, + ip: {127, 0, 0, 1}, + startup_log: false + ) + +IO.puts("ITK Elixir agent running on http://#{host}:#{http_port}/jsonrpc") +IO.puts("Agent card: http://#{host}:#{http_port}/jsonrpc/.well-known/agent-card.json") +IO.puts("(gRPC port #{grpc_port} advertised but NOT served — JSON-RPC only)") + +Process.sleep(:infinity) diff --git a/test/support/itk/agent.ex b/test/support/itk/agent.ex new file mode 100644 index 0000000..9b80cc3 --- /dev/null +++ b/test/support/itk/agent.ex @@ -0,0 +1,271 @@ +defmodule A2A.Test.ITK.Agent do + @moduledoc """ + Test-harness-only A2A agent that participates in the A2A Interoperability + Test Kit (ITK). + + This module is **not** part of the shipped library — it lives under `test/` + (mirroring `test/tck/`) and exists solely so a JSON-RPC-only Elixir agent can + be driven by the Python ITK runner (`a2a-samples/itk`). + + ## What it does + + The ITK runner wraps a serialized `itk.Instruction` protobuf inside an A2A + `FilePart` (`mime_type: application/x-protobuf`, `name: instruction.bin`) and + dispatches it over JSON-RPC `message/send`. This agent: + + 1. Extracts the FilePart bytes from the incoming `%A2A.Message{}`. + 2. Decodes them with `A2A.Test.ITK.Instruction.decode/1`. + 3. Recursively interprets the instruction: + - `{:return_response, text}` → returns `text`. + - `{:steps, instructions, gen}` → interprets each in order and concatenates + the resulting text fragments (the only response generator ITK uses is + `CONCAT`). + - `{:call_agent, %{...}}` → calls the downstream agent over JSON-RPC using + `A2A.Client.send_message/3` (or `stream_message/3` when `streaming` is + set), then extracts the text from the downstream response. + + The accumulated text is returned as a completed `%A2A.Task{}` whose + `status.message` holds the text. This matches the Python v0.3 ITK agent, whose + `task_updater.complete(message=...)` populates `status.message` — which is the + field the ITK runner (`testlib.execute_itk_test`) reads to verify traversal + tokens. + + ## Why a custom JSON-RPC handler (not `use A2A.Agent`) + + `use A2A.Agent` maps a `{:reply, parts}` result to an **artifact** plus a + `:completed` status, but it does *not* set `status.message`. The ITK runner + only reads `Message` responses and `task.status.message` text, so we build the + completed task directly here, setting `status.message`. We still wire this + module as an `A2A.JSONRPC` handler so the SDK's own JSON-RPC dispatch and JSON + codec do the envelope/serialization work. + + Transport scope: JSON-RPC only. gRPC/REST hops are out of scope (see + `docs/ITK_BASELINE.md`); a `CallAgent` requesting a non-JSON-RPC transport is + reported as an error fragment rather than silently mis-handled. + """ + + @behaviour A2A.JSONRPC + + alias A2A.JSONRPC.Error + alias A2A.Test.ITK.Instruction + + @typedoc "Options threaded into the handler context by the server." + @type context :: map() + + # --------------------------------------------------------------------------- + # A2A.JSONRPC behaviour + # --------------------------------------------------------------------------- + + @impl A2A.JSONRPC + @spec handle_send(A2A.Message.t(), map(), context()) :: + {:ok, A2A.Task.t()} | {:error, Error.t()} + def handle_send(%A2A.Message{} = message, params, _context) do + with {:ok, bytes} <- extract_instruction_bytes(message), + {:ok, instruction} <- Instruction.decode(bytes) do + text = interpret(instruction) + + task = + completed_task( + message.task_id || A2A.ID.generate("task"), + message.context_id || params["contextId"] || A2A.ID.generate("ctx"), + text + ) + + {:ok, task} + else + {:error, reason} -> + {:error, Error.invalid_params("ITK instruction handling failed: #{inspect(reason)}")} + end + end + + @impl A2A.JSONRPC + @spec handle_get(String.t(), map(), context()) :: {:ok, A2A.Task.t()} | {:error, Error.t()} + def handle_get(task_id, _params, _context) do + # The ITK runner does not call tasks/get; return a not-found error. + {:error, Error.task_not_found(task_id)} + end + + @impl A2A.JSONRPC + @spec handle_cancel(String.t(), map(), context()) :: {:ok, A2A.Task.t()} | {:error, Error.t()} + def handle_cancel(task_id, _params, _context) do + {:error, Error.task_not_found(task_id)} + end + + # --------------------------------------------------------------------------- + # Instruction interpretation + # --------------------------------------------------------------------------- + + @doc """ + Recursively interprets a decoded ITK instruction into a response string. + + Exposed for unit testing. `call_agent` hops use `call_downstream/1` by + default; pass a 1-arity function as the second argument to stub downstream + calls in tests. + """ + @spec interpret(Instruction.instruction(), (Instruction.call_agent() -> String.t())) :: + String.t() + def interpret(instruction, call_fun \\ &call_downstream/1) + + def interpret({:return_response, text}, _call_fun), do: text + + def interpret({:steps, instructions, _generator}, call_fun) do + # ITK only ever uses RESPONSE_GENERATOR_CONCAT (or unspecified, treated the + # same): interpret each step in order and concatenate the fragments. + instructions + |> Enum.map(&interpret(&1, call_fun)) + |> Enum.reject(&(&1 == "")) + |> Enum.join("\n") + end + + def interpret({:call_agent, call_agent}, call_fun) do + call_fun.(call_agent) + end + + # --------------------------------------------------------------------------- + # Downstream call (JSON-RPC only) + # --------------------------------------------------------------------------- + + @doc """ + Calls a downstream agent for a `CallAgent` step and returns its text. + + Only the `jsonrpc` transport is supported. Other transports yield an error + fragment so the failure is visible in the traversal output rather than silent. + """ + @spec call_downstream(Instruction.call_agent()) :: String.t() + def call_downstream(%{transport: transport} = call) + when transport in ["jsonrpc", "JSONRPC", "", nil] do + message = wrap_instruction(call.instruction) + target = card_base_url(call.agent_card_uri) + + if call.streaming do + call_downstream_stream(target, message, call.agent_card_uri) + else + call_downstream_send(target, message, call.agent_card_uri) + end + end + + def call_downstream(%{transport: transport, agent_card_uri: uri}) do + "ERROR: unsupported transport #{inspect(transport)} for #{uri} (JSON-RPC only)" + end + + defp call_downstream_send(target, message, uri) do + case A2A.Client.send_message(target, message) do + {:ok, %A2A.Task{} = task} -> task_text(task) + {:error, reason} -> "ERROR: call to #{uri} failed: #{inspect(reason)}" + end + end + + defp call_downstream_stream(target, message, uri) do + case A2A.Client.stream_message(target, message) do + {:ok, stream} -> + stream + |> Enum.flat_map(&event_text/1) + |> Enum.reject(&(&1 == "")) + |> Enum.join("\n") + + {:error, reason} -> + "ERROR: stream call to #{uri} failed: #{inspect(reason)}" + end + end + + # --------------------------------------------------------------------------- + # Text extraction helpers + # --------------------------------------------------------------------------- + + # Pulls text out of a downstream task: prefer status.message, fall back to + # the most recent artifact / agent history message. + defp task_text(%A2A.Task{status: %A2A.Task.Status{message: %A2A.Message{} = msg}}) do + message_text(msg) + end + + defp task_text(%A2A.Task{artifacts: [_ | _] = artifacts}) do + artifacts + |> List.last() + |> Map.get(:parts, []) + |> parts_text() + end + + defp task_text(%A2A.Task{history: history}) when is_list(history) and history != [] do + history + |> Enum.reverse() + |> Enum.find(&(&1.role == :agent)) + |> case do + nil -> "" + msg -> message_text(msg) + end + end + + defp task_text(_), do: "" + + defp event_text(%A2A.Message{} = msg), do: [message_text(msg)] + + defp event_text(%A2A.Event.StatusUpdate{status: %A2A.Task.Status{message: %A2A.Message{} = m}}) do + [message_text(m)] + end + + defp event_text(%A2A.Event.ArtifactUpdate{artifact: %A2A.Artifact{parts: parts}}) do + [parts_text(parts)] + end + + defp event_text(%A2A.Task{} = task), do: [task_text(task)] + defp event_text(_), do: [""] + + defp message_text(%A2A.Message{parts: parts}), do: parts_text(parts) + + defp parts_text(parts) do + parts + |> Enum.map(fn + %A2A.Part.Text{text: text} -> text + _ -> nil + end) + |> Enum.reject(&is_nil/1) + |> Enum.join("\n") + end + + # --------------------------------------------------------------------------- + # Message / task construction + # --------------------------------------------------------------------------- + + defp extract_instruction_bytes(%A2A.Message{parts: parts}) do + parts + |> Enum.find_value(fn + %A2A.Part.File{file: %A2A.FileContent{bytes: bytes}} when is_binary(bytes) -> bytes + _ -> nil + end) + |> case do + nil -> {:error, :no_instruction_file_part} + bytes -> {:ok, bytes} + end + end + + # Wraps a nested instruction back into an A2A message (FilePart) for the + # downstream hop, mirroring the ITK runner's `_wrap_instruction`. + defp wrap_instruction(instruction) do + bytes = Instruction.encode(instruction) + + file = + A2A.FileContent.from_bytes(bytes, + name: "instruction.bin", + mime_type: "application/x-protobuf" + ) + + A2A.Message.new_user([A2A.Part.File.new(file)]) + end + + defp completed_task(task_id, context_id, text) do + status_message = A2A.Message.new_agent([A2A.Part.Text.new(text)]) + + %A2A.Task{ + id: task_id, + context_id: context_id, + status: A2A.Task.Status.new(:completed, status_message), + history: [], + artifacts: [A2A.Artifact.new([A2A.Part.Text.new(text)])], + metadata: %{} + } + end + + # The ITK runner connects to `http://host:port/jsonrpc` and the JSON-RPC POST + # endpoint is served at `/jsonrpc/` — strip a trailing slash for the client. + defp card_base_url(uri), do: String.trim_trailing(uri, "/") +end diff --git a/test/support/itk/instruction.ex b/test/support/itk/instruction.ex new file mode 100644 index 0000000..2556d7c --- /dev/null +++ b/test/support/itk/instruction.ex @@ -0,0 +1,248 @@ +defmodule A2A.Test.ITK.Instruction do + @moduledoc """ + Hand-rolled proto3 codec for the A2A ITK `itk.Instruction` payload. + + This is **test-harness-only** code (it lives under `test/support/`, not `lib/`), + mirroring the `test/tck/` pattern. It exists solely so a JSON-RPC-only Elixir + agent can participate in the A2A Interoperability Test Kit (ITK), whose agents + exchange a serialized protobuf `Instruction` carried as a FilePart. + + The wire schema (from `a2a-samples/itk/protos/instruction.proto`): + + message Instruction { + oneof step { + CallAgent call_agent = 1; + ReturnResponse return_response = 2; + SeriesOfSteps steps = 3; + } + } + message CallAgent { + string transport = 1; + string agent_card_uri = 2; + Instruction instruction = 3; + bool streaming = 4; + } + message ReturnResponse { string response = 1; } + message SeriesOfSteps { + repeated Instruction instructions = 1; + enum ResponseGenerator { + RESPONSE_GENERATOR_UNSPECIFIED = 0; + RESPONSE_GENERATOR_CONCAT = 1; + } + ResponseGenerator response_generator = 2; + } + + No external protobuf dependency is used. We implement the minimal proto3 wire + format (varints + length-delimited fields) needed for these four messages. + + Decoded instructions are represented as tagged tuples: + + - `{:return_response, binary}` + - `{:steps, [instruction], response_generator :: 0 | 1}` + - `{:call_agent, %{transport: binary, agent_card_uri: binary, + instruction: instruction | nil, streaming: boolean}}` + """ + + @type response_generator :: 0 | 1 + + @type call_agent :: %{ + transport: binary(), + agent_card_uri: binary(), + instruction: instruction() | nil, + streaming: boolean() + } + + @type instruction :: + {:return_response, binary()} + | {:steps, [instruction()], response_generator()} + | {:call_agent, call_agent()} + + # --------------------------------------------------------------------------- + # Decoding + # --------------------------------------------------------------------------- + + @doc """ + Decodes a serialized `itk.Instruction` protobuf into a tagged tuple. + + Returns `{:ok, instruction}` or `{:error, reason}`. + """ + @spec decode(binary()) :: {:ok, instruction()} | {:error, term()} + def decode(bin) when is_binary(bin) do + {:ok, decode_instruction(bin)} + rescue + e -> {:error, {:decode_failed, Exception.message(e)}} + end + + @doc """ + Like `decode/1` but raises on failure. Convenient for tests. + """ + @spec decode!(binary()) :: instruction() + def decode!(bin) do + case decode(bin) do + {:ok, inst} -> inst + {:error, reason} -> raise "ITK instruction decode failed: #{inspect(reason)}" + end + end + + # An Instruction is a oneof over fields 1 (call_agent), 2 (return_response), + # 3 (steps). The last one wins if (illegally) repeated. + defp decode_instruction(bin) do + bin + |> decode_fields() + |> Enum.reduce(nil, fn + {1, {:length_delimited, data}}, _acc -> {:call_agent, decode_call_agent(data)} + {2, {:length_delimited, data}}, _acc -> {:return_response, decode_return_response(data)} + {3, {:length_delimited, data}}, _acc -> decode_steps(data) + _other, acc -> acc + end) + end + + defp decode_return_response(bin) do + bin + |> decode_fields() + |> Enum.reduce("", fn + {1, {:length_delimited, data}}, _acc -> data + _other, acc -> acc + end) + end + + defp decode_steps(bin) do + {instructions_rev, gen} = + bin + |> decode_fields() + |> Enum.reduce({[], 0}, fn + {1, {:length_delimited, data}}, {acc, gen} -> + {[decode_instruction(data) | acc], gen} + + {2, {:varint, v}}, {acc, _gen} -> + {acc, v} + + _other, acc_gen -> + acc_gen + end) + + {:steps, Enum.reverse(instructions_rev), gen} + end + + defp decode_call_agent(bin) do + bin + |> decode_fields() + |> Enum.reduce( + %{transport: "", agent_card_uri: "", instruction: nil, streaming: false}, + fn + {1, {:length_delimited, data}}, acc -> %{acc | transport: data} + {2, {:length_delimited, data}}, acc -> %{acc | agent_card_uri: data} + {3, {:length_delimited, data}}, acc -> %{acc | instruction: decode_instruction(data)} + {4, {:varint, v}}, acc -> %{acc | streaming: v != 0} + _other, acc -> acc + end + ) + end + + # Decodes a flat list of {field_number, value} from a proto3 message body. + defp decode_fields(<<>>), do: [] + + defp decode_fields(bin) do + {tag, rest} = decode_varint(bin) + field_number = Bitwise.bsr(tag, 3) + wire_type = Bitwise.band(tag, 0x7) + + case wire_type do + 0 -> + {value, rest} = decode_varint(rest) + [{field_number, {:varint, value}} | decode_fields(rest)] + + 2 -> + {len, rest} = decode_varint(rest) + <> = rest + [{field_number, {:length_delimited, data}} | decode_fields(rest)] + + 1 -> + <<_fixed64::binary-size(8), rest::binary>> = rest + decode_fields(rest) + + 5 -> + <<_fixed32::binary-size(4), rest::binary>> = rest + decode_fields(rest) + + other -> + raise "unsupported proto wire type: #{other}" + end + end + + defp decode_varint(bin), do: decode_varint(bin, 0, 0) + + defp decode_varint(<<1::1, chunk::7, rest::binary>>, shift, acc) do + decode_varint(rest, shift + 7, Bitwise.bor(acc, Bitwise.bsl(chunk, shift))) + end + + defp decode_varint(<<0::1, chunk::7, rest::binary>>, shift, acc) do + {Bitwise.bor(acc, Bitwise.bsl(chunk, shift)), rest} + end + + # --------------------------------------------------------------------------- + # Encoding (used by tests / fixtures; round-trips with decode/1) + # --------------------------------------------------------------------------- + + @doc """ + Encodes a tagged-tuple instruction back into serialized proto3 bytes. + """ + @spec encode(instruction()) :: binary() + def encode({:return_response, response}) do + encode_field(2, {:length_delimited, encode_return_response(response)}) + end + + def encode({:steps, instructions, gen}) do + encode_field(3, {:length_delimited, encode_steps(instructions, gen)}) + end + + def encode({:call_agent, ca}) do + encode_field(1, {:length_delimited, encode_call_agent(ca)}) + end + + defp encode_return_response(response) do + encode_field(1, {:length_delimited, response}) + end + + defp encode_steps(instructions, gen) do + body = + instructions + |> Enum.map(fn inst -> encode_field(1, {:length_delimited, encode(inst)}) end) + |> IO.iodata_to_binary() + + gen_field = if gen == 0, do: <<>>, else: encode_field(2, {:varint, gen}) + body <> gen_field + end + + defp encode_call_agent(ca) do + [ + encode_str(1, ca.transport), + encode_str(2, ca.agent_card_uri), + case ca.instruction do + nil -> <<>> + inst -> encode_field(3, {:length_delimited, encode(inst)}) + end, + if(ca.streaming, do: encode_field(4, {:varint, 1}), else: <<>>) + ] + |> IO.iodata_to_binary() + end + + defp encode_str(_field, ""), do: <<>> + defp encode_str(field, str), do: encode_field(field, {:length_delimited, str}) + + defp encode_field(field_number, {:varint, value}) do + tag = Bitwise.bor(Bitwise.bsl(field_number, 3), 0) + encode_varint(tag) <> encode_varint(value) + end + + defp encode_field(field_number, {:length_delimited, data}) do + tag = Bitwise.bor(Bitwise.bsl(field_number, 3), 2) + encode_varint(tag) <> encode_varint(byte_size(data)) <> data + end + + defp encode_varint(value) when value < 0x80, do: <> + + defp encode_varint(value) do + <<1::1, Bitwise.band(value, 0x7F)::7>> <> encode_varint(Bitwise.bsr(value, 7)) + end +end