From 9a316b57d82f588ea9d72cbc588f70807753f63e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 11 May 2026 16:15:03 +0000 Subject: [PATCH] docs: update subagents docs for PgmqBus (PR #21) - Add PgmqBus section to api-reference/pipecat-subagents/bus.mdx - Update fundamentals/agent-bus.mdx to list PgmqBus - Update learn/distributed-agents.mdx with PgmqBus setup guide --- api-reference/pipecat-subagents/bus.mdx | 60 +++++++++++++++++++++++++ subagents/fundamentals/agent-bus.mdx | 27 ++++++++++- subagents/learn/distributed-agents.mdx | 34 ++++++++++++-- 3 files changed, 117 insertions(+), 4 deletions(-) diff --git a/api-reference/pipecat-subagents/bus.mdx b/api-reference/pipecat-subagents/bus.mdx index de68539c..1fb5d237 100644 --- a/api-reference/pipecat-subagents/bus.mdx +++ b/api-reference/pipecat-subagents/bus.mdx @@ -131,6 +131,66 @@ runner = AgentRunner(bus=bus) The Redis pub/sub channel name. +## PgmqBus + +Distributed agent bus backed by PGMQ (PostgreSQL Message Queue) for cross-process communication. Each instance creates its own queue and broadcasts messages to all peer queues sharing the same channel prefix. + +Requires the pgmq extra: `uv add "pipecat-ai-subagents[pgmq]"` + +```python +from pgmq.async_queue import PGMQueue +from pipecat_subagents.bus.network.pgmq import PgmqBus + +pgmq = PGMQueue( + host="aws-0-us-east-1.pooler.supabase.com", + port="5432", + database="postgres", + username="postgres.", + password="...", + pool_size=4, +) +await pgmq.init() +bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme") +runner = AgentRunner(bus=bus) +``` + + + Prefer the session-mode pooler (e.g. port 5432 in Supabase) when available. Transaction-mode pooling (e.g. port 6543) works in practice but may log benign warnings. The connection pool must allow at least 2 concurrent connections (4+ recommended under load). + + +### Configuration + + + An initialized `PGMQueue` client. Must call `await pgmq.init()` before passing. + + + + The + [`MessageSerializer`](/api-reference/pipecat-subagents/serializers#messageserializer) + for encoding/decoding messages. Defaults to + [`JSONMessageSerializer`](/api-reference/pipecat-subagents/serializers#jsonmessageserializer). + + + + Channel prefix for queue names. Sanitized to alphanumeric + underscore. + + + + Seconds a read message stays invisible before redelivery. + + + + Maximum messages to fetch per read. + + + + Long-poll check interval in milliseconds. + + + + Maximum seconds the reader blocks per poll cycle. + + ## BusBridgeProcessor Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent's pipeline to exchange frames with other agents via the `AgentBus`. diff --git a/subagents/fundamentals/agent-bus.mdx b/subagents/fundamentals/agent-bus.mdx index 06501163..c3579d56 100644 --- a/subagents/fundamentals/agent-bus.mdx +++ b/subagents/fundamentals/agent-bus.mdx @@ -11,7 +11,7 @@ Think of it as an internal event bus -- agents publish messages and other agents ## Bus implementations -Pipecat Subagents provides two bus implementations: +Pipecat Subagents provides three bus implementations: ### AsyncQueueBus (local) @@ -42,6 +42,31 @@ All processes that share the same Redis channel can exchange messages. The progr `RedisBus` requires the `redis` extra: `uv add "pipecat-ai-subagents[redis]"` +### PgmqBus (distributed) + +An alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to peer queues. + +```python +from pgmq.async_queue import PGMQueue +from pipecat_subagents.bus.network.pgmq import PgmqBus + +pgmq = PGMQueue( + host="aws-0-us-east-1.pooler.supabase.com", + port="5432", + database="postgres", + username="postgres.", + password="...", + pool_size=4, +) +await pgmq.init() +bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app") +runner = AgentRunner(bus=bus) +``` + + + `PgmqBus` requires the `pgmq` extra: `uv add "pipecat-ai-subagents[pgmq]"` + + ## Message types Messages on the bus fall into three categories: diff --git a/subagents/learn/distributed-agents.mdx b/subagents/learn/distributed-agents.mdx index 9a0d1e48..98150f5c 100644 --- a/subagents/learn/distributed-agents.mdx +++ b/subagents/learn/distributed-agents.mdx @@ -13,7 +13,11 @@ Distributed agents are agents connected to the **same bus** but running in diffe Agents](/subagents/learn/proxy-agents) instead. -## Setting up RedisBus +## Setting up a distributed bus + +Pipecat Subagents provides two distributed bus implementations: RedisBus and PgmqBus. Choose based on your infrastructure. + +### RedisBus Each process creates its own `AgentRunner` with a `RedisBus` connected to the same Redis channel: @@ -31,6 +35,30 @@ All runners sharing the same `channel` can exchange messages. Agents discover ea Install the Redis extra: `uv add "pipecat-ai-subagents[redis]"` +### PgmqBus + +Alternatively, use `PgmqBus` backed by PostgreSQL Message Queue: + +```python +from pgmq.async_queue import PGMQueue +from pipecat_subagents.bus.network.pgmq import PgmqBus +from pipecat_subagents.runner import AgentRunner + +pgmq = PGMQueue( + host="aws-0-us-east-1.pooler.supabase.com", + port="5432", + database="postgres", + username="postgres.", + password="...", + pool_size=4, +) +await pgmq.init() +bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app") +runner = AgentRunner(bus=bus, handle_sigint=True) +``` + +Install the PGMQ extra: `uv add "pipecat-ai-subagents[pgmq]"` + ## Example: distributed handoff This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines. @@ -157,6 +185,6 @@ Runners exchange registry information automatically over the shared bus. To get ## Considerations -- **Latency**: Redis adds network overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the Redis instance. -- **Serialization**: `RedisBus` serializes messages to JSON. Custom frame types need to be registered with the serializer. +- **Latency**: Network buses add overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the bus server (Redis or PostgreSQL). +- **Serialization**: Both `RedisBus` and `PgmqBus` serialize messages to JSON. Custom frame types need to be registered with the serializer. - **Single channel**: All agents on the same channel see all messages. Use different channels for different sessions or applications.