Skip to content

Commit d6572a0

Browse files
intel352claude
andcommitted
feat: ACP client provider for orchestrating ACP-compliant agents
Add acpProvider implementing provider.Provider that launches any ACP agent binary over stdio JSON-RPC. Handles Initialize, NewSession, Prompt with session update collection, auto-approve permissions, and file operations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 22e5dca commit d6572a0

File tree

4 files changed

+361
-0
lines changed

4 files changed

+361
-0
lines changed

genkit/acp_provider.go

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
package genkit
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os"
8+
"os/exec"
9+
"strings"
10+
"sync"
11+
"time"
12+
13+
acpsdk "github.com/coder/acp-go-sdk"
14+
15+
"github.com/GoCodeAlone/workflow-plugin-agent/provider"
16+
)
17+
18+
// acpProvider implements provider.Provider by driving an ACP-compliant agent
19+
// via stdio JSON-RPC. It launches the agent binary, connects as an ACP client,
20+
// and translates between the provider interface and ACP protocol.
21+
type acpProvider struct {
22+
name string
23+
binPath string
24+
args []string // extra args passed to the agent binary
25+
workDir string
26+
timeout time.Duration
27+
authInfo provider.AuthModeInfo
28+
29+
mu sync.Mutex
30+
cmd *exec.Cmd
31+
conn *acpsdk.ClientSideConnection
32+
client *acpClient
33+
stdin io.WriteCloser
34+
sessionID acpsdk.SessionId
35+
}
36+
37+
// acpClient implements acp.Client to receive session updates from the agent.
38+
type acpClient struct {
39+
mu sync.Mutex
40+
updates []acpsdk.SessionUpdate
41+
}
42+
43+
func (c *acpClient) ReadTextFile(_ context.Context, p acpsdk.ReadTextFileRequest) (acpsdk.ReadTextFileResponse, error) {
44+
b, err := os.ReadFile(p.Path)
45+
if err != nil {
46+
return acpsdk.ReadTextFileResponse{}, err
47+
}
48+
return acpsdk.ReadTextFileResponse{Content: string(b)}, nil
49+
}
50+
51+
func (c *acpClient) WriteTextFile(_ context.Context, p acpsdk.WriteTextFileRequest) (acpsdk.WriteTextFileResponse, error) {
52+
return acpsdk.WriteTextFileResponse{}, os.WriteFile(p.Path, []byte(p.Content), 0o644)
53+
}
54+
55+
func (c *acpClient) RequestPermission(_ context.Context, p acpsdk.RequestPermissionRequest) (acpsdk.RequestPermissionResponse, error) {
56+
// Auto-approve for agent orchestration.
57+
if len(p.Options) > 0 {
58+
return acpsdk.RequestPermissionResponse{
59+
Outcome: acpsdk.NewRequestPermissionOutcomeSelected(p.Options[0].OptionId),
60+
}, nil
61+
}
62+
return acpsdk.RequestPermissionResponse{
63+
Outcome: acpsdk.NewRequestPermissionOutcomeCancelled(),
64+
}, nil
65+
}
66+
67+
func (c *acpClient) SessionUpdate(_ context.Context, n acpsdk.SessionNotification) error {
68+
c.mu.Lock()
69+
c.updates = append(c.updates, n.Update)
70+
c.mu.Unlock()
71+
return nil
72+
}
73+
74+
func (c *acpClient) CreateTerminal(_ context.Context, _ acpsdk.CreateTerminalRequest) (acpsdk.CreateTerminalResponse, error) {
75+
return acpsdk.CreateTerminalResponse{TerminalId: "t-1"}, nil
76+
}
77+
78+
func (c *acpClient) KillTerminalCommand(_ context.Context, _ acpsdk.KillTerminalCommandRequest) (acpsdk.KillTerminalCommandResponse, error) {
79+
return acpsdk.KillTerminalCommandResponse{}, nil
80+
}
81+
82+
func (c *acpClient) TerminalOutput(_ context.Context, _ acpsdk.TerminalOutputRequest) (acpsdk.TerminalOutputResponse, error) {
83+
return acpsdk.TerminalOutputResponse{Output: "", Truncated: false}, nil
84+
}
85+
86+
func (c *acpClient) ReleaseTerminal(_ context.Context, _ acpsdk.ReleaseTerminalRequest) (acpsdk.ReleaseTerminalResponse, error) {
87+
return acpsdk.ReleaseTerminalResponse{}, nil
88+
}
89+
90+
func (c *acpClient) WaitForTerminalExit(_ context.Context, _ acpsdk.WaitForTerminalExitRequest) (acpsdk.WaitForTerminalExitResponse, error) {
91+
return acpsdk.WaitForTerminalExitResponse{}, nil
92+
}
93+
94+
// NewACPProvider creates a provider that drives an ACP-compliant agent binary.
95+
func NewACPProvider(name, binPath string, args []string, workDir string) (provider.Provider, error) {
96+
if binPath == "" {
97+
return nil, fmt.Errorf("acp provider %s: binary path required", name)
98+
}
99+
if _, err := exec.LookPath(binPath); err != nil {
100+
return nil, fmt.Errorf("acp provider %s: binary not found: %w", name, err)
101+
}
102+
103+
return &acpProvider{
104+
name: name,
105+
binPath: binPath,
106+
args: args,
107+
workDir: workDir,
108+
timeout: 5 * time.Minute,
109+
authInfo: provider.AuthModeInfo{
110+
Mode: "none",
111+
DisplayName: "acp:" + name,
112+
},
113+
}, nil
114+
}
115+
116+
func (p *acpProvider) Name() string { return p.name }
117+
func (p *acpProvider) AuthModeInfo() provider.AuthModeInfo { return p.authInfo }
118+
119+
// ensureConnection starts the agent process and initializes the ACP connection.
120+
func (p *acpProvider) ensureConnection(ctx context.Context) (*acpsdk.ClientSideConnection, *acpClient, acpsdk.SessionId, error) {
121+
p.mu.Lock()
122+
defer p.mu.Unlock()
123+
124+
if p.conn != nil {
125+
// Check if process is still alive.
126+
select {
127+
case <-p.conn.Done():
128+
// Connection closed, restart.
129+
p.cleanup()
130+
default:
131+
// Clear any leftover updates from previous calls.
132+
p.client.mu.Lock()
133+
p.client.updates = nil
134+
p.client.mu.Unlock()
135+
return p.conn, p.client, p.sessionID, nil
136+
}
137+
}
138+
139+
cmd := exec.Command(p.binPath, p.args...) // Don't tie to request context; process is long-lived.
140+
if p.workDir != "" {
141+
cmd.Dir = p.workDir
142+
}
143+
cmd.Stderr = os.Stderr
144+
145+
stdin, err := cmd.StdinPipe()
146+
if err != nil {
147+
return nil, nil, "", fmt.Errorf("stdin pipe: %w", err)
148+
}
149+
stdout, err := cmd.StdoutPipe()
150+
if err != nil {
151+
return nil, nil, "", fmt.Errorf("stdout pipe: %w", err)
152+
}
153+
154+
if err := cmd.Start(); err != nil {
155+
return nil, nil, "", fmt.Errorf("start agent: %w", err)
156+
}
157+
158+
client := &acpClient{}
159+
conn := acpsdk.NewClientSideConnection(client, stdin, stdout)
160+
161+
// Initialize the protocol.
162+
_, err = conn.Initialize(ctx, acpsdk.InitializeRequest{
163+
ProtocolVersion: acpsdk.ProtocolVersionNumber,
164+
ClientCapabilities: acpsdk.ClientCapabilities{
165+
Fs: acpsdk.FileSystemCapability{
166+
ReadTextFile: true,
167+
WriteTextFile: true,
168+
},
169+
Terminal: true,
170+
},
171+
ClientInfo: &acpsdk.Implementation{
172+
Name: "ratchet-orchestrator",
173+
Version: "1.0.0",
174+
},
175+
})
176+
if err != nil {
177+
_ = cmd.Process.Kill()
178+
return nil, nil, "", fmt.Errorf("initialize: %w", err)
179+
}
180+
181+
// Create a session.
182+
cwd := p.workDir
183+
if cwd == "" {
184+
cwd, _ = os.Getwd()
185+
}
186+
sessResp, err := conn.NewSession(ctx, acpsdk.NewSessionRequest{
187+
Cwd: cwd,
188+
McpServers: []acpsdk.McpServer{},
189+
})
190+
if err != nil {
191+
_ = cmd.Process.Kill()
192+
return nil, nil, "", fmt.Errorf("new session: %w", err)
193+
}
194+
195+
p.cmd = cmd
196+
p.conn = conn
197+
p.client = client
198+
p.stdin = stdin
199+
p.sessionID = sessResp.SessionId
200+
201+
return conn, client, sessResp.SessionId, nil
202+
}
203+
204+
func (p *acpProvider) cleanup() {
205+
if p.stdin != nil {
206+
_ = p.stdin.Close()
207+
}
208+
if p.cmd != nil && p.cmd.Process != nil {
209+
_ = p.cmd.Process.Kill()
210+
_ = p.cmd.Wait()
211+
}
212+
p.cmd = nil
213+
p.conn = nil
214+
p.client = nil
215+
p.stdin = nil
216+
p.sessionID = ""
217+
}
218+
219+
// Chat sends a prompt to the ACP agent and collects the full response.
220+
func (p *acpProvider) Chat(ctx context.Context, messages []provider.Message, _ []provider.ToolDef) (*provider.Response, error) {
221+
conn, client, sessID, err := p.ensureConnection(ctx)
222+
if err != nil {
223+
return nil, fmt.Errorf("acp provider %s: %w", p.name, err)
224+
}
225+
226+
msg := flattenMessages(messages)
227+
prompt := []acpsdk.ContentBlock{acpsdk.TextBlock(msg)}
228+
229+
_, err = conn.Prompt(ctx, acpsdk.PromptRequest{
230+
SessionId: sessID,
231+
Prompt: prompt,
232+
})
233+
if err != nil {
234+
return nil, fmt.Errorf("acp provider %s: prompt: %w", p.name, err)
235+
}
236+
237+
// Collect text from session updates.
238+
var content strings.Builder
239+
client.mu.Lock()
240+
for _, u := range client.updates {
241+
if u.AgentMessageChunk != nil && u.AgentMessageChunk.Content.Text != nil {
242+
content.WriteString(u.AgentMessageChunk.Content.Text.Text)
243+
}
244+
}
245+
client.updates = nil
246+
client.mu.Unlock()
247+
248+
return &provider.Response{Content: content.String()}, nil
249+
}
250+
251+
// Stream sends a prompt and returns streaming events.
252+
func (p *acpProvider) Stream(ctx context.Context, messages []provider.Message, _ []provider.ToolDef) (<-chan provider.StreamEvent, error) {
253+
conn, client, sessID, err := p.ensureConnection(ctx)
254+
if err != nil {
255+
return nil, fmt.Errorf("acp provider %s: %w", p.name, err)
256+
}
257+
258+
ch := make(chan provider.StreamEvent, 32)
259+
msg := flattenMessages(messages)
260+
prompt := []acpsdk.ContentBlock{acpsdk.TextBlock(msg)}
261+
262+
go func() {
263+
defer close(ch)
264+
265+
_, err := conn.Prompt(ctx, acpsdk.PromptRequest{
266+
SessionId: sessID,
267+
Prompt: prompt,
268+
})
269+
270+
// Drain collected updates as stream events.
271+
client.mu.Lock()
272+
for _, u := range client.updates {
273+
switch {
274+
case u.AgentMessageChunk != nil && u.AgentMessageChunk.Content.Text != nil:
275+
ch <- provider.StreamEvent{Type: "text", Text: u.AgentMessageChunk.Content.Text.Text}
276+
case u.AgentThoughtChunk != nil && u.AgentThoughtChunk.Content.Text != nil:
277+
ch <- provider.StreamEvent{Type: "thinking", Thinking: u.AgentThoughtChunk.Content.Text.Text}
278+
case u.ToolCall != nil:
279+
ch <- provider.StreamEvent{
280+
Type: "tool_call",
281+
Tool: &provider.ToolCall{
282+
ID: string(u.ToolCall.ToolCallId),
283+
Name: u.ToolCall.Title,
284+
},
285+
}
286+
}
287+
}
288+
client.updates = nil
289+
client.mu.Unlock()
290+
291+
if err != nil {
292+
ch <- provider.StreamEvent{Type: "error", Error: err.Error()}
293+
return
294+
}
295+
ch <- provider.StreamEvent{Type: "done"}
296+
}()
297+
298+
return ch, nil
299+
}
300+
301+
// Close terminates the agent process.
302+
func (p *acpProvider) Close() error {
303+
p.mu.Lock()
304+
defer p.mu.Unlock()
305+
p.cleanup()
306+
return nil
307+
}

genkit/acp_provider_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package genkit
2+
3+
import (
4+
"os"
5+
"testing"
6+
7+
"github.com/GoCodeAlone/workflow-plugin-agent/provider"
8+
)
9+
10+
func TestNewACPProviderMissingBinary(t *testing.T) {
11+
old := os.Getenv("PATH")
12+
_ = os.Setenv("PATH", t.TempDir())
13+
defer func() { _ = os.Setenv("PATH", old) }()
14+
15+
_, err := NewACPProvider("test-agent", "nonexistent-agent-binary", nil, "")
16+
if err == nil {
17+
t.Error("expected error for missing binary")
18+
}
19+
}
20+
21+
func TestNewACPProviderEmptyPath(t *testing.T) {
22+
_, err := NewACPProvider("test-agent", "", nil, "")
23+
if err == nil {
24+
t.Error("expected error for empty binary path")
25+
}
26+
}
27+
28+
func TestACPProviderName(t *testing.T) {
29+
// Use a binary that exists on PATH for the constructor, but we won't connect.
30+
p := &acpProvider{
31+
name: "test_acp",
32+
authInfo: provider.AuthModeInfo{
33+
Mode: "none",
34+
DisplayName: "acp:test_acp",
35+
},
36+
}
37+
if p.Name() != "test_acp" {
38+
t.Errorf("Name() = %q, want %q", p.Name(), "test_acp")
39+
}
40+
info := p.AuthModeInfo()
41+
if info.Mode != "none" {
42+
t.Errorf("AuthModeInfo().Mode = %q, want %q", info.Mode, "none")
43+
}
44+
}
45+
46+
func TestACPProviderClose(t *testing.T) {
47+
p := &acpProvider{name: "test"}
48+
if err := p.Close(); err != nil {
49+
t.Errorf("Close() on idle provider: %v", err)
50+
}
51+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ require (
9797
github.com/cespare/xxhash/v2 v2.3.0 // indirect
9898
github.com/cloudevents/sdk-go/v2 v2.16.2 // indirect
9999
github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect
100+
github.com/coder/acp-go-sdk v0.6.3 // indirect
100101
github.com/containerd/errdefs v1.0.0 // indirect
101102
github.com/containerd/errdefs/pkg v0.3.0 // indirect
102103
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ github.com/cloudevents/sdk-go/v2 v2.16.2 h1:ZYDFrYke4FD+jM8TZTJJO6JhKHzOQl2oqpFK
190190
github.com/cloudevents/sdk-go/v2 v2.16.2/go.mod h1:laOcGImm4nVJEU+PHnUrKL56CKmRL65RlQF0kRmW/kg=
191191
github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 h1:6xNmx7iTtyBRev0+D/Tv1FZd4SCg8axKApyNyRsAt/w=
192192
github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI=
193+
github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ=
194+
github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko=
193195
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
194196
github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
195197
github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=

0 commit comments

Comments
 (0)