subagents/learn/task-coordination.mdx (18 changes: 9 additions & 9 deletions)
@@ -117,10 +117,10 @@ Let's build a debate system where a moderator dispatches a topic to three worker
 Each worker runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the worker tracks the current task ID until it has something to respond with:
 
 ```python
-from pipecat_subagents.agents import LLMAgent
+from pipecat_subagents.agents import LLMContextAgent
 from pipecat_subagents.bus.messages import BusTaskRequestMessage
 
-class DebateWorker(LLMAgent):
+class DebateWorker(LLMContextAgent):
     def __init__(self, name, *, bus, role):
         super().__init__(name, bus=bus)
         self._role = role
@@ -132,21 +132,17 @@ class DebateWorker(LLMAgent):
             settings=OpenAILLMSettings(system_instruction=f"You argue as a {self._role}."),
         )
 
-    async def build_pipeline(self) -> Pipeline:
-        llm = self.build_llm()
-        context = LLMContext()
-        user_agg, assistant_agg = LLMContextAggregatorPair(context)
+    async def on_ready(self) -> None:
+        await super().on_ready()
 
-        @assistant_agg.event_handler("on_assistant_turn_stopped")
+        @self.assistant_aggregator.event_handler("on_assistant_turn_stopped")
         async def on_assistant_turn_stopped(aggregator, message):
             text = message.content
             if self._current_task_id:
                 task_id = self._current_task_id
                 await self.send_task_response(task_id, {"role": self._role, "text": text})
                 self._current_task_id = ""
 
-        return Pipeline([user_agg, llm, assistant_agg])
-
     async def on_task_request(self, message: BusTaskRequestMessage) -> None:
         await super().on_task_request(message)
         self._current_task_id = message.task_id
@@ -158,6 +154,10 @@ class DebateWorker(LLMAgent):
         )
 ```
 
+<Note>
+`LLMContextAgent` extends `LLMAgent` with a built-in `LLMContext` and aggregator pair. It automatically builds the pipeline as `[user_aggregator, llm, assistant_aggregator]`, so you don't need to wire the context plumbing yourself. Access the aggregators via `self.user_aggregator` and `self.assistant_aggregator` in `on_ready()` or later hooks.
+</Note>
+
 The worker:
 
 1. Receives a task request and stores the `task_id`
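
For orientation, the deleted `build_pipeline` together with the new note implies the wiring that `LLMContextAgent` now performs on the subclass's behalf. A rough sketch of that equivalence, reconstructed from the removed lines rather than from the library's actual source:

```python
# Sketch only: approximates the pipeline assembly that LLMContextAgent
# is described as doing automatically, reconstructed from the deleted
# build_pipeline above -- not the library's actual implementation.
async def build_pipeline(self) -> Pipeline:
    llm = self.build_llm()
    context = LLMContext()
    # LLMContextAgent exposes these as self.user_aggregator and
    # self.assistant_aggregator for use in on_ready() and later hooks.
    user_agg, assistant_agg = LLMContextAggregatorPair(context)
    return Pipeline([user_agg, llm, assistant_agg])
```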
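
The constructor shown in the diff (`DebateWorker(name, *, bus, role)`) and the three-worker debate described in the hunk header suggest setup along these lines. The agent names, role strings, and the shared `bus` object are illustrative assumptions, not part of the diff:

```python
# Hypothetical instantiation: three debaters on one shared bus.
# The bus comes from elsewhere in the tutorial; the names and
# roles here are placeholders.
workers = [
    DebateWorker("worker-pro", bus=bus, role="proponent"),
    DebateWorker("worker-con", bus=bus, role="opponent"),
    DebateWorker("worker-judge", bus=bus, role="neutral analyst"),
]
```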