Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ concurrency:
# Workflow-wide environment shared by every job and step.
env:
  MIX_ENV: "test"
  # Images are declared once here so the Docker cache key and the pull/save
  # steps stay in sync.
  POSTGRES_IMAGE: "supabase/postgres:17.6.1.074"
  # NOTE(review): untagged image reference; unlike POSTGRES_IMAGE it is not
  # pinned to a version, so the cached tarball keyed on this string will not
  # change when upstream publishes a new image — consider pinning.
  OLLAMA_IMAGE: "ollama/ollama"
  # Model pulled by compose.ollama.yml and used by the live LLM tests.
  OLLAMA_MODEL: "qwen2:0.5b"

jobs:
tests:
Expand Down Expand Up @@ -85,6 +87,65 @@ jobs:
name: coverage-partition-${{ matrix.partition }}
path: cover/lcov.info

# Runs the AI agent integration tests against a real (local) Ollama server.
live-llm-tests:
  name: Tests (AI Agent / Live LLM)
  runs-on: blacksmith-8vcpu-ubuntu-2404

  steps:
    - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
    - name: Setup elixir
      id: beam
      uses: erlef/setup-beam@ee09b1e59bb240681c382eb1f0abc6a04af72764 # v1.23.0
      with:
        otp-version: 27.x
        elixir-version: 1.18.x
    - name: Cache Mix
      uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
      with:
        path: |
          deps
          _build
          priv/native
        # Key on the versions actually installed by the `beam` step. The
        # previous `env.elixir` / `env.otp` references are not defined in this
        # workflow, so they expanded to empty strings and the cache was reused
        # across Elixir/OTP upgrades.
        key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-${{ hashFiles('**/mix.lock') }}
        restore-keys: |
          ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-
    - name: Cache Docker images
      uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
      id: docker-cache
      with:
        path: /tmp/docker-images
        key: docker-images-live-llm-zstd-${{ env.POSTGRES_IMAGE }}-${{ env.OLLAMA_IMAGE }}-${{ env.OLLAMA_MODEL }}
    - name: Load Docker images from cache
      if: steps.docker-cache.outputs.cache-hit == 'true'
      run: |
        zstd -d --stdout /tmp/docker-images/postgres.tar.zst | docker image load
        zstd -d --stdout /tmp/docker-images/ollama.tar.zst | docker image load
    - name: Pull and save Docker images
      if: steps.docker-cache.outputs.cache-hit != 'true'
      run: |
        # Pull both images in parallel, failing the step if either pull fails.
        docker pull ${{ env.POSTGRES_IMAGE }} &
        PID1=$!
        docker pull ${{ env.OLLAMA_IMAGE }} &
        PID2=$!
        wait $PID1 || exit $?
        wait $PID2 || exit $?
        # Save compressed tarballs where the cache action picks them up.
        mkdir -p /tmp/docker-images
        docker image save ${{ env.POSTGRES_IMAGE }} | zstd -T0 -o /tmp/docker-images/postgres.tar.zst
        docker image save ${{ env.OLLAMA_IMAGE }} | zstd -T0 -o /tmp/docker-images/ollama.tar.zst
    - name: Install dependencies
      run: mix deps.get
    - name: Set up Postgres
      run: docker compose -f compose.dbs.yml up -d --wait
    - name: Set up Ollama
      run: docker compose -f compose.ollama.yml up -d --wait
    - name: Start epmd
      run: epmd -daemon
    - name: Run AI agent live tests
      env:
        OLLAMA_HOST: http://localhost:11434
        OLLAMA_MODEL: ${{ env.OLLAMA_MODEL }}
      run: mix test --include live_llm test/integration/ai_agent/

coverage:
name: Merge Coverage
needs: tests
Expand Down
28 changes: 28 additions & 0 deletions compose.ollama.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
services:
  # Ollama API server, published on the host at port 11434.
  ollama:
    image: "ollama/ollama"
    ports:
      - "11434:11434"
    volumes:
      - "ollama_data:/root/.ollama"
    healthcheck:
      # String form is run through the container's shell (same as CMD-SHELL).
      test: "ollama list || exit 1"
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 10s

  # One-shot helper: once the server is healthy, pull the model through it
  # and exit. Defaults to qwen2:0.5b when OLLAMA_MODEL is not set.
  ollama-model-init:
    image: "ollama/ollama"
    depends_on:
      ollama:
        condition: service_healthy
    volumes:
      - "ollama_data:/root/.ollama"
    environment:
      OLLAMA_HOST: "http://ollama:11434"
    entrypoint:
      - "/bin/sh"
      - "-c"
      - "ollama pull ${OLLAMA_MODEL:-qwen2:0.5b}"
    restart: "no"

volumes:
  ollama_data: {}
6 changes: 3 additions & 3 deletions compose.tests.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
services:
# Supabase Realtime service
test_db:
image: supabase/postgres:17.6.1.074
container_name: test-realtime-db
Expand All @@ -16,9 +15,11 @@ services:
interval: 10s
timeout: 5s
retries: 5

test_realtime:
depends_on:
- test_db
test_db:
condition: service_healthy
build: .
container_name: test-realtime-server
ports:
Expand Down Expand Up @@ -58,7 +59,6 @@ services:
retries: 5
start_period: 5s

# Deno test runner
test-runner:
image: denoland/deno:alpine-2.5.6
container_name: deno-test-runner
Expand Down
2 changes: 2 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
include:
- compose.dbs.yml
- compose.ollama.yml

services:
realtime:
depends_on:
- db
- tenant_db
- ollama-model-init
build: .
container_name: realtime-server
environment:
Expand Down
7 changes: 7 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ config :realtime, :extensions,
driver: Extensions.PostgresCdcRls,
supervisor: Extensions.PostgresCdcRls.Supervisor,
db_settings: Extensions.PostgresCdcRls.DbSettings
},
ai_agent: %{
type: :ai_agent,
key: "ai_agent",
driver: Extensions.AiAgent,
supervisor: Extensions.AiAgent.Supervisor,
db_settings: Extensions.AiAgent.DbSettings
}

config :esbuild,
Expand Down
31 changes: 31 additions & 0 deletions lib/extensions/ai_agent/adapter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Extensions.AiAgent.Adapter do
  @moduledoc """
  Contract implemented by every AI provider adapter, plus small shared helpers.

  An adapter is handed the resolved settings and the message history, performs
  a streaming HTTP request against the provider, and reports progress by
  sending `Extensions.AiAgent.Types.Event` structs to the caller process as
  `{:ai_event, event}` messages.

  The caller is expected to be an `Extensions.AiAgent.Session` GenServer that
  runs the adapter inside a `Task`, so a request can be cancelled with
  `Task.shutdown`.
  """

  alias Extensions.AiAgent.Types.Event

  @callback stream(settings :: map(), messages :: list(map()), caller :: pid()) ::
              :ok | {:error, term()}

  @doc """
  Sends `event` to `caller` wrapped as `{:ai_event, event}` and returns `:ok`.
  """
  @spec emit(pid(), Event.t()) :: :ok
  def emit(caller, event = %Event{}) do
    _sent = send(caller, {:ai_event, event})
    :ok
  end

  @doc """
  Puts `value` under `key` in `map`, unless `value` is `nil`, in which case
  `map` is returned unchanged.
  """
  @spec maybe_put(map(), String.t(), term()) :: map()
  def maybe_put(map, key, value) do
    if is_nil(value) do
      map
    else
      Map.put(map, key, value)
    end
  end

  @doc """
  Prepends the `{item, value}` tuple to `list`, unless `value` is `nil`, in
  which case `list` is returned unchanged.
  """
  @spec maybe_prepend(list(), String.t(), term()) :: list()
  def maybe_prepend(list, item, value) do
    if is_nil(value) do
      list
    else
      [{item, value} | list]
    end
  end
end
118 changes: 118 additions & 0 deletions lib/extensions/ai_agent/adapter/anthropic_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Extensions.AiAgent.Adapter.AnthropicMessages do
  @moduledoc """
  Adapter for the Anthropic `/v1/messages` SSE protocol.

  `stream/3` builds a streaming `POST <base_url>/v1/messages` request and hands
  it to `Extensions.AiAgent.Adapter.SSEStream.run/3` together with
  `process_event/3` as the per-event handler. Each handled event is translated
  into `Extensions.AiAgent.Types.Event` structs sent to the caller through
  `Extensions.AiAgent.Adapter.emit/2`.

  Settings read by this module (string keys):

    * `"base_url"` - provider base URL; a trailing `/` is tolerated
    * `"api_key"` - sent as the `x-api-key` header
    * `"model"` - model name
    * `"max_tokens"` - defaults to `#{4096}`
    * `"system_prompt"` - optional, sent as the top-level `system` field
    * `"tools"` - optional list of tools in `%{"function" => %{...}}` shape
    * `"anthropic_version"` / `"anthropic_beta"` - optional header overrides
  """

  @behaviour Extensions.AiAgent.Adapter

  require Logger

  alias Extensions.AiAgent.Adapter
  alias Extensions.AiAgent.Adapter.SSEStream
  alias Extensions.AiAgent.Types.Event
  alias Extensions.AiAgent.Types.ToolCallBuffer

  @default_max_tokens 4096
  @default_anthropic_version "2023-06-01"
  @default_anthropic_beta "interleaved-thinking-2025-05-14"

  # Anthropic requires `input_schema` on every tool; this is used for tools
  # declared without parameters.
  @empty_input_schema %{"type" => "object", "properties" => %{}}

  @impl true
  def stream(settings, messages, caller) do
    # Trim a trailing slash so "https://host/" does not yield "//v1/messages".
    url = String.trim_trailing(settings["base_url"], "/") <> "/v1/messages"
    body = Jason.encode!(build_body(settings, messages))
    request = Finch.build(:post, url, headers(settings), body)
    SSEStream.run(request, &process_event/3, caller)
  end

  # Incremental content: text and thinking deltas are forwarded immediately,
  # tool-call argument fragments are accumulated in the buffer.
  defp process_event(%{"type" => "content_block_delta"} = message, buffer, caller) do
    %{"index" => idx, "delta" => delta} = message

    case delta do
      %{"type" => "text_delta", "text" => text} ->
        Adapter.emit(caller, %Event{type: :text_delta, payload: %{delta: text}})
        buffer

      %{"type" => "thinking_delta", "thinking" => text} ->
        Adapter.emit(caller, %Event{type: :thinking_delta, payload: %{delta: text}})
        buffer

      %{"type" => "input_json_delta", "partial_json" => partial} ->
        ToolCallBuffer.append_args(buffer, idx, partial, caller)

      _ ->
        buffer
    end
  end

  # A tool_use block opens: register it in the buffer under its block index.
  defp process_event(
         %{"type" => "content_block_start", "content_block" => %{"type" => "tool_use"}} = message,
         buffer,
         _caller
       ) do
    %{"index" => idx, "content_block" => block} = message
    ToolCallBuffer.start(buffer, idx, block["id"], block["name"])
  end

  # A block closes: let the buffer finish whatever it holds for that index.
  defp process_event(%{"type" => "content_block_stop"} = message, buffer, caller) do
    %{"index" => idx} = message
    ToolCallBuffer.finish(buffer, idx, caller)
  end

  # End-of-message delta: reports output token usage and, when present, the
  # stop reason. Missing `usage`/`delta` fields are tolerated instead of
  # raising a MatchError in the middle of the stream.
  defp process_event(%{"type" => "message_delta"} = message, buffer, caller) do
    emit_usage(caller, :output_tokens, message["usage"], "output_tokens")

    case message do
      %{"delta" => %{"stop_reason" => stop_reason}} when stop_reason not in [nil, false] ->
        Adapter.emit(caller, %Event{type: :done, payload: %{stop_reason: stop_reason}})

      _ ->
        :ok
    end

    buffer
  end

  # Start of message: reports input token usage when the provider sends it.
  defp process_event(%{"type" => "message_start"} = message, buffer, caller) do
    emit_usage(caller, :input_tokens, get_in(message, ["message", "usage"]), "input_tokens")
    buffer
  end

  # Provider-reported error inside the stream.
  defp process_event(%{"type" => "error"} = message, buffer, caller) do
    %{"error" => error} = message
    Adapter.emit(caller, %Event{type: :error, payload: %{reason: error["message"]}})
    buffer
  end

  # Anything else is logged at debug level and otherwise ignored.
  defp process_event(event, buffer, _caller) do
    Logger.debug("AnthropicUnknownEvent type=#{inspect(event["type"])}")
    buffer
  end

  # Emits a :usage event carrying `usage[field]` under `key`; does nothing when
  # the provider did not include a usage map.
  defp emit_usage(caller, key, %{} = usage, field) do
    Adapter.emit(caller, %Event{type: :usage, payload: %{key => usage[field]}})
  end

  defp emit_usage(_caller, _key, _usage, _field), do: :ok

  # Request body. Messages with role "system" are removed from the history;
  # the system prompt is taken from settings instead.
  defp build_body(settings, messages) do
    %{
      "model" => settings["model"],
      "messages" => Enum.reject(messages, &(&1["role"] == "system")),
      "max_tokens" => settings["max_tokens"] || @default_max_tokens,
      "stream" => true
    }
    |> Adapter.maybe_put("system", settings["system_prompt"])
    |> Adapter.maybe_put("tools", anthropic_tools(settings["tools"]))
  end

  defp anthropic_tools(nil), do: nil

  # Converts `%{"function" => %{"name", "description", "parameters"}}` tools
  # into Anthropic's `name` / `description` / `input_schema` shape. A missing
  # description is omitted and missing parameters become an empty object
  # schema, so no JSON `null` is sent for either field.
  defp anthropic_tools(tools) when is_list(tools) do
    Enum.map(tools, fn tool ->
      function = tool["function"] || %{}

      %{
        "name" => function["name"],
        "input_schema" => function["parameters"] || @empty_input_schema
      }
      |> Adapter.maybe_put("description", function["description"])
    end)
  end

  defp headers(settings) do
    [
      {"content-type", "application/json"},
      {"x-api-key", settings["api_key"]},
      {"anthropic-version", settings["anthropic_version"] || @default_anthropic_version},
      {"anthropic-beta", settings["anthropic_beta"] || @default_anthropic_beta}
    ]
  end
end
Loading
Loading