diff --git a/agent/agent.go b/agent/agent.go index e1a6d90..14fd220 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -98,6 +98,9 @@ type Options struct { // tool is still registered but returns "[no human available]" instead of // blocking (e.g. standalone squadron with no commander attached). HumanBridge aitools.HumanInputBridge + // GatewayBridge powers the `builtins.gateway.post` tool. When nil, the + // tool is still registered but returns "[no gateway configured]". + GatewayBridge aitools.GatewayBridge } // New creates a new agent from config @@ -155,7 +158,7 @@ func New(ctx context.Context, opts Options) (*Agent, error) { // Build tools map and add sanitized aliases so LLM tool calls // (which use API-safe names like "plugins_shell_echo") resolve correctly - tools := config.BuildToolsMap(agentCfg.Tools, cfg.CustomTools, cfg.LoadedPlugins, cfg.LoadedMCPClients, opts.DatasetStore, opts.HumanBridge) + tools := config.BuildToolsMap(agentCfg.Tools, cfg.CustomTools, cfg.LoadedPlugins, cfg.LoadedMCPClients, opts.DatasetStore, opts.HumanBridge, opts.GatewayBridge) aitools.AddSanitizedAliases(tools) // Create result store and interceptor for large results @@ -186,6 +189,12 @@ func New(ctx context.Context, opts Options) (*Agent, error) { tools["file_delete"] = &aitools.MemoryDeleteTool{Store: opts.MemoryStore} tools["file_search"] = &aitools.MemorySearchTool{Store: opts.MemoryStore} tools["file_grep"] = &aitools.MemoryGrepTool{Store: opts.MemoryStore} + // The gateway post tool resolves attachments from the same store. 
+ for _, tool := range tools { + if gp, ok := tool.(*aitools.GatewayPostTool); ok { + gp.Store = opts.MemoryStore + } + } } // Resolve skills and add load_skill tool @@ -197,7 +206,7 @@ func New(ctx context.Context, opts Options) (*Agent, error) { AvailableSkills: availableSkills, AgentTools: tools, ToolBuilder: func(toolRefs []string) map[string]aitools.Tool { - t := config.BuildToolsMap(toolRefs, cfg.CustomTools, cfg.LoadedPlugins, cfg.LoadedMCPClients, opts.DatasetStore, opts.HumanBridge) + t := config.BuildToolsMap(toolRefs, cfg.CustomTools, cfg.LoadedPlugins, cfg.LoadedMCPClients, opts.DatasetStore, opts.HumanBridge, opts.GatewayBridge) aitools.AddSanitizedAliases(t) return t }, diff --git a/agent/agent_manager.go b/agent/agent_manager.go index aeb4a70..bc3e783 100644 --- a/agent/agent_manager.go +++ b/agent/agent_manager.go @@ -46,6 +46,7 @@ type AgentManager struct { provider llm.Provider // optional injected provider for agents budget BudgetChecker humanBridge aitools.HumanInputBridge // bridge for builtins.human.ask on spawned agents + gatewayBridge aitools.GatewayBridge // bridge for builtins.gateway.post on spawned agents } // AgentManagerConfig holds the dependencies needed to create an AgentManager. @@ -70,6 +71,8 @@ type AgentManagerConfig struct { Budget BudgetChecker // HumanBridge — nil disables builtins.human.ask on spawned agents. HumanBridge aitools.HumanInputBridge + // GatewayBridge — nil disables builtins.gateway.post on spawned agents. + GatewayBridge aitools.GatewayBridge } // NewAgentManager creates a new AgentManager. 
@@ -95,6 +98,7 @@ func NewAgentManager(cfg AgentManagerConfig) *AgentManager { provider: cfg.Provider, budget: cfg.Budget, humanBridge: cfg.HumanBridge, + gatewayBridge: cfg.GatewayBridge, } } @@ -284,6 +288,7 @@ func (m *AgentManager) createAgent(ctx context.Context, agentCfg *config.Agent) PricingOverrides: m.pricingOverrides, Budget: m.budget, HumanBridge: m.humanBridge, + GatewayBridge: m.gatewayBridge, }) } diff --git a/agent/commander.go b/agent/commander.go index 8119848..6027e58 100644 --- a/agent/commander.go +++ b/agent/commander.go @@ -106,6 +106,9 @@ type CommanderOptions struct { // spawns. Nil disables HITL — the tool then returns // "[no human available]" instead of blocking. HumanBridge aitools.HumanInputBridge + // GatewayBridge powers builtins.gateway.post on agents this commander + // spawns. Nil → the tool returns "[no gateway configured]". + GatewayBridge aitools.GatewayBridge } // DependencyOutputSchema describes a completed dependency task's output schema @@ -348,6 +351,7 @@ type Commander struct { pruneTo int // Prune down to this many turns budget BudgetChecker // Optional token/dollar budget enforcer humanBridge aitools.HumanInputBridge // Optional bridge for builtins.human.ask + gatewayBridge aitools.GatewayBridge // Optional bridge for builtins.gateway.post } // NewCommander creates a new commander for a mission task @@ -473,6 +477,7 @@ func NewCommander(ctx context.Context, opts CommanderOptions) (*Commander, error pricingOverrides: opts.PricingOverrides, budget: opts.Budget, humanBridge: opts.HumanBridge, + gatewayBridge: opts.GatewayBridge, } // Add result tools to commander's tool map @@ -491,6 +496,11 @@ func NewCommander(ctx context.Context, opts CommanderOptions) (*Commander, error sup.tools["file_delete"] = &aitools.MemoryDeleteTool{Store: opts.MemoryStore} sup.tools["file_search"] = &aitools.MemorySearchTool{Store: opts.MemoryStore} sup.tools["file_grep"] = &aitools.MemoryGrepTool{Store: opts.MemoryStore} + for _, tool := range 
sup.tools { + if gp, ok := tool.(*aitools.GatewayPostTool); ok { + gp.Store = opts.MemoryStore + } + } if memoryPrompt := prompts.FormatMemoryContext(opts.MemoryStore); memoryPrompt != "" { session.AddSystemPrompt(memoryPrompt) } @@ -737,6 +747,7 @@ func (s *Commander) SetToolCallbacks(callbacks *CommanderToolCallbacks, depSumma Provider: s.provider, Budget: s.budget, HumanBridge: s.humanBridge, + GatewayBridge: s.gatewayBridge, }) } diff --git a/aitools/gateway_post.go b/aitools/gateway_post.go new file mode 100644 index 0000000..0ab1332 --- /dev/null +++ b/aitools/gateway_post.go @@ -0,0 +1,248 @@ +package aitools + +import ( + "context" + "encoding/json" + "fmt" + "mime" + "net/http" + "os" + "path/filepath" + "strings" +) + +// GatewayBridge posts a message through the configured gateway subprocess +// (Discord, Slack, …) and advertises how that gateway wants messages shaped. +// Pass nil to build a tool that reports the gateway is unavailable instead of +// posting — the tool is always registered. +type GatewayBridge interface { + // PostMessage forwards the raw, gateway-schema-shaped JSON the agent + // produced (text + rich layout) plus any squadron-resolved file + // attachments. The gateway parses the payload and uploads the attachment + // bytes directly. + PostMessage(ctx context.Context, payload string, attachments []GatewayAttachment) error + // MessageToolDescription is the gateway-supplied tool description (how to + // format messages for this gateway). Empty → squadron's default. + MessageToolDescription() string + // MessageToolSchema is the gateway-supplied JSON Schema for the tool's + // params. Empty → squadron's default { message, channel } shape. + MessageToolSchema() string +} + +// GatewayAttachment is a squadron-local file resolved from the mission's +// memory/scratchpad/packet storage, shipped to the gateway as raw bytes. 
+type GatewayAttachment struct { + Filename string + MimeType string + Content []byte +} + +// Attachments are sourced from squadron-local files only (never a URL the +// model picks), so there is no SSRF surface. Caps keep a single post within +// the gateway gRPC channel's message-size budget. +const ( + maxAttachmentBytes = 25 << 20 // 25 MiB per file + maxTotalAttachmentBytes = 30 << 20 // 30 MiB per post +) + +// GatewayPostTool backs builtins.gateway.post. The gateway owns the message +// contract (text + rich layout): its description and JSON Schema are surfaced +// to the LLM and the params are forwarded verbatim. Squadron owns the +// `attachments` field — it resolves each {slot, path} reference against the +// mission's MemoryStore and ships the bytes to the gateway. +type GatewayPostTool struct { + Bridge GatewayBridge + Store MemoryStore +} + +func (t *GatewayPostTool) ToolName() string { return "post" } + +const defaultGatewayPostDescription = "Post a message to the configured gateway's external system (Discord, Slack, etc.). " + + "If no gateway is configured, returns \"[no gateway configured]\" so you can proceed without failing." + +const attachmentsDescriptionSuffix = "To attach files, set `attachments` to a list of {\"slot\":..., \"path\":...} objects " + + "referencing squadron's own memory/scratchpad/packet storage (NOT URLs) — squadron reads each file and uploads it." + +const defaultGatewayPostSchema = `{ + "type": "object", + "properties": { + "message": {"type": "string", "description": "The message text to post."}, + "channel": {"type": "string", "description": "Optional channel name or id override."} + }, + "required": ["message"] +}` + +const attachmentsSchemaProperty = `{ + "type": "array", + "description": "Optional files to attach, sourced from squadron's own memory/scratchpad/packet storage (NOT URLs). 
Each item references a local file by slot and path.", + "items": { + "type": "object", + "properties": { + "slot": {"type": "string", "description": "Slot: \"memory\", \"scratchpad\", a shared-memory name, or \"packet.\"."}, + "path": {"type": "string", "description": "Relative path within the slot, e.g. \"report.pdf\"."} + }, + "required": ["slot", "path"] + } +}` + +type gatewayAttachmentRef struct { + Slot string `json:"slot"` + Path string `json:"path"` +} + +func (t *GatewayPostTool) ToolDescription() string { + base := defaultGatewayPostDescription + if t.Bridge != nil { + if d := t.Bridge.MessageToolDescription(); d != "" { + base = d + } + } + if t.Store != nil { + return base + " " + attachmentsDescriptionSuffix + } + return base +} + +func (t *GatewayPostTool) ToolPayloadSchema() Schema { + raw := "" + if t.Bridge != nil { + raw = t.Bridge.MessageToolSchema() + } + if strings.TrimSpace(raw) == "" { + raw = defaultGatewayPostSchema + } + // The gateway owns text + rich layout; squadron owns attachments (local + // files), so inject that field only when a memory store is available. + if t.Store != nil { + raw = injectAttachmentsProperty(raw) + } + return Schema{Type: TypeObject, Properties: PropertyMap{}}.WithRawJSONSchema(json.RawMessage(raw)) +} + +// injectAttachmentsProperty adds squadron's `attachments` property to the +// gateway-owned schema. Best-effort: if the schema can't be parsed or already +// defines `attachments`, it is returned unchanged. 
+func injectAttachmentsProperty(schema string) string { + var root map[string]json.RawMessage + if err := json.Unmarshal([]byte(schema), &root); err != nil { + return schema + } + props := map[string]json.RawMessage{} + if rawProps, ok := root["properties"]; ok { + if err := json.Unmarshal(rawProps, &props); err != nil { + return schema + } + } + if _, exists := props["attachments"]; exists { + return schema + } + props["attachments"] = json.RawMessage(attachmentsSchemaProperty) + newProps, err := json.Marshal(props) + if err != nil { + return schema + } + root["properties"] = newProps + out, err := json.Marshal(root) + if err != nil { + return schema + } + return string(out) +} + +// NoGatewayObservation is what Call returns when no gateway bridge is wired. +const NoGatewayObservation = "[no gateway configured]" + +func (t *GatewayPostTool) Call(ctx context.Context, params string) string { + if t.Bridge == nil { + return NoGatewayObservation + } + if s := strings.TrimSpace(params); s == "" || s == "{}" { + return "Error: empty message payload" + } + if !json.Valid([]byte(params)) { + return "Error: invalid JSON parameters" + } + + payload := params + var attachments []GatewayAttachment + + // Pull `attachments` out of the payload and resolve it against the mission's + // local file storage; the gateway never sees the references, only bytes. 
+ var root map[string]json.RawMessage + if err := json.Unmarshal([]byte(params), &root); err != nil { + return "Error: invalid JSON parameters" + } + if rawAtt, ok := root["attachments"]; ok { + delete(root, "attachments") + var refs []gatewayAttachmentRef + if err := json.Unmarshal(rawAtt, &refs); err != nil { + return "Error: attachments must be an array of {slot, path} objects" + } + resolved, errMsg := t.resolveAttachments(refs) + if errMsg != "" { + return "Error: " + errMsg + } + attachments = resolved + rest, err := json.Marshal(root) + if err != nil { + return "Error: " + err.Error() + } + payload = string(rest) + } + + if err := t.Bridge.PostMessage(ctx, payload, attachments); err != nil { + return "Error: " + err.Error() + } + return "Message posted to the gateway." +} + +func (t *GatewayPostTool) resolveAttachments(refs []gatewayAttachmentRef) ([]GatewayAttachment, string) { + if len(refs) == 0 { + return nil, "" + } + if t.Store == nil { + return nil, "attachments are not available for this mission (no memory or scratchpad configured)" + } + var out []GatewayAttachment + var total int + for _, r := range refs { + if strings.TrimSpace(r.Slot) == "" || strings.TrimSpace(r.Path) == "" { + return nil, "each attachment needs a non-empty slot and path" + } + abs, err := resolveSlotPath(t.Store, r.Slot, r.Path) + if err != nil { + return nil, fmt.Sprintf("attachment %s/%s: %v", r.Slot, r.Path, err) + } + info, err := os.Stat(abs) + if err != nil { + return nil, fmt.Sprintf("attachment %s/%s: %v", r.Slot, r.Path, err) + } + if info.IsDir() { + return nil, fmt.Sprintf("attachment %s/%s: is a directory, not a file", r.Slot, r.Path) + } + if info.Size() > maxAttachmentBytes { + return nil, fmt.Sprintf("attachment %s/%s: %d bytes exceeds the %d-byte per-file limit", r.Slot, r.Path, info.Size(), maxAttachmentBytes) + } + data, err := os.ReadFile(abs) + if err != nil { + return nil, fmt.Sprintf("attachment %s/%s: %v", r.Slot, r.Path, err) + } + total += len(data) + 
if total > maxTotalAttachmentBytes { + return nil, fmt.Sprintf("total attachment size exceeds the %d-byte per-post limit", maxTotalAttachmentBytes) + } + out = append(out, GatewayAttachment{ + Filename: filepath.Base(r.Path), + MimeType: detectAttachmentMime(r.Path, data), + Content: data, + }) + } + return out, "" +} + +func detectAttachmentMime(name string, data []byte) string { + if ct := mime.TypeByExtension(filepath.Ext(name)); ct != "" { + return ct + } + return http.DetectContentType(data) +} diff --git a/aitools/gateway_post_test.go b/aitools/gateway_post_test.go new file mode 100644 index 0000000..4883d2d --- /dev/null +++ b/aitools/gateway_post_test.go @@ -0,0 +1,120 @@ +package aitools_test + +import ( + "context" + "errors" + "os" + "path/filepath" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "squadron/aitools" +) + +type fakeGatewayBridge struct { + payload string + attachments []aitools.GatewayAttachment + err error + desc string + schema string +} + +func (f *fakeGatewayBridge) PostMessage(_ context.Context, payload string, attachments []aitools.GatewayAttachment) error { + f.payload = payload + f.attachments = attachments + return f.err +} +func (f *fakeGatewayBridge) MessageToolDescription() string { return f.desc } +func (f *fakeGatewayBridge) MessageToolSchema() string { return f.schema } + +// fakeMemStore resolves any slot to a fixed root dir for attachment tests. 
+type fakeMemStore struct{ root string } + +func (s fakeMemStore) ResolvePath(_ string, relPath string) (string, error) { + return filepath.Join(s.root, relPath), nil +} +func (s fakeMemStore) MemoryInfos() []aitools.MemoryInfo { return nil } + +var _ = Describe("GatewayPostTool", func() { + It("returns the no-gateway observation when no bridge is wired", func() { + t := &aitools.GatewayPostTool{Bridge: nil} + Expect(t.Call(context.Background(), `{"message":"hi"}`)).To(Equal(aitools.NoGatewayObservation)) + }) + + It("rejects an empty payload", func() { + t := &aitools.GatewayPostTool{Bridge: &fakeGatewayBridge{}} + Expect(t.Call(context.Background(), `{}`)).To(ContainSubstring("empty message payload")) + }) + + It("forwards the raw payload to the gateway verbatim", func() { + b := &fakeGatewayBridge{} + t := &aitools.GatewayPostTool{Bridge: b} + payload := `{"text":"deploy done","channel":"#ops","embeds":[{"title":"v2"}]}` + out := t.Call(context.Background(), payload) + Expect(out).To(ContainSubstring("posted")) + Expect(b.payload).To(Equal(payload)) + }) + + It("surfaces a bridge error to the agent", func() { + t := &aitools.GatewayPostTool{Bridge: &fakeGatewayBridge{err: errors.New("no gateway is currently running")}} + Expect(t.Call(context.Background(), `{"text":"hi"}`)).To(ContainSubstring("no gateway is currently running")) + }) + + It("advertises the gateway-supplied description and schema", func() { + b := &fakeGatewayBridge{ + desc: "Post to Discord. 
text supports markdown.", + schema: `{"type":"object","properties":{"text":{"type":"string"}},"required":["text"]}`, + } + t := &aitools.GatewayPostTool{Bridge: b} + Expect(t.ToolDescription()).To(Equal(b.desc)) + raw := t.ToolPayloadSchema().ToJSONSchema() + Expect(string(raw)).To(ContainSubstring(`"text"`)) + }) + + It("falls back to a default schema when the gateway provides none", func() { + t := &aitools.GatewayPostTool{Bridge: &fakeGatewayBridge{}} + Expect(string(t.ToolPayloadSchema().ToJSONSchema())).To(ContainSubstring(`"message"`)) + }) + + It("advertises attachments only when a memory store is wired", func() { + gw := &fakeGatewayBridge{schema: `{"type":"object","properties":{"text":{"type":"string"}},"required":["text"]}`} + without := &aitools.GatewayPostTool{Bridge: gw} + Expect(string(without.ToolPayloadSchema().ToJSONSchema())).NotTo(ContainSubstring("attachments")) + Expect(without.ToolDescription()).NotTo(ContainSubstring("attachments")) + + with := &aitools.GatewayPostTool{Bridge: gw, Store: fakeMemStore{root: "/tmp"}} + Expect(string(with.ToolPayloadSchema().ToJSONSchema())).To(ContainSubstring("attachments")) + Expect(with.ToolDescription()).To(ContainSubstring("attachments")) + }) + + It("resolves local-file attachments to bytes and strips them from the payload", func() { + dir := GinkgoT().TempDir() + Expect(os.WriteFile(filepath.Join(dir, "report.txt"), []byte("hello report"), 0o644)).To(Succeed()) + + b := &fakeGatewayBridge{} + t := &aitools.GatewayPostTool{Bridge: b, Store: fakeMemStore{root: dir}} + out := t.Call(context.Background(), + `{"text":"see attached","attachments":[{"slot":"scratchpad","path":"report.txt"}]}`) + + Expect(out).To(ContainSubstring("posted")) + Expect(b.attachments).To(HaveLen(1)) + Expect(b.attachments[0].Filename).To(Equal("report.txt")) + Expect(string(b.attachments[0].Content)).To(Equal("hello report")) + // the gateway must not see the attachments reference, only text + 
Expect(b.payload).To(ContainSubstring(`"text":"see attached"`)) + Expect(b.payload).NotTo(ContainSubstring("attachments")) + }) + + It("errors when attachments are requested but no store is available", func() { + t := &aitools.GatewayPostTool{Bridge: &fakeGatewayBridge{}} + out := t.Call(context.Background(), `{"text":"hi","attachments":[{"slot":"memory","path":"x.txt"}]}`) + Expect(out).To(ContainSubstring("not available")) + }) + + It("errors when an attachment file does not exist", func() { + t := &aitools.GatewayPostTool{Bridge: &fakeGatewayBridge{}, Store: fakeMemStore{root: GinkgoT().TempDir()}} + out := t.Call(context.Background(), `{"text":"hi","attachments":[{"slot":"memory","path":"missing.txt"}]}`) + Expect(out).To(ContainSubstring("Error:")) + }) +}) diff --git a/cmd/engage.go b/cmd/engage.go index 01cd131..e3d65dd 100644 --- a/cmd/engage.go +++ b/cmd/engage.go @@ -29,6 +29,7 @@ import ( squadronmcp "squadron/mcp" "squadron/mcphost" "squadron/mission" + "squadron/notification" "squadron/scheduler" "squadron/store" "squadron/wsbridge" @@ -390,6 +391,18 @@ func runEngage(cmd *cobra.Command, args []string) { } }() + // Mission-lifecycle notification dispatcher. The gateway channel is + // wired only when a gateway is configured; the command-center channel + // no-ops when no command center is connected. + var gatewaySink notification.Sink + if gatewayMgr != nil { + gatewaySink = gateway.NewNotifySink(gatewayMgr) + // The Manager satisfies aitools.GatewayBridge directly; only wire it + // when a gateway exists so the tool sees a nil bridge otherwise. 
+ client.SetGatewayBridge(gatewayMgr) + } + client.SetNotifier(notification.NewDispatcher(gatewaySink, wsbridge.NewNotifySink(client))) + sched := scheduler.New(client.RunScheduledMission) client.SetConcurrencyTracker(sched) if cfgErr == nil { diff --git a/config/agent.go b/config/agent.go index 3933bb0..e0e779c 100644 --- a/config/agent.go +++ b/config/agent.go @@ -19,7 +19,7 @@ const ( // ReservedBuiltinNamespaces are names reserved for built-in tools (cannot be // used as plugin or mcp server names). "mcp" itself is reserved so that a // `plugin "mcp" { ... }` can't shadow the consumer-side namespace. -var ReservedBuiltinNamespaces = []string{"http", "dataset", "utils", "human", "mcp"} +var ReservedBuiltinNamespaces = []string{"http", "dataset", "utils", "human", "gateway", "mcp"} // BuiltinTools maps built-in namespaces to their tools. // These are accessed as builtins.http.get, builtins.http.get, etc. @@ -28,6 +28,7 @@ var BuiltinTools = map[string][]string{ "dataset": {"set", "sample", "count"}, "utils": {"sleep", "current_time"}, "human": {"ask"}, + "gateway": {"post"}, } // InternalTools is the list of available internal tools (legacy format for backwards compatibility) diff --git a/config/agent_test.go b/config/agent_test.go index 31d5654..af502bf 100644 --- a/config/agent_test.go +++ b/config/agent_test.go @@ -15,7 +15,7 @@ var _ = Describe("Agent", func() { Expect(config.IsBuiltinTool("builtins.utils.current_time")).To(BeTrue()) Expect(config.BuiltinTools["utils"]).To(ContainElement("current_time")) - tool := config.GetBuiltinTool("builtins.utils.current_time", nil, nil) + tool := config.GetBuiltinTool("builtins.utils.current_time", nil, nil, nil) Expect(tool).NotTo(BeNil()) Expect(tool).To(BeAssignableToTypeOf(&aitools.CurrentTimeTool{})) Expect(tool.ToolName()).To(Equal("current_time")) @@ -36,7 +36,7 @@ agent "clock" { Expect(cfg.Agents).To(HaveLen(1)) Expect(cfg.Agents[0].Tools).To(ConsistOf("builtins.utils.current_time")) - tools := 
config.BuildToolsMap(cfg.Agents[0].Tools, nil, nil, nil, nil, nil) + tools := config.BuildToolsMap(cfg.Agents[0].Tools, nil, nil, nil, nil, nil, nil) Expect(tools).To(HaveKey("builtins.utils.current_time")) Expect(tools["builtins.utils.current_time"]).To(BeAssignableToTypeOf(&aitools.CurrentTimeTool{})) }) diff --git a/config/config.go b/config/config.go index 1f86de9..9c4789b 100644 --- a/config/config.go +++ b/config/config.go @@ -548,6 +548,11 @@ func (c *Config) Validate() error { // Add built-in tools (builtins.http.get, builtins.http.get, etc.) for namespace, tools := range BuiltinTools { + // The gateway namespace (builtins.gateway.post) only exists when a + // gateway is configured. + if namespace == "gateway" && c.Gateway == nil { + continue + } for _, toolName := range tools { validToolRefs[fmt.Sprintf("builtins.%s.%s", namespace, toolName)] = true } @@ -699,6 +704,15 @@ func (c *Config) Validate() error { } } + // Cross-block: a mission routing notifications to the gateway requires a + // configured top-level gateway block. (The command_center channel has no + // such check — it no-ops when no command center is present at runtime.) + for _, m := range c.Missions { + if m.Notification != nil && m.Notification.Gateway != nil && c.Gateway == nil { + return fmt.Errorf("mission '%s': notification routes to gateway but no gateway block is configured", m.Name) + } + } + // Validate webhook path uniqueness across all missions webhookPaths := make(map[string]string) // path → mission name for _, m := range c.Missions { @@ -2043,6 +2057,7 @@ func parseMissionBlock(block *hcl.Block, ctx *hcl.EvalContext) (*Mission, error) {Type: "schedule"}, {Type: "trigger"}, {Type: "budget"}, + {Type: "notification"}, // Detected so we can produce a nicer error than the parser's default. 
{Type: "folder"}, {Type: "run_folder"}, @@ -2309,6 +2324,22 @@ func parseMissionBlock(block *hcl.Block, ctx *hcl.EvalContext) (*Mission, error) missionBudget = b } + // Parse notification block (optional, singleton) + var missionNotification *NotificationConfig + for _, notifBlock := range missionContent.Blocks { + if notifBlock.Type != "notification" { + continue + } + if missionNotification != nil { + return nil, fmt.Errorf("mission '%s': only one notification block allowed", missionName) + } + n, err := parseNotificationBlock(notifBlock, ctx) + if err != nil { + return nil, fmt.Errorf("mission '%s' notification: %w", missionName, err) + } + missionNotification = n + } + // Parse max_parallel attribute (optional, default 3) maxParallel := 3 if attr, ok := missionContent.Attributes["max_parallel"]; ok { @@ -2335,6 +2366,7 @@ func parseMissionBlock(block *hcl.Block, ctx *hcl.EvalContext) (*Mission, error) Trigger: trigger, MaxParallel: maxParallel, Budget: missionBudget, + Notification: missionNotification, } // Parse inputs — accept either shorthand attribute or verbose labeled block form. @@ -2941,6 +2973,87 @@ func parseTaskBlock(block *hcl.Block, ctx *hcl.EvalContext) (*Task, error) { }, nil } +// parseNotificationBlock parses a mission `notification { gateway { ... } +// command_center { ... } }` block. Each channel sub-block is a singleton. 
+func parseNotificationBlock(block *hcl.Block, ctx *hcl.EvalContext) (*NotificationConfig, error) { + content, _, diags := block.Body.PartialContent(&hcl.BodySchema{ + Blocks: []hcl.BlockHeaderSchema{ + {Type: "gateway"}, + {Type: "command_center"}, + }, + }) + if diags.HasErrors() { + return nil, diags + } + + n := &NotificationConfig{} + for _, sub := range content.Blocks { + ch, err := parseNotificationChannel(sub.Body, ctx) + if err != nil { + return nil, fmt.Errorf("%s: %w", sub.Type, err) + } + switch sub.Type { + case "gateway": + if n.Gateway != nil { + return nil, fmt.Errorf("only one gateway channel allowed") + } + n.Gateway = ch + case "command_center": + if n.CommandCenter != nil { + return nil, fmt.Errorf("only one command_center channel allowed") + } + n.CommandCenter = ch + } + } + + if err := n.Validate(); err != nil { + return nil, err + } + return n, nil +} + +// parseNotificationChannel decodes a single channel sub-block. `enabled` +// defaults to true when the block is present. +func parseNotificationChannel(body hcl.Body, ctx *hcl.EvalContext) (*NotificationChannel, error) { + content, _, diags := body.PartialContent(&hcl.BodySchema{ + Attributes: []hcl.AttributeSchema{ + {Name: "enabled"}, + {Name: "events"}, + {Name: "channel"}, + }, + }) + if diags.HasErrors() { + return nil, diags + } + + // Enabled defaults to true when the block is present. 
+ ch := &NotificationChannel{Enabled: true} + if attr, ok := content.Attributes["enabled"]; ok { + val, diags := attr.Expr.Value(ctx) + if diags.HasErrors() { + return nil, fmt.Errorf("enabled: %w", diags) + } + ch.Enabled = val.True() + } + if attr, ok := content.Attributes["events"]; ok { + val, diags := attr.Expr.Value(ctx) + if diags.HasErrors() { + return nil, fmt.Errorf("events: %w", diags) + } + for _, ev := range val.AsValueSlice() { + ch.Events = append(ch.Events, ev.AsString()) + } + } + if attr, ok := content.Attributes["channel"]; ok { + val, diags := attr.Expr.Value(ctx) + if diags.HasErrors() { + return nil, fmt.Errorf("channel: %w", diags) + } + ch.Channel = val.AsString() + } + return ch, nil +} + // parseBudgetBlock parses a `budget { tokens = N, dollars = M }` block. // Both attributes are optional but at least one must be set (enforced by Validate). func parseBudgetBlock(block *hcl.Block, ctx *hcl.EvalContext) (*Budget, error) { diff --git a/config/gateway_tool_test.go b/config/gateway_tool_test.go new file mode 100644 index 0000000..8959bc6 --- /dev/null +++ b/config/gateway_tool_test.go @@ -0,0 +1,42 @@ +package config_test + +import ( + "squadron/config" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("builtins.gateway.post tool", func() { + // An agent that uses the gateway post tool. 
+ base := func(extra string) string { + return minimalVarsHCL() + minimalModelHCL() + extra + ` +agent "poster" { + model = models.anthropic.claude_sonnet_4 + personality = "Helpful" + tools = [builtins.gateway.post] +} +mission "m" { + commander { model = models.anthropic.claude_sonnet_4 } + agents = [agents.poster] + task "run" { objective = "post a message" } +} +` + } + + It("is rejected when no gateway is configured", func() { + _, f := writeFixture("config.hcl", base("")) + _, err := config.LoadAndValidate(f) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("builtins.gateway.post")) + }) + + It("is accepted when a gateway is configured", func() { + dir := writeFixtures(map[string]string{ + "config.hcl": base(""), + "gateway.hcl": gatewayBlockHCL(), + }) + _, err := config.LoadAndValidate(dir) + Expect(err).NotTo(HaveOccurred()) + }) +}) diff --git a/config/mission.go b/config/mission.go index 861907e..01ff31e 100644 --- a/config/mission.go +++ b/config/mission.go @@ -317,6 +317,7 @@ type Mission struct { Trigger *Trigger `json:"trigger,omitempty"` MaxParallel int `json:"maxParallel,omitempty"` // default 3 Budget *Budget `json:"budget,omitempty"` + Notification *NotificationConfig `json:"notification,omitempty"` // opt-in terminal-event notifications } // GetLocalAgent returns a mission-scoped agent by name, or nil if not found. diff --git a/config/notification.go b/config/notification.go new file mode 100644 index 0000000..f73d641 --- /dev/null +++ b/config/notification.go @@ -0,0 +1,120 @@ +package config + +import "fmt" + +// Notification event names. These mirror the terminal mission lifecycle +// event strings on the wire (protocol.EventMissionCompleted/Failed/Stopped) +// — kept as local constants so the config package stays decoupled from the +// wire protocol package. 
const (
	// NotifyMissionCompleted is the event name for a completed mission.
	NotifyMissionCompleted = "mission_completed"
	// NotifyMissionFailed is the event name for a failed mission.
	NotifyMissionFailed = "mission_failed"
	// NotifyAllEvents is a convenience value usable in a channel's `events`
	// list that expands to every terminal event.
	NotifyAllEvents = "all"
)

// allNotifyEvents is what "all" expands to. It is shared package state, so
// it is only ever handed to callers as a copy (see EffectiveEvents).
var allNotifyEvents = []string{
	NotifyMissionCompleted,
	NotifyMissionFailed,
}

// validNotifyEvent reports whether e is an accepted entry for a channel's
// `events` list: one of the terminal event names or "all".
func validNotifyEvent(e string) bool {
	switch e {
	case NotifyMissionCompleted, NotifyMissionFailed, NotifyAllEvents:
		return true
	}
	return false
}

// NotificationConfig is a mission's `notification { ... }` block. It is
// purely opt-in: a mission without the block has a nil *NotificationConfig
// and emits no notifications.
type NotificationConfig struct {
	Gateway       *NotificationChannel `json:"gateway,omitempty"`
	CommandCenter *NotificationChannel `json:"commandCenter,omitempty"`
}

// NotificationChannel configures one delivery channel within a mission's
// notification block.
type NotificationChannel struct {
	// Enabled toggles delivery. It defaults to true when the block is
	// present (set during parsing); set `enabled = false` to keep the
	// channel configured but turn delivery off.
	Enabled bool `hcl:"enabled,optional" json:"enabled"`
	// Events is the explicit list of terminal events that fire on this
	// channel. Required and non-empty. Valid values are mission_completed,
	// mission_failed, or "all" (every terminal event).
	Events []string `hcl:"events,optional" json:"events,omitempty"`
	// Channel is a gateway-only per-mission destination override. Empty
	// means "use the gateway's globally configured default channel". It is
	// rejected on the command_center channel.
	Channel string `hcl:"channel,optional" json:"channel,omitempty"`
}

// resolvedEvents returns the event set with "all" expanded, WITHOUT copying.
// The result may alias allNotifyEvents or ch.Events and is for read-only use
// inside this package. ch must be non-nil.
func (ch *NotificationChannel) resolvedEvents() []string {
	for _, e := range ch.Events {
		if e == NotifyAllEvents {
			return allNotifyEvents
		}
	}
	return ch.Events
}

// EffectiveEvents returns the resolved event set, expanding "all".
//
// The returned slice is always a fresh copy: callers may sort, append to or
// overwrite it without affecting the channel's configuration or the shared
// expansion of "all". A nil channel, or one with no events, yields nil.
func (ch *NotificationChannel) EffectiveEvents() []string {
	if ch == nil {
		return nil
	}
	return append([]string(nil), ch.resolvedEvents()...)
}

// WantsEvent reports whether the channel should fire for the given event. A
// nil or disabled channel never fires.
func (ch *NotificationChannel) WantsEvent(event string) bool {
	if ch == nil || !ch.Enabled {
		return false
	}
	// Read-only scan, so the non-copying view is sufficient here.
	for _, e := range ch.resolvedEvents() {
		if e == event {
			return true
		}
	}
	return false
}

// Validate checks the notification block in isolation. Cross-block checks
// (e.g. gateway channel requires a configured gateway) live in
// Config.Validate.
//
// A nil config is valid. A non-nil config must set at least one channel, and
// each channel that is set must pass its own validation.
func (n *NotificationConfig) Validate() error {
	if n == nil {
		return nil
	}
	if n.Gateway == nil && n.CommandCenter == nil {
		return fmt.Errorf("notification: at least one of 'gateway' or 'command_center' must be set")
	}
	if err := n.Gateway.validate("gateway", true); err != nil {
		return err
	}
	if err := n.CommandCenter.validate("command_center", false); err != nil {
		return err
	}
	return nil
}

// validate checks one channel. name is used in error messages; allowChannel
// says whether the `channel` override may be set on this channel. A nil
// channel (sub-block absent) is valid. Note that `events` is required even
// when the channel is disabled.
func (ch *NotificationChannel) validate(name string, allowChannel bool) error {
	if ch == nil {
		return nil
	}
	if len(ch.Events) == 0 {
		return fmt.Errorf("notification %s: 'events' is required (list one or more of %s, %s, or %q)",
			name, NotifyMissionCompleted, NotifyMissionFailed, NotifyAllEvents)
	}
	for _, e := range ch.Events {
		if !validNotifyEvent(e) {
			return fmt.Errorf("notification %s: invalid event %q (valid: %s, %s, %q)",
				name, e, NotifyMissionCompleted, NotifyMissionFailed, NotifyAllEvents)
		}
	}
	if !allowChannel && ch.Channel != "" {
		return fmt.Errorf("notification %s: 'channel' is only valid on the gateway channel", name)
	}
	return nil
}
+package config_test + +import ( + "squadron/config" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Notification Config", func() { + missionWith := func(notificationBlock string, extra string) string { + return fullBaseHCL() + extra + ` +mission "m" { + commander { model = models.anthropic.claude_sonnet_4 } + agents = [agents.test_agent] +` + notificationBlock + ` + task "run" { objective = "do the thing" } +} +` + } + + Describe("parsing", func() { + It("is nil when the mission has no notification block", func() { + _, f := writeFixture("config.hcl", missionWith("", "")) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + Expect(cfg.Missions[0].Notification).To(BeNil()) + }) + + It("parses both channels with enabled defaulting to true", func() { + block := ` + notification { + gateway { + events = ["mission_failed"] + channel = "#ops" + } + command_center { events = ["all"] } + }` + _, f := writeFixture("config.hcl", missionWith(block, gatewayBlockHCL())) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + n := cfg.Missions[0].Notification + Expect(n).NotTo(BeNil()) + Expect(n.Gateway.Enabled).To(BeTrue()) + Expect(n.Gateway.Events).To(ConsistOf("mission_failed")) + Expect(n.Gateway.Channel).To(Equal("#ops")) + Expect(n.CommandCenter.Enabled).To(BeTrue()) + Expect(n.CommandCenter.Events).To(ConsistOf("all")) + }) + + It("expands \"all\" to the terminal events", func() { + block := ` + notification { + command_center { events = ["all"] } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + ch := cfg.Missions[0].Notification.CommandCenter + Expect(ch.EffectiveEvents()).To(ConsistOf( + config.NotifyMissionCompleted, config.NotifyMissionFailed)) + Expect(ch.WantsEvent(config.NotifyMissionFailed)).To(BeTrue()) + }) + + It("honors enabled = false", func() { + block := ` + notification { + command_center { + 
enabled = false + events = ["all"] + } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + ch := cfg.Missions[0].Notification.CommandCenter + Expect(ch.Enabled).To(BeFalse()) + Expect(ch.WantsEvent(config.NotifyMissionCompleted)).To(BeFalse()) + }) + }) + + Describe("validation", func() { + It("requires an explicit events list", func() { + block := ` + notification { + command_center { } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + _, err := config.LoadFile(f) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("'events' is required")) + }) + + It("rejects an unknown event name", func() { + block := ` + notification { + command_center { events = ["bogus_event"] } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + _, err := config.LoadFile(f) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid event")) + }) + + It("rejects 'channel' on the command_center channel", func() { + block := ` + notification { + command_center { + events = ["all"] + channel = "#nope" + } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + _, err := config.LoadFile(f) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("only valid on the gateway channel")) + }) + + It("rejects an empty notification block", func() { + block := ` + notification { + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + _, err := config.LoadFile(f) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("at least one of 'gateway' or 'command_center'")) + }) + + It("rejects a gateway channel when no gateway block is configured", func() { + block := ` + notification { + gateway { events = ["all"] } + }` + _, f := writeFixture("config.hcl", missionWith(block, "")) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + err = cfg.Validate() + 
Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("no gateway block is configured")) + }) + + It("accepts a gateway channel when a gateway block exists", func() { + block := ` + notification { + gateway { events = ["all"] } + }` + _, f := writeFixture("config.hcl", missionWith(block, gatewayBlockHCL())) + cfg, err := config.LoadFile(f) + Expect(err).NotTo(HaveOccurred()) + Expect(cfg.Missions[0].Notification.Gateway).NotTo(BeNil()) + Expect(cfg.Validate()).To(Succeed()) + }) + }) +}) + +func gatewayBlockHCL() string { + return ` +gateway "slack" { + version = "local" + settings = { + channel_id = "C123" + } +} +` +} diff --git a/config/tools_builder.go b/config/tools_builder.go index 4b1a57e..bd02066 100644 --- a/config/tools_builder.go +++ b/config/tools_builder.go @@ -21,7 +21,7 @@ import ( // humanBridge is optional and powers the `builtins.human.ask` tool. Pass // nil when no commander is attached; the tool is still registered and returns // a stable "[no human available]" observation to the agent instead of blocking. -func BuildToolsMap(agentTools []string, customTools []CustomTool, loadedPlugins map[string]*plugin.PluginClient, loadedMCPClients map[string]*squadronmcp.Client, datasetStore aitools.DatasetStore, humanBridge aitools.HumanInputBridge) map[string]aitools.Tool { +func BuildToolsMap(agentTools []string, customTools []CustomTool, loadedPlugins map[string]*plugin.PluginClient, loadedMCPClients map[string]*squadronmcp.Client, datasetStore aitools.DatasetStore, humanBridge aitools.HumanInputBridge, gatewayBridge aitools.GatewayBridge) map[string]aitools.Tool { tools := make(map[string]aitools.Tool) // Build a lookup map for custom tool definitions @@ -45,7 +45,7 @@ func BuildToolsMap(agentTools []string, customTools []CustomTool, loadedPlugins if builtinToolList, ok := BuiltinTools[namespaceName]; ok { for _, toolName := range builtinToolList { ref := "builtins." + namespaceName + "." 
+ toolName - tool := GetBuiltinTool(ref, datasetStore, humanBridge) + tool := GetBuiltinTool(ref, datasetStore, humanBridge, gatewayBridge) if tool != nil { tools[ref] = tool } @@ -101,7 +101,7 @@ func BuildToolsMap(agentTools []string, customTools []CustomTool, loadedPlugins // Check if it's a builtin tool reference (builtins.{namespace}.{tool}) if IsBuiltinTool(toolRef) { - tool := GetBuiltinTool(toolRef, datasetStore, humanBridge) + tool := GetBuiltinTool(toolRef, datasetStore, humanBridge, gatewayBridge) if tool != nil { tools[toolRef] = tool } @@ -161,7 +161,7 @@ func BuildToolsMap(agentTools []string, customTools []CustomTool, loadedPlugins // datasetStore is optional and required for dataset tools. // humanBridge is optional; when nil, the ask tool returns a stable // "[no human available]" observation rather than blocking. -func GetBuiltinTool(ref string, datasetStore aitools.DatasetStore, humanBridge aitools.HumanInputBridge) aitools.Tool { +func GetBuiltinTool(ref string, datasetStore aitools.DatasetStore, humanBridge aitools.HumanInputBridge, gatewayBridge aitools.GatewayBridge) aitools.Tool { switch ref { case "builtins.http.get": return &aitools.HTTPGetTool{} @@ -185,6 +185,8 @@ func GetBuiltinTool(ref string, datasetStore aitools.DatasetStore, humanBridge a return &aitools.CurrentTimeTool{} case "builtins.human.ask": return &aitools.HumanInputTool{Bridge: humanBridge} + case "builtins.gateway.post": + return &aitools.GatewayPostTool{Bridge: gatewayBridge} default: return nil } diff --git a/docs/content/config/command_center.mdx b/docs/content/config/command_center.mdx index 7088865..e3759ba 100644 --- a/docs/content/config/command_center.mdx +++ b/docs/content/config/command_center.mdx @@ -8,6 +8,8 @@ Declare a `command_center` block to connect Squadron outbound to a remote comman When a `command_center` block is present, `squadron engage` skips the local UI and opens a persistent websocket to the remote command center instead. 
Without the block, `squadron engage` launches the local UI by default (pass `--headless` to opt out entirely). +Beyond live execution, the command center can also receive per-mission [notifications](/missions/notifications) — a bell feed of `mission_completed` / `mission_failed` events for missions that opt in. + ## Minimal Example ```hcl diff --git a/docs/content/config/gateways.mdx b/docs/content/config/gateways.mdx index b8943df..f3a3187 100644 --- a/docs/content/config/gateways.mdx +++ b/docs/content/config/gateways.mdx @@ -8,6 +8,8 @@ A `gateway` block runs a managed subprocess that bridges Squadron to an external The gateway subscribes to live `human_input_requested` / `human_input_resolved` events from Squadron and surfaces them in its target system; user actions in that system flow back to Squadron through the same record store. From the agent's perspective, an answer in Discord is indistinguishable from one typed into the Command Center Inbox. +A gateway can also receive **mission-lifecycle notifications** (`mission_completed` / `mission_failed`) when a mission opts in via a [`notification`](/missions/notifications) block — a one-way post, separate from the interactive human-input flow. + Squadron supports at most one `gateway` block per instance. ## Minimal Example — Discord @@ -33,6 +35,27 @@ gateway "discord" { Restart Squadron and the next `builtins.human.ask` call will appear in the configured Discord channel as a message with quick-reply buttons (or a multi-select dropdown when `multi_select = true` on the tool call). +## Posting from an agent — `builtins.gateway.post` + +When a gateway is configured, agents gain a built-in tool, **`builtins.gateway.post`**, for posting a message to the gateway's channel. Use it to send a heads-up, status update, or summary to the team mid-mission.
+ +```hcl +agent "announcer" { + model = models.anthropic.claude_sonnet_4 + tools = [builtins.gateway.post] +} +``` + +**The gateway owns the message contract.** Each gateway advertises (via the SDK's `MessageToolSpec`) the tool's description and a JSON Schema for its parameters, so the LLM sees exactly the rich-message shape that gateway supports — squadron just forwards the agent's payload through. See your gateway's README for the exact fields it accepts. + +A gateway that advertises no spec falls back to a simple `{ message, channel }` shape. The tool is one-way (the agent doesn't wait for a reply); for interactive prompts use `builtins.human.ask` instead. + +**Attachments are local files.** Independent of the gateway-owned fields, squadron adds an `attachments` parameter (present only when the mission has [memory or a scratchpad](/missions/memory)). It takes a list of `{ slot, path }` objects referencing squadron's own files — for example `{ "slot": "scratchpad", "path": "report.pdf" }`. Squadron reads each file and ships the bytes to the gateway to upload. Attachments are **never** URLs the model picks, so there is no outbound-fetch (SSRF) surface; the per-file cap is 25 MB. + +> **Tip:** rich payloads (Block Kit `blocks`, Discord `embeds`) are large nested JSON. Smaller models sometimes drop the nested field and post text only. If an agent needs to emit rich layouts reliably, give it a capable model. + +`builtins.gateway.post` is only a valid tool reference **when a gateway block is present** — `squadron verify` rejects it otherwise. 
+ ## Fields | Field | Type | Required | Description | diff --git a/docs/content/missions/_meta.js b/docs/content/missions/_meta.js index d90aaf9..d2bee3b 100644 --- a/docs/content/missions/_meta.js +++ b/docs/content/missions/_meta.js @@ -10,4 +10,5 @@ export default { 'internal-tools': 'Internal Tools', budgets: 'Budgets', schedules: 'Schedules & Triggers', + notifications: 'Notifications', } diff --git a/docs/content/missions/notifications.mdx b/docs/content/missions/notifications.mdx new file mode 100644 index 0000000..c4a27fb --- /dev/null +++ b/docs/content/missions/notifications.mdx @@ -0,0 +1,109 @@ +--- +title: Notifications +--- + +# Notifications + +A `notification` block makes a mission announce its **terminal outcome** — +`mission_completed` or `mission_failed` — to one or both of two channels: the +configured [gateway](/config/gateways) (Discord, Slack, …) and the +[command center](/config/command_center). + +Notifications are **opt-in per mission**: a mission with no `notification` block +emits nothing. + +```hcl +mission "nightly_ingest" { + notification { + gateway { + events = ["mission_failed"] # only ping the gateway on failure + channel = "#ops-alerts" # gateway-only destination override + } + command_center { + events = ["all"] # show every terminal event in the UI + } + } + + commander { model = models.anthropic.claude_sonnet_4 } + agents = [agents.worker] + task "ingest" { objective = "Pull and normalize today's data" } +} +``` + +## Channels + +A `notification` block contains up to two channel sub-blocks. At least one is +required. + +| Channel | Delivery | +|---------|----------| +| `gateway` | Posts a message to the configured gateway's external system (Discord/Slack/…). Requires a top-level [`gateway`](/config/gateways) block. | +| `command_center` | Pushes the notification to the [command center](/config/command_center) UI (a bell feed). No-ops when no command center is connected. | + +A channel fires only when its block is present. 
To turn a channel off without +deleting its config, set `enabled = false`. + +## Channel fields + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `enabled` | bool | no | `true` | Set `false` to keep the channel configured but stop delivery. | +| `events` | list(string) | **yes** | — | Which terminal events fire. Values: `mission_completed`, `mission_failed`, or `"all"` (every terminal event). | +| `channel` | string | no | gateway default | **gateway-only.** Per-mission destination override — a channel **name** (with or without a leading `#`, e.g. `"#ops-alerts"`) or a raw channel id. The gateway resolves names against its workspace/guild, so you don't need to look up an id. When omitted, the gateway posts to its globally configured default channel. Rejected on `command_center`. | + +`events` is always explicit — there is no implicit "all". Use `events = ["all"]` +to opt into every terminal event. + +## What each event carries + +- **`mission_completed`** — fires when the mission finishes successfully. +- **`mission_failed`** — includes the failure error message. + +A user-initiated stop is **not** a notification event. + +## Examples + +### Failures everywhere, completions only in the UI + +```hcl +notification { + gateway { events = ["mission_failed"] } + command_center { events = ["all"] } +} +``` + +### Route a mission's alerts to a dedicated channel + +```hcl +notification { + gateway { + events = ["all"] + channel = "#ops-alerts" # name or id; this mission posts here, not the gateway default + } +} +``` + +### Temporarily mute a channel + +```hcl +notification { + command_center { + enabled = false # keep the config, stop delivery + events = ["all"] + } +} +``` + +## Validation + +- A `notification` block must declare at least one of `gateway` / `command_center`. +- Each channel's `events` list is required and must contain only valid values + (`mission_completed`, `mission_failed`, `"all"`). 
+- A `gateway` channel requires a top-level [`gateway`](/config/gateways) block — + `squadron verify` errors otherwise. +- `channel` is rejected on the `command_center` channel. + +## Notes + +- Notifications fire only in serve mode (`squadron engage`), where the mission + runs under the command-center bridge. diff --git a/gateway/manager.go b/gateway/manager.go index ef6b406..d80907e 100644 --- a/gateway/manager.go +++ b/gateway/manager.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-plugin" gwsdk "github.com/mlund01/squadron-gateway-sdk" + "squadron/aitools" "squadron/humaninput" "squadron/store" ) @@ -34,6 +35,9 @@ type gatewayClient interface { Configure(ctx context.Context, settings map[string]string) error OnHumanInputRequested(ctx context.Context, rec gwsdk.HumanInputRecord) error OnHumanInputResolved(ctx context.Context, rec gwsdk.HumanInputRecord) error + OnNotification(ctx context.Context, rec gwsdk.NotificationRecord) error + PostMessage(ctx context.Context, req gwsdk.PostMessageRequest) error + MessageToolSpec(ctx context.Context) (gwsdk.MessageToolSpec, error) Shutdown(ctx context.Context) error } @@ -63,10 +67,11 @@ type Manager struct { initialBackoff time.Duration maxBackoff time.Duration - mu sync.Mutex - cfg *Config - client subprocess - gw gatewayClient + mu sync.Mutex + cfg *Config + client subprocess + gw gatewayClient + msgSpec gwsdk.MessageToolSpec // post-tool spec fetched after the gateway starts cancelEvents context.CancelFunc eventDone chan struct{} @@ -143,6 +148,16 @@ func (m *Manager) launchLocked(ctx context.Context, cfg Config) error { m.gw = gw m.cfg = &cfg + // Fetch the post-tool spec so builtins.gateway.post can advertise this + // gateway's message format. Best-effort — a gateway that doesn't + // implement it leaves the default { message } shape. 
+ if spec, specErr := gw.MessageToolSpec(ctx); specErr == nil { + m.msgSpec = spec + } else { + m.msgSpec = gwsdk.MessageToolSpec{} + log.Printf("gateway %q: MessageToolSpec: %v", cfg.Name, specErr) + } + // Subscribe synchronously so an event published immediately after // Start returns can't race ahead of the dispatcher goroutine. events, cancelSub := m.notifier.Subscribe() @@ -275,6 +290,56 @@ func (m *Manager) dispatchLoop(ctx context.Context, events <-chan humaninput.Eve } } +// Notify forwards a mission-lifecycle notification to the running gateway. +// Best-effort: drops silently when no gateway is currently up (mirrors the +// human-input dispatch guard). +func (m *Manager) Notify(ctx context.Context, rec gwsdk.NotificationRecord) error { + m.mu.Lock() + gw := m.gw + m.mu.Unlock() + if gw == nil { + return nil + } + return gw.OnNotification(ctx, rec) +} + +// PostMessage forwards the raw, gateway-schema-shaped payload to the running +// gateway. Returns an error (surfaced to the calling agent) when no gateway is +// up. Satisfies aitools.GatewayBridge. +func (m *Manager) PostMessage(ctx context.Context, payload string, attachments []aitools.GatewayAttachment) error { + m.mu.Lock() + gw := m.gw + m.mu.Unlock() + if gw == nil { + return fmt.Errorf("no gateway is currently running") + } + req := gwsdk.PostMessageRequest{Payload: payload} + for _, a := range attachments { + req.Attachments = append(req.Attachments, gwsdk.FileAttachment{ + Filename: a.Filename, + MimeType: a.MimeType, + Content: a.Content, + }) + } + return gw.PostMessage(ctx, req) +} + +// MessageToolDescription returns the gateway-supplied post-tool description +// (empty when the gateway provides none). Satisfies aitools.GatewayBridge. +func (m *Manager) MessageToolDescription() string { + m.mu.Lock() + defer m.mu.Unlock() + return m.msgSpec.Description +} + +// MessageToolSchema returns the gateway-supplied post-tool params JSON Schema +// (empty when the gateway provides none). 
Satisfies aitools.GatewayBridge. +func (m *Manager) MessageToolSchema() string { + m.mu.Lock() + defer m.mu.Unlock() + return m.msgSpec.ParamsSchema +} + func (m *Manager) dispatch(ctx context.Context, ev humaninput.Event) { m.mu.Lock() gw := m.gw diff --git a/gateway/manager_test.go b/gateway/manager_test.go index f06047c..d4f0a67 100644 --- a/gateway/manager_test.go +++ b/gateway/manager_test.go @@ -14,6 +14,7 @@ import ( gwsdk "github.com/mlund01/squadron-gateway-sdk" + "squadron/aitools" "squadron/humaninput" "squadron/store" ) @@ -65,10 +66,13 @@ func (f *fakeSubprocess) wasKilled() bool { type fakeGateway struct { mu sync.Mutex configureErr error - configureCalls int - requested []string - resolved []string - shutdowns int + configureCalls int + requested []string + resolved []string + notified []string + posted []string + postedAttachments []gwsdk.FileAttachment + shutdowns int } func (g *fakeGateway) Configure(ctx context.Context, settings map[string]string) error { @@ -93,6 +97,25 @@ func (g *fakeGateway) OnHumanInputResolved(ctx context.Context, rec gwsdk.HumanI return nil } +func (g *fakeGateway) OnNotification(ctx context.Context, rec gwsdk.NotificationRecord) error { + g.mu.Lock() + g.notified = append(g.notified, rec.Event) + g.mu.Unlock() + return nil +} + +func (g *fakeGateway) PostMessage(ctx context.Context, req gwsdk.PostMessageRequest) error { + g.mu.Lock() + g.posted = append(g.posted, req.Payload) + g.postedAttachments = req.Attachments + g.mu.Unlock() + return nil +} + +func (g *fakeGateway) MessageToolSpec(ctx context.Context) (gwsdk.MessageToolSpec, error) { + return gwsdk.MessageToolSpec{Description: "fake gateway", ParamsSchema: `{"type":"object"}`}, nil +} + func (g *fakeGateway) Shutdown(ctx context.Context) error { g.mu.Lock() g.shutdowns++ @@ -106,6 +129,12 @@ func (g *fakeGateway) snapshot() (cfg int, req, res []string, sd int) { return g.configureCalls, append([]string(nil), g.requested...), append([]string(nil), g.resolved...), 
g.shutdowns } +func (g *fakeGateway) notifiedEvents() []string { + g.mu.Lock() + defer g.mu.Unlock() + return append([]string(nil), g.notified...) +} + // scriptedLauncher returns a sequence of (gateway, subprocess) pairs in // order. The Nth call to launch returns the Nth scripted result. Tests // supply enough entries to cover the initial start plus expected @@ -162,6 +191,66 @@ func newTestManager(launch launcher) *Manager { return m } +var _ = Describe("Manager.Notify", func() { + It("forwards a notification to the running gateway", func() { + gw := &fakeGateway{} + proc := &fakeSubprocess{} + s := &scriptedLauncher{results: []launchResult{{gw, proc}}} + + m := newTestManager(s.launcher()) + Expect(m.Start(context.Background(), Config{Name: "discord", Version: "local"})).To(Succeed()) + DeferCleanup(m.Stop) + + err := m.Notify(context.Background(), gwsdk.NotificationRecord{Event: "mission_completed"}) + Expect(err).NotTo(HaveOccurred()) + Expect(gw.notifiedEvents()).To(ConsistOf("mission_completed")) + }) + + It("no-ops when no gateway is running", func() { + m := newTestManager((&scriptedLauncher{}).launcher()) + Expect(m.Notify(context.Background(), gwsdk.NotificationRecord{Event: "mission_failed"})).To(Succeed()) + }) +}) + +var _ = Describe("Manager.PostMessage", func() { + It("forwards a message to the running gateway", func() { + gw := &fakeGateway{} + s := &scriptedLauncher{results: []launchResult{{gw, &fakeSubprocess{}}}} + m := newTestManager(s.launcher()) + Expect(m.Start(context.Background(), Config{Name: "discord", Version: "local"})).To(Succeed()) + DeferCleanup(m.Stop) + + Expect(m.PostMessage(context.Background(), `{"text":"deploy done"}`, nil)).To(Succeed()) + gw.mu.Lock() + posted := append([]string(nil), gw.posted...) 
+ gw.mu.Unlock() + Expect(posted).To(ConsistOf(`{"text":"deploy done"}`)) + Expect(m.MessageToolDescription()).To(Equal("fake gateway")) + }) + + It("forwards file attachments to the gateway as bytes", func() { + gw := &fakeGateway{} + s := &scriptedLauncher{results: []launchResult{{gw, &fakeSubprocess{}}}} + m := newTestManager(s.launcher()) + Expect(m.Start(context.Background(), Config{Name: "discord", Version: "local"})).To(Succeed()) + DeferCleanup(m.Stop) + + atts := []aitools.GatewayAttachment{{Filename: "r.txt", MimeType: "text/plain", Content: []byte("hi")}} + Expect(m.PostMessage(context.Background(), `{"text":"x"}`, atts)).To(Succeed()) + gw.mu.Lock() + got := append([]gwsdk.FileAttachment(nil), gw.postedAttachments...) + gw.mu.Unlock() + Expect(got).To(HaveLen(1)) + Expect(got[0].Filename).To(Equal("r.txt")) + Expect(string(got[0].Content)).To(Equal("hi")) + }) + + It("errors when no gateway is running", func() { + m := newTestManager((&scriptedLauncher{}).launcher()) + Expect(m.PostMessage(context.Background(), `{"text":"hi"}`, nil)).To(MatchError(ContainSubstring("no gateway"))) + }) +}) + var _ = Describe("Manager.Start / Stop", func() { It("launches and configures the gateway with the supplied settings", func() { gw := &fakeGateway{} diff --git a/gateway/notify.go b/gateway/notify.go new file mode 100644 index 0000000..8480f09 --- /dev/null +++ b/gateway/notify.go @@ -0,0 +1,44 @@ +package gateway + +import ( + "context" + + gwsdk "github.com/mlund01/squadron-gateway-sdk" + + "squadron/config" + "squadron/notification" +) + +// NotifySink adapts the gateway Manager to notification.Sink so the +// notification dispatcher can deliver mission-lifecycle notifications to the +// configured gateway subprocess. +type NotifySink struct { + mgr *Manager +} + +// NewNotifySink wraps a Manager as a notification.Sink. 
+func NewNotifySink(mgr *Manager) *NotifySink { + return &NotifySink{mgr: mgr} +} + +// Notify converts the Record (honoring the mission's per-channel gateway +// override) and forwards it to the gateway subprocess. +func (s *NotifySink) Notify(ctx context.Context, ch *config.NotificationChannel, rec notification.Record) error { + if s == nil || s.mgr == nil { + return nil + } + channel := "" + if ch != nil { + channel = ch.Channel + } + return s.mgr.Notify(ctx, gwsdk.NotificationRecord{ + MissionID: rec.MissionID, + MissionName: rec.MissionName, + Event: rec.Event, + Title: rec.Title, + Message: rec.Message, + OccurredAt: rec.OccurredAt, + Error: rec.Error, + Channel: channel, + }) +} diff --git a/go.mod b/go.mod index 906c9cd..a16de88 100644 --- a/go.mod +++ b/go.mod @@ -13,12 +13,13 @@ require ( github.com/hashicorp/hcl/v2 v2.24.0 github.com/jackc/pgx/v5 v5.8.0 github.com/mark3labs/mcp-go v0.46.0 - github.com/mlund01/squadron-gateway-sdk v0.0.1 + github.com/mlund01/squadron-gateway-sdk v0.0.3 github.com/mlund01/squadron-sdk v0.0.31 - github.com/mlund01/squadron-wire v0.0.41 + github.com/mlund01/squadron-wire v0.0.42 github.com/onsi/ginkgo/v2 v2.28.1 github.com/onsi/gomega v1.39.1 github.com/openai/openai-go v1.12.0 + github.com/pelletier/go-toml/v2 v2.3.1 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.10.2 github.com/zclconf/go-cty v1.16.3 @@ -84,7 +85,6 @@ require ( github.com/ncruces/go-strftime v1.0.0 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pb33f/ordered-map/v2 v2.3.1 // indirect - github.com/pelletier/go-toml/v2 v2.3.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/spf13/cast v1.7.1 // indirect diff --git a/go.sum b/go.sum index 751e91c..01c0052 100644 --- a/go.sum +++ b/go.sum @@ -168,12 +168,12 @@ github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwX github.com/microcosm-cc/bluemonday v1.0.27/go.mod 
h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= -github.com/mlund01/squadron-gateway-sdk v0.0.1 h1:tlBIJvzfPyIaW8A/jmlXETIzHV4HiOZxejf6PqKw2V4= -github.com/mlund01/squadron-gateway-sdk v0.0.1/go.mod h1:G1TD///Cn18dz6LDELlYuwnavrzMAMnVVrW2hO6jkkw= +github.com/mlund01/squadron-gateway-sdk v0.0.3 h1:z6ifEAGOelM3J9+BNCukjE3ltsvYp37waoyH09oAeNk= +github.com/mlund01/squadron-gateway-sdk v0.0.3/go.mod h1:G1TD///Cn18dz6LDELlYuwnavrzMAMnVVrW2hO6jkkw= github.com/mlund01/squadron-sdk v0.0.31 h1:J9URYtoqlIHHa2cilAorhTcaUZStH96YwJw9OldZV1Y= github.com/mlund01/squadron-sdk v0.0.31/go.mod h1:pAx3fSqD4TLliuWQqawosGCk6t4waUlmj35RFGQPlhA= -github.com/mlund01/squadron-wire v0.0.41 h1:Jf4ElHuvtIE1PpbnH7HtwMZwkU8HA/ygEkWdwQFXDn8= -github.com/mlund01/squadron-wire v0.0.41/go.mod h1:BmgUAhEkibCiJ2Cre+qfLs/KjeqTmm4BkfcMu6M+jLU= +github.com/mlund01/squadron-wire v0.0.42 h1:e9xov89ikMc2qkRceU2HkbzEW8z8C+iKy4iSmyK3jUQ= +github.com/mlund01/squadron-wire v0.0.42/go.mod h1:BmgUAhEkibCiJ2Cre+qfLs/KjeqTmm4BkfcMu6M+jLU= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= diff --git a/mission/runner.go b/mission/runner.go index a82ccf1..5510577 100644 --- a/mission/runner.go +++ b/mission/runner.go @@ -82,6 +82,11 @@ type Runner struct { // the tool then surfaces "[no human available]" instead of blocking. humanBridge aitools.HumanInputBridge + // gatewayBridge powers builtins.gateway.post on agents spawned by this + // mission. Nil when no gateway is configured; the tool then surfaces + // "[no gateway configured]". 
+ gatewayBridge aitools.GatewayBridge + // Task state manager — single authority for task lifecycle stateMgr *TaskStateManager @@ -149,6 +154,15 @@ func WithHumanBridge(bridge aitools.HumanInputBridge) RunnerOption { } } +// WithGatewayBridge wires a gateway bridge into agents spawned by this +// mission so builtins.gateway.post can post to the configured gateway. Pass +// nil (or omit) to disable — the tool then returns the no-gateway observation. +func WithGatewayBridge(bridge aitools.GatewayBridge) RunnerOption { + return func(r *Runner) { + r.gatewayBridge = bridge + } +} + // testProvider returns a provider from the factory if set, or nil (letting the commander/agent create its own). func (r *Runner) testProvider() llm.Provider { if r.providerFactory != nil { @@ -373,6 +387,17 @@ func (r *Runner) DrainCh() <-chan struct{} { return r.drainCh } +// MissionName returns the name of the mission this runner executes. +func (r *Runner) MissionName() string { + return r.mission.Name +} + +// NotificationConfig returns the mission's notification config, or nil when +// the mission did not declare a `notification` block. +func (r *Runner) NotificationConfig() *config.NotificationConfig { + return r.mission.Notification +} + // NextMission returns the mission name to launch as a result of cross-mission routing, or "". 
func (r *Runner) NextMission() string { return r.nextMission @@ -1012,6 +1037,7 @@ func (r *Runner) resaturateCommanders(ctx context.Context, completedTaskNames [] Provider: r.testProvider(), Budget: r.budgetTracker.For(taskName), HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }) if err != nil { return fmt.Errorf("creating commander for resaturation of '%s': %w", taskName, err) @@ -1049,14 +1075,15 @@ func (r *Runner) resaturateCommanders(ctx context.Context, completedTaskNames [] continue // Non-fatal: skip agent if messages can't be loaded } restoredAgent, err := agent.RestoreAgent(ctx, agent.Options{ - ConfigPath: r.configPath, - Config: r.cfg, - AgentName: agentName, - SecretInfos: r.secretInfos, - SecretValues: r.secretValues, - DatasetStore: r, - MemoryStore: r.memoryStore, - HumanBridge: r.humanBridge, + ConfigPath: r.configPath, + Config: r.cfg, + AgentName: agentName, + SecretInfos: r.secretInfos, + SecretValues: r.secretValues, + DatasetStore: r, + MemoryStore: r.memoryStore, + HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }, agentLLMMsgs) if err != nil { continue // Non-fatal: skip agent if it can't be restored @@ -1132,15 +1159,16 @@ func (r *Runner) restoreAgentSessions(ctx context.Context, sup *agent.Commander, llmMsgs = agent.HealSessionMessages(llmMsgs) mode := config.ModeMission restoredAgent, err := agent.RestoreAgent(ctx, agent.Options{ - ConfigPath: r.configPath, - Config: r.cfg, - AgentName: s.AgentName, - Mode: &mode, - SecretInfos: r.secretInfos, - SecretValues: r.secretValues, - DatasetStore: r, - MemoryStore: r.memoryStore, - HumanBridge: r.humanBridge, + ConfigPath: r.configPath, + Config: r.cfg, + AgentName: s.AgentName, + Mode: &mode, + SecretInfos: r.secretInfos, + SecretValues: r.secretValues, + DatasetStore: r, + MemoryStore: r.memoryStore, + HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }, llmMsgs) if err != nil { continue @@ -1274,6 +1302,7 @@ func (r *Runner) runTask(ctx 
context.Context, task config.Task, missionID string Provider: r.testProvider(), Budget: r.budgetTracker.For(task.Name), HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }) if err != nil { errStr := err.Error() @@ -2074,6 +2103,7 @@ Continue until dataset_next returns "exhausted".`, len(items), taskObjective) Provider: r.testProvider(), Budget: r.budgetTracker.For(task.Name), HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }) if err != nil { return []IterationResult{{ @@ -2527,6 +2557,7 @@ Continue until dataset_next returns "exhausted".`, len(remainingItems), taskObje Provider: r.testProvider(), Budget: r.budgetTracker.For(task.Name), HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }) if err != nil { return append(iterations, IterationResult{ @@ -2770,6 +2801,7 @@ func (r *Runner) runSingleIteration(ctx context.Context, task config.Task, index Provider: r.testProvider(), Budget: r.budgetTracker.For(task.Name), HumanBridge: r.humanBridge, + GatewayBridge: r.gatewayBridge, }) if err != nil { streamer.IterationFailed(task.Name, index, err) diff --git a/notification/dispatcher.go b/notification/dispatcher.go new file mode 100644 index 0000000..d544515 --- /dev/null +++ b/notification/dispatcher.go @@ -0,0 +1,68 @@ +// Package notification delivers mission-lifecycle notifications +// (mission_completed / mission_failed) to the channels a mission opted into +// via its `notification { ... }` config block. +// +// It is intentionally separate from human-input: notifications are one-way, +// informational, and never block a mission. The dispatcher fans a Record out +// to the enabled, event-matching channels; missions without a notification +// block produce no Records. +package notification + +import ( + "context" + "log" + "time" + + "squadron/config" +) + +// Record is a single mission-lifecycle notification. +type Record struct { + MissionID string + MissionName string + // Event is one of config.NotifyMission{Completed,Failed}. 
+ Event string + Title string + Message string + OccurredAt time.Time + // Error is set for mission_failed. + Error string +} + +// Sink delivers a Record to one external surface. The per-channel +// NotificationChannel config (including the gateway channel override) is +// passed through so the sink can honor it. +type Sink interface { + Notify(ctx context.Context, ch *config.NotificationChannel, rec Record) error +} + +// Dispatcher fans Records out to the configured sinks. +type Dispatcher struct { + gateway Sink + commandCenter Sink +} + +// NewDispatcher wires the available sinks. Either may be nil (e.g. no gateway +// configured), in which case the corresponding channel is skipped. +func NewDispatcher(gateway, commandCenter Sink) *Dispatcher { + return &Dispatcher{gateway: gateway, commandCenter: commandCenter} +} + +// Dispatch resolves the mission's notification config and delivers the Record +// to every enabled channel whose event filter matches. A nil cfg (mission has +// no notification block) is a no-op. +func (d *Dispatcher) Dispatch(ctx context.Context, cfg *config.NotificationConfig, rec Record) { + if d == nil || cfg == nil { + return + } + if d.gateway != nil && cfg.Gateway.WantsEvent(rec.Event) { + if err := d.gateway.Notify(ctx, cfg.Gateway, rec); err != nil { + log.Printf("notification: gateway channel: %v", err) + } + } + if d.commandCenter != nil && cfg.CommandCenter.WantsEvent(rec.Event) { + if err := d.commandCenter.Notify(ctx, cfg.CommandCenter, rec); err != nil { + log.Printf("notification: command_center channel: %v", err) + } + } +} diff --git a/notification/dispatcher_test.go b/notification/dispatcher_test.go new file mode 100644 index 0000000..04272cd --- /dev/null +++ b/notification/dispatcher_test.go @@ -0,0 +1,94 @@ +package notification_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "squadron/config" + "squadron/notification" +) + +type fakeSink struct { + events []string +} + +func (s *fakeSink) Notify(_ context.Context, _ *config.NotificationChannel, rec notification.Record) error { + s.events = append(s.events, rec.Event) + return nil +} + +var _ = Describe("Dispatcher", func() { + var ( + gw *fakeSink + cc *fakeSink + d *notification.Dispatcher + ) + + BeforeEach(func() { + gw = &fakeSink{} + cc = &fakeSink{} + d = notification.NewDispatcher(gw, cc) + }) + + rec := func(event string) notification.Record { + return notification.Record{MissionID: "m1", MissionName: "m", Event: event} + } + + It("no-ops when the mission has no notification config", func() { + d.Dispatch(context.Background(), nil, rec(config.NotifyMissionCompleted)) + Expect(gw.events).To(BeEmpty()) + Expect(cc.events).To(BeEmpty()) + }) + + allCh := func() *config.NotificationChannel { + return &config.NotificationChannel{Enabled: true, Events: []string{config.NotifyAllEvents}} + } + + It("fans out to both enabled channels when the event matches", func() { + cfg := &config.NotificationConfig{Gateway: allCh(), CommandCenter: allCh()} + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionCompleted)) + Expect(gw.events).To(ConsistOf(config.NotifyMissionCompleted)) + Expect(cc.events).To(ConsistOf(config.NotifyMissionCompleted)) + }) + + It("respects a per-channel event filter", func() { + cfg := &config.NotificationConfig{ + Gateway: &config.NotificationChannel{Enabled: true, Events: []string{config.NotifyMissionFailed}}, + CommandCenter: allCh(), + } + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionCompleted)) + Expect(gw.events).To(BeEmpty()) // filtered out + Expect(cc.events).To(ConsistOf(config.NotifyMissionCompleted)) + + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionFailed)) + Expect(gw.events).To(ConsistOf(config.NotifyMissionFailed)) + }) + + It("skips a disabled channel", func() { + cfg := 
&config.NotificationConfig{ + Gateway: &config.NotificationChannel{Enabled: false, Events: []string{config.NotifyAllEvents}}, + } + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionCompleted)) + Expect(gw.events).To(BeEmpty()) + }) + + It("skips an omitted channel", func() { + // Only the gateway channel is present; command_center is omitted. + cfg := &config.NotificationConfig{Gateway: allCh()} + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionCompleted)) + Expect(gw.events).To(ConsistOf(config.NotifyMissionCompleted)) + Expect(cc.events).To(BeEmpty()) + }) + + It("skips a channel with no sink wired", func() { + // Only a command-center sink is wired; gateway sink is nil. + d = notification.NewDispatcher(nil, cc) + cfg := &config.NotificationConfig{Gateway: allCh(), CommandCenter: allCh()} + Expect(func() { + d.Dispatch(context.Background(), cfg, rec(config.NotifyMissionFailed)) + }).NotTo(Panic()) + Expect(cc.events).To(ConsistOf(config.NotifyMissionFailed)) + }) +}) diff --git a/notification/notification_suite_test.go b/notification/notification_suite_test.go new file mode 100644 index 0000000..b5ccece --- /dev/null +++ b/notification/notification_suite_test.go @@ -0,0 +1,13 @@ +package notification_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestNotification(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Notification Suite") +} diff --git a/wsbridge/client.go b/wsbridge/client.go index aaa4935..02e7981 100644 --- a/wsbridge/client.go +++ b/wsbridge/client.go @@ -13,8 +13,10 @@ import ( "github.com/mlund01/squadron-wire/protocol" "squadron/agent" + "squadron/aitools" "squadron/config" "squadron/humaninput" + "squadron/notification" "squadron/store" ) @@ -41,7 +43,15 @@ type Client struct { stores *store.Bundle version string - ws *websocket.Conn + // connMu guards the per-connection handles (ws/done/connQuit) so a + // reconnect can atomically tear down the previous connection's pumps + // before swapping in the new socket. Each pump captures its own conn, + // so a stale pump never touches the replacement socket — which is what + // gorilla/websocket's single-reader / single-writer rule requires. + connMu sync.Mutex + ws *websocket.Conn + connQuit chan struct{} // closed to stop the current connection's pumps + send chan []byte connected bool // true after successful Connect + register @@ -73,6 +83,13 @@ type Client struct { // In-process notifier for human-input events; nil = no-op. humanInputNotifier *humaninput.Notifier + // Dispatcher for mission-lifecycle notifications; nil = no-op. + notifier *notification.Dispatcher + + // Gateway bridge powering builtins.gateway.post on mission agents; nil + // when no gateway is configured. 
+ gatewayBridge aitools.GatewayBridge + // Lifecycle done chan struct{} ctx context.Context @@ -184,18 +201,37 @@ func (c *Client) connectToURL(url string) error { if err != nil { return fmt.Errorf("dial command center: %w", err) } + + // Tear down any previous connection's pumps before swapping in the new + // socket: signal them to stop and close the old conn so a lingering + // readPump/writePump can't keep operating on a shared field that now + // points at the new socket (which caused "concurrent write to websocket + // connection" panics on reconnect). + c.connMu.Lock() + if c.connQuit != nil { + close(c.connQuit) + } + if c.ws != nil { + c.ws.Close() + } + quit := make(chan struct{}) + done := make(chan struct{}) c.ws = ws - c.done = make(chan struct{}) + c.connQuit = quit + c.done = done + c.connMu.Unlock() - // Start pumps first — register() needs them to send/receive messages - go c.readPump() - go c.writePump() + // Each pump captures its own conn + quit so it only ever touches the + // connection it was started for. Start pumps first — register() needs + // them to send/receive messages. + go c.readPump(ws, done) + go c.writePump(ws, quit) // Register with commander. If registration fails, tear down just the // socket — do NOT call Close(), which would cancel c.ctx and prevent // any future reconnect attempts on this client. if err := c.register(); err != nil { - c.ws.Close() + ws.Close() return fmt.Errorf("register: %w", err) } @@ -246,9 +282,15 @@ func (c *Client) SetConcurrencyTracker(ct ConcurrencyTracker) { func (c *Client) Close() { c.connected = false c.stop() + c.connMu.Lock() + if c.connQuit != nil { + close(c.connQuit) + c.connQuit = nil + } if c.ws != nil { c.ws.Close() } + c.connMu.Unlock() } // InstanceID returns the ID assigned by commander. 
@@ -390,20 +432,20 @@ func (c *Client) register() error { return nil } -func (c *Client) readPump() { +func (c *Client) readPump(ws *websocket.Conn, done chan struct{}) { defer func() { - close(c.done) - c.ws.Close() + close(done) + ws.Close() }() - c.ws.SetReadDeadline(time.Now().Add(pongWait)) - c.ws.SetPongHandler(func(string) error { - c.ws.SetReadDeadline(time.Now().Add(pongWait)) + ws.SetReadDeadline(time.Now().Add(pongWait)) + ws.SetPongHandler(func(string) error { + ws.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { - _, message, err := c.ws.ReadMessage() + _, message, err := ws.ReadMessage() if err != nil { if c.connected && websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { log.Printf("WebSocket read error: %v", err) @@ -421,29 +463,33 @@ func (c *Client) readPump() { } } -func (c *Client) writePump() { +func (c *Client) writePump(ws *websocket.Conn, quit <-chan struct{}) { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() - c.ws.Close() + ws.Close() }() for { select { case message, ok := <-c.send: - c.ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { - c.ws.WriteMessage(websocket.CloseMessage, []byte{}) + ws.WriteMessage(websocket.CloseMessage, []byte{}) return } - if err := c.ws.WriteMessage(websocket.TextMessage, message); err != nil { + if err := ws.WriteMessage(websocket.TextMessage, message); err != nil { return } case <-ticker.C: - c.ws.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { return } + case <-quit: + // Connection replaced by a reconnect — stop before we can race + // the new connection's writePump. 
+ return case <-c.ctx.Done(): return } @@ -522,6 +568,27 @@ func (c *Client) SendEvent(env *protocol.Envelope) error { return c.sendEnvelope(env) } +// SetNotifier attaches the mission-lifecycle notification dispatcher. +func (c *Client) SetNotifier(n *notification.Dispatcher) { + c.notifier = n +} + +// SetGatewayBridge attaches the gateway bridge that powers +// builtins.gateway.post on mission agents. +func (c *Client) SetGatewayBridge(b aitools.GatewayBridge) { + c.gatewayBridge = b +} + +// dispatchNotification fans a mission-lifecycle notification out to the +// channels the mission opted into. No-op when no dispatcher is attached or the +// mission declared no notification config. +func (c *Client) dispatchNotification(cfg *config.NotificationConfig, rec notification.Record) { + if c.notifier == nil { + return + } + c.notifier.Dispatch(context.Background(), cfg, rec) +} + func (c *Client) sendRequest(env *protocol.Envelope) (*protocol.Envelope, error) { ch := make(chan *protocol.Envelope, 1) diff --git a/wsbridge/convert.go b/wsbridge/convert.go index c35df53..8aaaa05 100644 --- a/wsbridge/convert.go +++ b/wsbridge/convert.go @@ -190,6 +190,10 @@ func ConfigToInstanceConfig(cfg *config.Config) protocol.InstanceConfig { if namespace == "dataset" { continue } + // The gateway tool only exists when a gateway is configured. + if namespace == "gateway" && cfg.Gateway == nil { + continue + } pi := protocol.PluginInfo{ Name: namespace, Path: "builtin", @@ -198,7 +202,7 @@ func ConfigToInstanceConfig(cfg *config.Config) protocol.InstanceConfig { } for _, toolName := range tools { ref := "builtins." + namespace + "." 
+ toolName - if tool := config.GetBuiltinTool(ref, nil, nil); tool != nil { + if tool := config.GetBuiltinTool(ref, nil, nil, nil); tool != nil { ti := aitoolToProtocolToolInfo(tool) ti.Name = toolName // Use config-level name, not legacy ToolName() pi.Tools = append(pi.Tools, ti) diff --git a/wsbridge/handlers.go b/wsbridge/handlers.go index 09a348b..25b987c 100644 --- a/wsbridge/handlers.go +++ b/wsbridge/handlers.go @@ -16,6 +16,7 @@ import ( "squadron/agent" "squadron/config" "squadron/mission" + "squadron/notification" "squadron/store" "squadron/streamers" ) @@ -120,7 +121,7 @@ func (c *Client) handleRunMission(env *protocol.Envelope) (*protocol.Envelope, e // Create mission runner with no-op debug logger debugLogger, _ := mission.NewDebugLogger("") - runner, err := mission.NewRunner(cfg, c.configPath, payload.MissionName, payload.Inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c)) + runner, err := mission.NewRunner(cfg, c.configPath, payload.MissionName, payload.Inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c), mission.WithGatewayBridge(c.gatewayBridge)) if err != nil { c.concurrency.NotifyMissionDone(payload.MissionName) return protocol.NewResponse(env.RequestID, protocol.TypeRunMissionAck, &protocol.RunMissionAckPayload{ @@ -232,7 +233,7 @@ func (c *Client) handleResumeMission(env *protocol.Envelope) (*protocol.Envelope mission.WithDebugLogger(debugLogger), mission.WithResume(payload.MissionID), mission.WithHumanBridge(c), - ) + mission.WithGatewayBridge(c.gatewayBridge)) if err != nil { return protocol.NewResponse(env.RequestID, protocol.TypeResumeMissionAck, &protocol.ResumeMissionAckPayload{ Accepted: false, @@ -1258,6 +1259,18 @@ func (c *Client) runMissionChain(ctx context.Context, cancel context.CancelFunc, Error: err.Error(), }) c.SendEvent(completeEnv) + // Only mission_failed is a notification event; a user-initiated + // stop does not fire one. 
+ if status == "failed" { + c.dispatchNotification(runner.NotificationConfig(), notification.Record{ + MissionID: mid, + MissionName: missionName, + Event: config.NotifyMissionFailed, + Title: "Mission \"" + missionName + "\" failed", + OccurredAt: time.Now(), + Error: err.Error(), + }) + } runner.CloseStores() return } @@ -1268,6 +1281,13 @@ func (c *Client) runMissionChain(ctx context.Context, cancel context.CancelFunc, Status: "completed", }) c.SendEvent(completeEnv) + c.dispatchNotification(runner.NotificationConfig(), notification.Record{ + MissionID: mid, + MissionName: missionName, + Event: config.NotifyMissionCompleted, + Title: "Mission \"" + missionName + "\" completed", + OccurredAt: time.Now(), + }) // Check for cross-mission routing nextMission := runner.NextMission() @@ -1287,7 +1307,7 @@ func (c *Client) runMissionChain(ctx context.Context, cancel context.CancelFunc, cfg := c.getConfig() debugLogger, _ := mission.NewDebugLogger("") var newErr error - runner, newErr = mission.NewRunner(cfg, c.configPath, nextMission, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c)) + runner, newErr = mission.NewRunner(cfg, c.configPath, nextMission, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c), mission.WithGatewayBridge(c.gatewayBridge)) if newErr != nil { log.Printf("Failed to create runner for chained mission %q: %v", nextMission, newErr) return @@ -1354,7 +1374,7 @@ func (c *Client) ResumeOrphanedMissions() { mission.WithDebugLogger(debugLogger), mission.WithResume(r.ID), mission.WithHumanBridge(c), - ) + mission.WithGatewayBridge(c.gatewayBridge)) if err != nil { log.Printf("auto-resume: failed to create runner for %q: %v", r.MissionName, err) continue @@ -1403,7 +1423,7 @@ func (c *Client) RunScheduledMission(missionName, source string, inputs map[stri log.Printf("scheduler: starting mission %q (%s)", missionName, source) debugLogger, _ := mission.NewDebugLogger("") - runner, err := mission.NewRunner(cfg, 
c.configPath, missionName, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c)) + runner, err := mission.NewRunner(cfg, c.configPath, missionName, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c), mission.WithGatewayBridge(c.gatewayBridge)) if err != nil { log.Printf("scheduler: failed to create runner for %q: %v", missionName, err) c.concurrency.NotifyMissionDone(missionName) @@ -1459,7 +1479,7 @@ func (c *Client) RunMissionDirect(missionName string, inputs map[string]string) // Create mission runner debugLogger, _ := mission.NewDebugLogger("") - runner, err := mission.NewRunner(cfg, c.configPath, missionName, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c)) + runner, err := mission.NewRunner(cfg, c.configPath, missionName, inputs, mission.WithDebugLogger(debugLogger), mission.WithHumanBridge(c), mission.WithGatewayBridge(c.gatewayBridge)) if err != nil { c.concurrency.NotifyMissionDone(missionName) return "", fmt.Errorf("failed to create runner: %w", err) diff --git a/wsbridge/notify.go b/wsbridge/notify.go new file mode 100644 index 0000000..888f680 --- /dev/null +++ b/wsbridge/notify.go @@ -0,0 +1,47 @@ +package wsbridge + +import ( + "context" + "time" + + "github.com/mlund01/squadron-wire/protocol" + + "squadron/config" + "squadron/notification" +) + +// NotifySink adapts the wsbridge Client to notification.Sink, pushing +// mission-lifecycle notifications to the command center as TypeNotification +// envelopes. When no command center is connected it silently no-ops. +type NotifySink struct { + client *Client +} + +// NewNotifySink wraps a Client as a notification.Sink. +func NewNotifySink(client *Client) *NotifySink { + return &NotifySink{client: client} +} + +// Notify sends the Record to the command center. The per-channel config is +// unused here — the command center has no channel override. 
+func (s *NotifySink) Notify(ctx context.Context, _ *config.NotificationChannel, rec notification.Record) error { + if s == nil || s.client == nil || !s.client.IsConnected() { + return nil + } + + payload := protocol.NotificationPayload{ + MissionID: rec.MissionID, + MissionName: rec.MissionName, + Event: rec.Event, + Title: rec.Title, + Message: rec.Message, + OccurredAt: rec.OccurredAt.UTC().Format(time.RFC3339Nano), + Error: rec.Error, + } + + env, err := protocol.NewEvent(protocol.TypeNotification, &payload) + if err != nil { + return err + } + return s.client.SendEvent(env) +} diff --git a/wsbridge/wsbridge_test.go b/wsbridge/wsbridge_test.go index 7912cf1..b96484c 100644 --- a/wsbridge/wsbridge_test.go +++ b/wsbridge/wsbridge_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -35,36 +36,37 @@ func newTestBundle(t *testing.T) *store.Bundle { var upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} -// mockCommander is a minimal WebSocket server that mimics a commander for testing. +// mockCommander is a minimal WebSocket server that mimics a commander for +// testing. conn is the most recent client connection — it's replaced on every +// (re)connect, so access is guarded by mu. 
type mockCommander struct { - srv *httptest.Server - conn *websocket.Conn - t *testing.T + srv *httptest.Server + t *testing.T + mu sync.Mutex + conn *websocket.Conn + writeMu sync.Mutex // serializes writes — tests send from multiple goroutines } func newMockCommander(t *testing.T) *mockCommander { t.Helper() mc := &mockCommander{t: t} - connCh := make(chan *websocket.Conn, 1) mux := http.NewServeMux() mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { - t.Fatalf("upgrade: %v", err) + t.Errorf("upgrade: %v", err) + return } - connCh <- ws + mc.mu.Lock() + mc.conn = ws + mc.mu.Unlock() }) mc.srv = httptest.NewServer(mux) - // Wait for connection from client (will be set after client.Connect()) - go func() { - mc.conn = <-connCh - }() - t.Cleanup(func() { - if mc.conn != nil { - mc.conn.Close() + if c := mc.currentConn(); c != nil { + c.Close() } mc.srv.Close() }) @@ -76,9 +78,15 @@ func (mc *mockCommander) wsURL() string { return "ws" + strings.TrimPrefix(mc.srv.URL, "http") + "/ws" } +func (mc *mockCommander) currentConn() *websocket.Conn { + mc.mu.Lock() + defer mc.mu.Unlock() + return mc.conn +} + func (mc *mockCommander) waitForConnection() { for i := 0; i < 50; i++ { - if mc.conn != nil { + if mc.currentConn() != nil { return } time.Sleep(10 * time.Millisecond) @@ -86,10 +94,24 @@ func (mc *mockCommander) waitForConnection() { mc.t.Fatal("timed out waiting for WS connection") } +// waitForNewConnection blocks until a connection distinct from prev arrives +// (used to observe a reconnect), and returns it. 
+func (mc *mockCommander) waitForNewConnection(prev *websocket.Conn) *websocket.Conn { + for i := 0; i < 200; i++ { + if c := mc.currentConn(); c != nil && c != prev { + return c + } + time.Sleep(10 * time.Millisecond) + } + mc.t.Fatal("timed out waiting for a new WS connection") + return nil +} + func (mc *mockCommander) readEnvelope() *protocol.Envelope { mc.t.Helper() - mc.conn.SetReadDeadline(time.Now().Add(5 * time.Second)) - _, msg, err := mc.conn.ReadMessage() + conn := mc.currentConn() + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, msg, err := conn.ReadMessage() if err != nil { mc.t.Fatalf("read from client: %v", err) } @@ -106,7 +128,9 @@ func (mc *mockCommander) sendEnvelope(env *protocol.Envelope) { if err != nil { mc.t.Fatalf("marshal: %v", err) } - if err := mc.conn.WriteMessage(websocket.TextMessage, data); err != nil { + mc.writeMu.Lock() + defer mc.writeMu.Unlock() + if err := mc.currentConn().WriteMessage(websocket.TextMessage, data); err != nil { mc.t.Fatalf("write: %v", err) } } @@ -195,6 +219,79 @@ func TestClientConnectAndRegister(t *testing.T) { } } +// TestClientReconnectStopsStaleWritePump exercises the reconnect path that +// previously panicked with "concurrent write to websocket connection": a +// reconnect must tear down the prior connection's pumps before the new ones +// start. Run under -race to also catch the shared-field data race. +func TestClientReconnectStopsStaleWritePump(t *testing.T) { + mc := newMockCommander(t) + cfg := testConfig(mc.wsURL()) + stores := newTestBundle(t) + client := wsbridge.NewClient(cfg, true, "", ".", stores, "1.0.0") + defer client.Close() + + // Answers one register handshake on the current connection. 
+ ackRegister := func() { + env := mc.readEnvelope() + if env.Type != protocol.TypeRegister { + t.Errorf("expected register, got %s", env.Type) + return + } + resp, _ := protocol.NewResponse(env.RequestID, protocol.TypeRegisterAck, &protocol.RegisterAckPayload{ + InstanceID: "inst-1", + Accepted: true, + }) + mc.sendEnvelope(resp) + } + + // First connection. + go func() { + mc.waitForConnection() + ackRegister() + }() + if err := client.Connect(); err != nil { + t.Fatalf("connect 1: %v", err) + } + conn1 := mc.currentConn() + + // Reconnect — a second Connect swaps in a new socket. Before the fix the + // old writePump kept writing to the shared c.ws field (now the new conn), + // racing the new writePump. + go func() { + mc.waitForNewConnection(conn1) + ackRegister() + }() + if err := client.Connect(); err != nil { + t.Fatalf("connect 2 (reconnect): %v", err) + } + conn2 := mc.currentConn() + if conn2 == conn1 { + t.Fatal("expected a new connection after reconnect") + } + + // Give the stale pump a moment to exit on its quit signal, then drive + // write traffic — it must flow over conn2 with no panic/race. + time.Sleep(50 * time.Millisecond) + for i := 0; i < 3; i++ { + evEnv, _ := protocol.NewEvent(protocol.TypeMissionEvent, &protocol.MissionEventPayload{ + MissionID: "m", + EventType: protocol.EventMissionStarted, + }) + if err := client.SendEvent(evEnv); err != nil { + t.Fatalf("send event after reconnect: %v", err) + } + if got := mc.readEnvelope(); got.Type != protocol.TypeMissionEvent { + t.Errorf("expected mission_event over new connection, got %s", got.Type) + } + } + + // The original connection should have been closed by the reconnect. + conn1.SetReadDeadline(time.Now().Add(time.Second)) + if _, _, err := conn1.ReadMessage(); err == nil { + t.Error("expected the old connection to be closed after reconnect") + } +} + func TestClientHandlesGetConfig(t *testing.T) { mc := newMockCommander(t) cfg := testConfig(mc.wsURL())