diff --git a/subagents/learn/task-coordination.mdx b/subagents/learn/task-coordination.mdx index 191814cd..3456bbb0 100644 --- a/subagents/learn/task-coordination.mdx +++ b/subagents/learn/task-coordination.mdx @@ -117,10 +117,10 @@ Let's build a debate system where a moderator dispatches a topic to three worker Each worker runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the worker tracks the current task ID until it has something to respond with: ```python -from pipecat_subagents.agents import LLMAgent +from pipecat_subagents.agents import LLMContextAgent from pipecat_subagents.bus.messages import BusTaskRequestMessage -class DebateWorker(LLMAgent): +class DebateWorker(LLMContextAgent): def __init__(self, name, *, bus, role): super().__init__(name, bus=bus) self._role = role @@ -132,12 +132,10 @@ class DebateWorker(LLMAgent): settings=OpenAILLMSettings(system_instruction=f"You argue as a {self._role}."), ) - async def build_pipeline(self) -> Pipeline: - llm = self.build_llm() - context = LLMContext() - user_agg, assistant_agg = LLMContextAggregatorPair(context) + async def on_ready(self) -> None: + await super().on_ready() - @assistant_agg.event_handler("on_assistant_turn_stopped") + @self.assistant_aggregator.event_handler("on_assistant_turn_stopped") async def on_assistant_turn_stopped(aggregator, message): text = message.content if self._current_task_id: @@ -145,8 +143,6 @@ class DebateWorker(LLMAgent): await self.send_task_response(task_id, {"role": self._role, "text": text}) self._current_task_id = "" - return Pipeline([user_agg, llm, assistant_agg]) - async def on_task_request(self, message: BusTaskRequestMessage) -> None: await super().on_task_request(message) self._current_task_id = message.task_id @@ -158,6 +154,10 @@ class DebateWorker(LLMAgent): ) ``` + + `LLMContextAgent` extends `LLMAgent` with a built-in `LLMContext` and aggregator pair. It automatically builds the pipeline as `[user_aggregator, llm, assistant_aggregator]`, so you don't need to wire the context plumbing yourself. Access the aggregators via `self.user_aggregator` and `self.assistant_aggregator` in `on_ready()` or later hooks. + + The worker: 1. Receives a task request and stores the `task_id`