From 4f90ed91d237d6e0cbc3dd82ec4c1e71ea9e1728 Mon Sep 17 00:00:00 2001 From: Dhruvkumar-Microsoft Date: Fri, 13 Mar 2026 10:38:31 +0530 Subject: [PATCH 1/4] updated lifecycle --- src/backend/v4/magentic_agents/common/lifecycle.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/backend/v4/magentic_agents/common/lifecycle.py b/src/backend/v4/magentic_agents/common/lifecycle.py index b38e31eed..8c4ccb521 100644 --- a/src/backend/v4/magentic_agents/common/lifecycle.py +++ b/src/backend/v4/magentic_agents/common/lifecycle.py @@ -5,8 +5,7 @@ from typing import Any, Optional from agent_framework import ( - ChatAgent, - HostedMCPTool, + Agent, MCPStreamableHTTPTool, ) @@ -46,8 +45,8 @@ def __init__( ) -> None: self._stack: AsyncExitStack | None = None self.mcp_cfg: MCPConfig | None = mcp - self.mcp_tool: HostedMCPTool | None = None - self._agent: ChatAgent | None = None + self.mcp_tool: MCPStreamableHTTPTool | None = None + self._agent: Agent | None = None self.team_service: TeamService | None = team_service self.team_config: TeamConfiguration | None = team_config self.client: Optional[AgentsClient] = None @@ -155,9 +154,9 @@ def get_chat_client(self) -> AzureAIClient: """ if ( self._agent - and self._agent.chat_client + and self._agent.client ): - return self._agent.chat_client # type: ignore + return self._agent.client # type: ignore chat_client = AzureAIClient( project_endpoint=self.project_endpoint, agent_name=self.agent_name, From 4856b0b40b395af1819724f6e950cdf03b9a3600 Mon Sep 17 00:00:00 2001 From: Dhruvkumar-Microsoft Date: Fri, 13 Mar 2026 18:17:36 +0530 Subject: [PATCH 2/4] updated to latest framework --- src/backend/pyproject.toml | 7 +- src/backend/uv.lock | 44 ++-- src/backend/v4/callbacks/response_handlers.py | 18 +- .../v4/magentic_agents/foundry_agent.py | 57 +++-- src/backend/v4/magentic_agents/proxy_agent.py | 118 +++++----- .../orchestration/human_approval_manager.py | 8 +- .../v4/orchestration/orchestration_manager.py 
| 221 ++++++++++-------- 7 files changed, 245 insertions(+), 228 deletions(-) diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index 06dcf2ba6..61e34175a 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.11" dependencies = [ "azure-ai-evaluation==1.11.0", "azure-ai-inference==1.0.0b9", - "azure-ai-projects==2.0.0b3", + "azure-ai-projects==2.0.0", "azure-cosmos==4.9.0", "azure-identity==1.24.0", "azure-monitor-events-extension==0.1.0", @@ -32,8 +32,9 @@ dependencies = [ "mcp==1.26.0", "werkzeug==3.1.5", "azure-core==1.38.0", - "agent-framework-azure-ai==1.0.0b260130", - "agent-framework-core==1.0.0b260130", + "agent-framework-azure-ai==1.0.0rc4", + "agent-framework-core==1.0.0rc4", + "agent-framework-orchestrations==1.0.0b260311", "urllib3==2.6.3", "protobuf==5.29.6", "cryptography==46.0.5", diff --git a/src/backend/uv.lock b/src/backend/uv.lock index e9065f1c6..e68f3e097 100644 --- a/src/backend/uv.lock +++ b/src/backend/uv.lock @@ -9,24 +9,25 @@ resolution-markers = [ [[package]] name = "agent-framework-azure-ai" -version = "1.0.0b260130" +version = "1.0.0rc4" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "agent-framework-core" }, { name = "aiohttp" }, { name = "azure-ai-agents" }, - { name = "azure-ai-projects" }, + { name = "azure-ai-inference" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ac/ef/69ead4fcd2c21608ce35353a507df23df51872552747f803c43d1d81f612/agent_framework_azure_ai-1.0.0b260130.tar.gz", hash = "sha256:c571275089a801f961370ba824568c8b02143b1a6bb5b1d78b97c6debdf4906f", size = 32723, upload-time = "2026-01-30T18:56:41.07Z" } +sdist = { url = "https://files.pythonhosted.org/packages/a7/d4/2641d0584c5859f0054207d0a726a698d82eb3c8cba1d5f9d6d7fcf785ec/agent_framework_azure_ai-1.0.0rc4.tar.gz", hash = "sha256:c397f1bb74d29be4e5842e0989f2006f981f77f7066533899bf977fc79f6e046", size = 48428, upload-time = "2026-03-11T23:19:30.131Z" 
} wheels = [ - { url = "https://files.pythonhosted.org/packages/72/8f/a1467c352fed5eb6ebb9567109251cc39b5b3ebb5137a2d14c71fea51bc8/agent_framework_azure_ai-1.0.0b260130-py3-none-any.whl", hash = "sha256:87f0248fe6d4f2f4146f0a56a53527af6365d4a377dc2e3d56c37cbb9deae098", size = 38542, upload-time = "2026-01-30T19:01:12.102Z" }, + { url = "https://files.pythonhosted.org/packages/ce/8c/703220347d2a656c0979dbb7e788a851e3cc7e6396ff6402a4606a0d7555/agent_framework_azure_ai-1.0.0rc4-py3-none-any.whl", hash = "sha256:538c6782a06dcb9df0631379b776018b6b0ddb81804d142eb3787c36a42ab2c8", size = 54269, upload-time = "2026-03-11T23:20:04.856Z" }, ] [[package]] name = "agent-framework-core" -version = "1.0.0b260130" +version = "1.0.0rc4" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "azure-ai-projects" }, { name = "azure-identity" }, { name = "mcp", extra = ["ws"] }, { name = "openai" }, @@ -35,12 +36,24 @@ dependencies = [ { name = "opentelemetry-semantic-conventions-ai" }, { name = "packaging" }, { name = "pydantic" }, - { name = "pydantic-settings" }, + { name = "python-dotenv" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/4d/39/e508e778219bd6d20e023a6f48235861a639e3cf888776f9e873bbad3c6b/agent_framework_core-1.0.0b260130.tar.gz", hash = "sha256:030a5b2ced796eec6839c2dabad90b4bd1ea33d1026f3ed1813050a56ccfa4ec", size = 301823, upload-time = "2026-01-30T19:01:09.629Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/5a/b472f9a57235bb72899151ec5cd3c925825e16018689e0300cb822cf00f8/agent_framework_core-1.0.0rc4.tar.gz", hash = "sha256:f394eb95ae877ae854aa7a3e499f76f34b26102808009a66b264ded89c6b6dbd", size = 302446, upload-time = "2026-03-11T23:19:29.198Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/36/68/afe66c72951a279e0fe048fd5af1e775528cde40dbdab8ec03b42c545df4/agent_framework_core-1.0.0b260130-py3-none-any.whl", hash = 
"sha256:75b4dd0ca2ae52574d406cf5c9ed7adf63e187379f72fce891743254d83dfd56", size = 348724, upload-time = "2026-01-30T18:56:47.15Z" }, + { url = "https://files.pythonhosted.org/packages/06/d7/89776e7e919e46fd83ae464a416966715f4f40083297d42574e3d45214f6/agent_framework_core-1.0.0rc4-py3-none-any.whl", hash = "sha256:f01a6997be0f5e05853eb6be341dbca692c4e5d6999de5f3e8364296de50635f", size = 348882, upload-time = "2026-03-11T23:19:43.158Z" }, +] + +[[package]] +name = "agent-framework-orchestrations" +version = "1.0.0b260311" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "agent-framework-core" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b3/7f/43aeca0b4d1dc6156539d1723ea3d48599ee10bf660280577593e1441b1b/agent_framework_orchestrations-1.0.0b260311.tar.gz", hash = "sha256:a303a156c066954bbed5b1ac6e7b3dd8049ffe3bbf0c1841f5ab24e97a8f1fd9", size = 55139, upload-time = "2026-03-11T23:19:52.793Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/58/83/ef99c5a45c3d45eeaed1ffcb4f3294fa50f4d19c0f69771693b7d295b0bd/agent_framework_orchestrations-1.0.0b260311-py3-none-any.whl", hash = "sha256:cc7cdebe0abb76208d2c6618d410bf77f0806478dbe25ad1467b27f4f70b8dba", size = 61073, upload-time = "2026-03-11T23:19:38.618Z" }, ] [[package]] @@ -270,7 +283,7 @@ wheels = [ [[package]] name = "azure-ai-projects" -version = "2.0.0b3" +version = "2.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "azure-core" }, @@ -278,10 +291,11 @@ dependencies = [ { name = "azure-storage-blob" }, { name = "isodate" }, { name = "openai" }, + { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/24/e0/3512d3f07e9dd2eb4af684387c31598c435bd87833b6a81850972963cb9c/azure_ai_projects-2.0.0b3.tar.gz", hash = "sha256:6d09ad110086e450a47b991ee8a3644f1be97fa3085d5981d543f900d78f4505", size = 431749, upload-time = "2026-01-06T05:31:25.849Z" } +sdist = { url = 
"https://files.pythonhosted.org/packages/0f/3d/6a7d04f61f3befc74a6f09ad7a0c02e8c701fc6db91ad7151c46da44a902/azure_ai_projects-2.0.0.tar.gz", hash = "sha256:0892f075cf287d747be54c25bea93dc9406ad100d44efc2fdaadb26586ecf4ff", size = 491449, upload-time = "2026-03-06T05:59:51.645Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4e/b6/8fbd4786bb5c0dd19eaff86ddce0fbfb53a6f90d712038272161067a076a/azure_ai_projects-2.0.0b3-py3-none-any.whl", hash = "sha256:3b3048a3ba3904d556ba392b7bd20b6e84c93bb39df6d43a6470cdb0ad08af8c", size = 240717, upload-time = "2026-01-06T05:31:27.716Z" }, + { url = "https://files.pythonhosted.org/packages/20/af/7b218cccab8e22af44844bfc16275b55c1fa48ed494145614b9852950fe6/azure_ai_projects-2.0.0-py3-none-any.whl", hash = "sha256:e655e0e495d0c76077d95cc8e0d606fcdbf3f4dbdf1a8379cbd4bea1e34c401d", size = 236354, upload-time = "2026-03-06T05:59:53.536Z" }, ] [[package]] @@ -438,6 +452,7 @@ source = { virtual = "." } dependencies = [ { name = "agent-framework-azure-ai" }, { name = "agent-framework-core" }, + { name = "agent-framework-orchestrations" }, { name = "aiohttp" }, { name = "azure-ai-evaluation" }, { name = "azure-ai-inference" }, @@ -475,12 +490,13 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "agent-framework-azure-ai", specifier = "==1.0.0b260130" }, - { name = "agent-framework-core", specifier = "==1.0.0b260130" }, + { name = "agent-framework-azure-ai", specifier = "==1.0.0rc4" }, + { name = "agent-framework-core", specifier = "==1.0.0rc4" }, + { name = "agent-framework-orchestrations", specifier = "==1.0.0b260311" }, { name = "aiohttp", specifier = "==3.13.3" }, { name = "azure-ai-evaluation", specifier = "==1.11.0" }, { name = "azure-ai-inference", specifier = "==1.0.0b9" }, - { name = "azure-ai-projects", specifier = "==2.0.0b3" }, + { name = "azure-ai-projects", specifier = "==2.0.0" }, { name = "azure-core", specifier = "==1.38.0" }, { name = "azure-cosmos", specifier = "==4.9.0" }, { name = 
"azure-identity", specifier = "==1.24.0" }, diff --git a/src/backend/v4/callbacks/response_handlers.py b/src/backend/v4/callbacks/response_handlers.py index 0a817ef94..74f11dbe6 100644 --- a/src/backend/v4/callbacks/response_handlers.py +++ b/src/backend/v4/callbacks/response_handlers.py @@ -8,7 +8,7 @@ import re from typing import Any -from agent_framework import ChatMessage +from agent_framework import Message from v4.config.settings import connection_config from v4.models.messages import ( @@ -64,22 +64,22 @@ def _extract_tool_calls_from_contents(contents: list[Any]) -> list[AgentToolCall def agent_response_callback( agent_id: str, - message: ChatMessage, + message: Message, user_id: str | None = None, ) -> None: """ - Final (non-streaming) agent response callback using agent_framework ChatMessage. + Final (non-streaming) agent response callback using agent_framework Message. """ agent_name = getattr(message, "author_name", None) or agent_id or "Unknown Agent" role = getattr(message, "role", "assistant") - # FIX: Properly extract text from ChatMessage - # ChatMessage has a .text property that concatenates all TextContent items + # FIX: Properly extract text from Message + # Message has a .text property that concatenates all TextContent items text = "" - if isinstance(message, ChatMessage): + if isinstance(message, Message): text = message.text # Use the property directly else: - # Fallback for non-ChatMessage objects + # Fallback for non-Message objects text = str(getattr(message, "text", "")) text = clean_citations(text or "") @@ -125,8 +125,8 @@ async def streaming_agent_response_callback( # If text is None, don't fall back to str(update) as that would show object repr # Just skip if there's no actual text content if chunk_text is None: - # Check if update is a ChatMessage - if isinstance(update, ChatMessage): + # Check if update is a Message + if isinstance(update, Message): chunk_text = update.text or "" elif hasattr(update, "content"): chunk_text = 
str(update.content) if update.content else "" diff --git a/src/backend/v4/magentic_agents/foundry_agent.py b/src/backend/v4/magentic_agents/foundry_agent.py index 38fd0cc6b..69b85cc21 100644 --- a/src/backend/v4/magentic_agents/foundry_agent.py +++ b/src/backend/v4/magentic_agents/foundry_agent.py @@ -3,13 +3,12 @@ import logging from typing import List, Optional -from agent_framework import (ChatAgent, ChatMessage, HostedCodeInterpreterTool, - Role) +from agent_framework import (Agent, Message, ChatOptions) from agent_framework_azure_ai import \ AzureAIClient # Provided by agent_framework from azure.ai.projects.models import ( PromptAgentDefinition, - AzureAISearchAgentTool, + AzureAISearchTool, AzureAISearchToolResource, AISearchIndexResource, ) @@ -92,17 +91,13 @@ def _is_azure_search_requested(self) -> bool: return False async def _collect_tools(self) -> List: - """Collect tool definitions for ChatAgent (MCP path only).""" + """Collect tool definitions for Agent (MCP path only).""" tools: List = [] - # Code Interpreter (only in MCP path per incompatibility note) + # Code Interpreter is now handled server-side via AzureAIClient agent definition. + # HostedCodeInterpreterTool was removed in rc4. 
if self.enable_code_interpreter: - try: - code_tool = HostedCodeInterpreterTool() - tools.append(code_tool) - self.logger.info("Added Code Interpreter tool.") - except Exception as ie: - self.logger.error("Code Interpreter tool creation failed: %s", ie) + self.logger.info("Code Interpreter requested — handled server-side by AzureAIClient.") # MCP Tool (from base class) if self.mcp_tool: @@ -121,7 +116,7 @@ async def _create_azure_search_enabled_client(self) -> Optional[AzureAIClient]: This uses the AIProjectClient.agents.create_version() approach with: - PromptAgentDefinition for agent configuration - - AzureAISearchAgentTool with AzureAISearchToolResource for search capability + - AzureAISearchTool with AzureAISearchToolResource for search capability - AISearchIndexResource for index configuration with project_connection_id Requirements: @@ -167,7 +162,7 @@ async def _create_azure_search_enabled_client(self) -> Optional[AzureAIClient]: top_k, ) - # Create agent using create_version with PromptAgentDefinition and AzureAISearchAgentTool + # Create agent using create_version with PromptAgentDefinition and AzureAISearchTool # This approach matches the Knowledge Mining Solution Accelerator pattern try: enhanced_instructions = ( @@ -181,7 +176,7 @@ async def _create_azure_search_enabled_client(self) -> Optional[AzureAIClient]: model=self.model_deployment_name, instructions=enhanced_instructions, tools=[ - AzureAISearchAgentTool( + AzureAISearchTool( azure_ai_search=AzureAISearchToolResource( indexes=[ AISearchIndexResource( @@ -253,37 +248,39 @@ async def _after_open(self) -> None: ) # In Azure Search raw tool path, tools/tool_choice are handled server-side. 
- self._agent = ChatAgent( + self._agent = Agent( id=self.get_agent_id(), - chat_client=chat_client, + client=chat_client, instructions=self.agent_instructions, name=self.agent_name, description=self.agent_description, - tool_choice="required", # Force usage - temperature=temp, - model_id=self.model_deployment_name, - default_options={"store": False}, # Client-managed conversation to avoid stale tool call IDs across rounds + default_options=ChatOptions( + store=False, + tool_choice="required", + temperature=temp, + ), ) else: # MCP path (also used by RAI agent which has no tools) self.logger.info("Initializing agent in MCP mode.") tools = await self._collect_tools() - self._agent = ChatAgent( + self._agent = Agent( id=self.get_agent_id(), - chat_client=self.get_chat_client(), + client=self.get_chat_client(), instructions=self.agent_instructions, name=self.agent_name, description=self.agent_description, tools=tools if tools else None, - tool_choice="auto" if tools else "none", - temperature=temp, - model_id=self.model_deployment_name, - default_options={"store": False}, # Client-managed conversation to avoid stale tool call IDs across rounds + default_options=ChatOptions( + store=False, + tool_choice="auto" if tools else "none", + temperature=temp, + ), ) - self.logger.info("Initialized ChatAgent '%s'", self.agent_name) + self.logger.info("Initialized Agent '%s'", self.agent_name) except Exception as ex: - self.logger.error("Failed to initialize ChatAgent: %s", ex) + self.logger.error("Failed to initialize Agent: %s", ex) raise # Register agent globally @@ -305,9 +302,9 @@ async def invoke(self, prompt: str): if not self._agent: raise RuntimeError("Agent not initialized; call open() first.") - messages = [ChatMessage(role=Role.USER, text=prompt)] + messages = [Message(role="user", text=prompt)] - async for update in self._agent.run_stream(messages): + async for update in self._agent.run(messages, stream=True): yield update # ------------------------- diff --git 
a/src/backend/v4/magentic_agents/proxy_agent.py b/src/backend/v4/magentic_agents/proxy_agent.py index 79a84492b..cf1130047 100644 --- a/src/backend/v4/magentic_agents/proxy_agent.py +++ b/src/backend/v4/magentic_agents/proxy_agent.py @@ -13,18 +13,18 @@ import logging import time import uuid -from typing import Any, AsyncIterable +from typing import Any, AsyncIterable, Awaitable from agent_framework import ( AgentResponse, AgentResponseUpdate, BaseAgent, - ChatMessage, - Role, + Message, Content, UsageDetails, - AgentThread, + AgentSession, ) +from agent_framework._types import ResponseStream from v4.config.settings import connection_config, orchestration_config from v4.models.messages import ( @@ -42,7 +42,7 @@ class ProxyAgent(BaseAgent): A human-in-the-loop clarification agent extending agent_framework's BaseAgent. This agent mediates human clarification requests rather than using an LLM. - It follows the agent_framework protocol with run() and run_stream() methods. + It follows the agent_framework protocol with run() method (stream=True/False). """ def __init__( @@ -68,79 +68,65 @@ def __init__( # AgentProtocol implementation # --------------------------- - def get_new_thread(self, **kwargs: Any) -> AgentThread: + def create_session(self, *, session_id: str | None = None, **kwargs: Any) -> AgentSession: """ - Create a new thread for ProxyAgent conversations. + Create a new session for ProxyAgent conversations. Required by AgentProtocol for workflow integration. 
Args: - **kwargs: Additional keyword arguments for thread creation + session_id: Optional session ID + **kwargs: Additional keyword arguments for session creation Returns: - A new AgentThread instance + A new AgentSession instance """ - return AgentThread(**kwargs) + return AgentSession(session_id=session_id, **kwargs) - async def run( + def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | Message | list[str] | list[Message] | None = None, *, - thread: AgentThread | None = None, + stream: bool = False, + session: AgentSession | None = None, **kwargs: Any, - ) -> AgentResponse: + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: """ - Get complete clarification response (non-streaming). + Run clarification (streaming or non-streaming). - Args: - messages: The message(s) requiring clarification - thread: Optional conversation thread - kwargs: Additional keyword arguments - - Returns: - AgentResponse with the clarification + Must be a regular def (not async def) to match the Agent.run() contract. + The framework calls agent.run() without await and expects either + a ResponseStream (stream=True) or an Awaitable (stream=False). 
""" - # Collect all streaming updates - response_messages: list[ChatMessage] = [] - response_id = str(uuid.uuid4()) + if stream: + return ResponseStream( + self._invoke_stream_internal(messages, session, **kwargs), + finalizer=lambda updates: AgentResponse.from_updates(updates), + ) - async for update in self.run_stream(messages, thread=thread, **kwargs): - if update.contents: - response_messages.append( - ChatMessage( - role=update.role or Role.ASSISTANT, - contents=update.contents, + async def _run_non_streaming() -> AgentResponse: + response_messages: list[Message] = [] + response_id = str(uuid.uuid4()) + + async for update in self._invoke_stream_internal(messages, session, **kwargs): + if update.contents: + response_messages.append( + Message( + role=update.role or "assistant", + contents=update.contents, + ) ) - ) - - return AgentResponse( - messages=response_messages, - response_id=response_id, - ) - def run_stream( - self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, - *, - thread: AgentThread | None = None, - **kwargs: Any, - ) -> AsyncIterable[AgentResponseUpdate]: - """ - Stream clarification process with human interaction. 
- - Args: - messages: The message(s) requiring clarification - thread: Optional conversation thread - kwargs: Additional keyword arguments + return AgentResponse( + messages=response_messages, + response_id=response_id, + ) - Yields: - AgentRunResponseUpdate objects with clarification progress - """ - return self._invoke_stream_internal(messages, thread, **kwargs) + return _run_non_streaming() async def _invoke_stream_internal( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None, - thread: AgentThread | None, + messages: str | Message | list[str] | list[Message] | None, + session: AgentSession | None, **kwargs: Any, ) -> AsyncIterable[AgentResponseUpdate]: """ @@ -154,8 +140,8 @@ async def _invoke_stream_internal( message_text = self._extract_message_text(messages) logger.info( - "ProxyAgent: Requesting clarification (thread=%s, user=%s)", - "present" if thread else "None", + "ProxyAgent: Requesting clarification (session=%s, user=%s)", + "present" if session else "None", self.user_id ) logger.debug("ProxyAgent: Message text: %s", message_text[:100]) @@ -204,10 +190,10 @@ async def _invoke_stream_internal( message_id = str(uuid.uuid4()) # Yield final assistant text update with explicit text content - # New API: use Content.from_text() or pass text directly to AgentResponseUpdate + # New API: use Content.from_text() to wrap text in AgentResponseUpdate text_update = AgentResponseUpdate( - role=Role.ASSISTANT, - text=synthetic_reply, # New API accepts text directly + role="assistant", + contents=[Content.from_text(text=synthetic_reply)], author_name=self.name, response_id=response_id, message_id=message_id, @@ -219,7 +205,7 @@ async def _invoke_stream_internal( # Yield synthetic usage update for consistency # Use same message_id to indicate this is part of the same message usage_update = AgentResponseUpdate( - role=Role.ASSISTANT, + role="assistant", contents=[ Content.from_usage( UsageDetails( @@ -244,14 +230,14 @@ async def 
_invoke_stream_internal( # --------------------------- def _extract_message_text( - self, messages: str | ChatMessage | list[str] | list[ChatMessage] | None + self, messages: str | Message | list[str] | list[Message] | None ) -> str: """Extract text from various message formats.""" if messages is None: return "" if isinstance(messages, str): return messages - if isinstance(messages, ChatMessage): + if isinstance(messages, Message): # Use the .text property which concatenates all TextContent items return messages.text or "" if isinstance(messages, list): @@ -259,7 +245,7 @@ def _extract_message_text( return "" if isinstance(messages[0], str): return " ".join(messages) - if isinstance(messages[0], ChatMessage): + if isinstance(messages[0], Message): # Use .text property for each message return " ".join(msg.text or "" for msg in messages) return str(messages) diff --git a/src/backend/v4/orchestration/human_approval_manager.py b/src/backend/v4/orchestration/human_approval_manager.py index 7f33c9ac4..c2eeedb28 100644 --- a/src/backend/v4/orchestration/human_approval_manager.py +++ b/src/backend/v4/orchestration/human_approval_manager.py @@ -8,8 +8,8 @@ from typing import Any, Optional import v4.models.messages as messages -from agent_framework import ChatMessage -from agent_framework._workflows._magentic import ( +from agent_framework import Message +from agent_framework_orchestrations._magentic import ( MagenticContext, StandardMagenticManager, ORCHESTRATOR_FINAL_ANSWER_PROMPT, @@ -87,7 +87,7 @@ def __init__(self, user_id: str, agent, *args, **kwargs): kwargs["final_answer_prompt"] = ORCHESTRATOR_FINAL_ANSWER_PROMPT + final_append # Override progress ledger prompt to discourage re-calling agents - from agent_framework._workflows._magentic import ORCHESTRATOR_PROGRESS_LEDGER_PROMPT + from agent_framework_orchestrations._magentic import ORCHESTRATOR_PROGRESS_LEDGER_PROMPT kwargs["progress_ledger_prompt"] = ORCHESTRATOR_PROGRESS_LEDGER_PROMPT + progress_append 
self.current_user_id = user_id @@ -292,7 +292,7 @@ async def _wait_for_user_approval( async def prepare_final_answer( self, magentic_context: MagenticContext - ) -> ChatMessage: + ) -> Message: """ Override to ensure final answer is prepared after all steps are executed. """ diff --git a/src/backend/v4/orchestration/orchestration_manager.py b/src/backend/v4/orchestration/orchestration_manager.py index d38748d83..752df3039 100644 --- a/src/backend/v4/orchestration/orchestration_manager.py +++ b/src/backend/v4/orchestration/orchestration_manager.py @@ -9,15 +9,19 @@ # agent_framework imports from agent_framework_azure_ai import AzureAIClient from agent_framework import ( - ChatAgent, - ChatMessage, - WorkflowOutputEvent, - MagenticBuilder, + Agent, + AgentResponseUpdate, + ChatOptions, + Message, InMemoryCheckpointStorage, - AgentRunUpdateEvent, + WorkflowEvent, +) +from agent_framework_orchestrations import MagenticBuilder +from agent_framework_orchestrations._base_group_chat_orchestrator import ( GroupChatRequestSentEvent, GroupChatResponseReceivedEvent, - ExecutorCompletedEvent, +) +from agent_framework_orchestrations._magentic import ( MagenticOrchestratorEvent, MagenticProgressLedger, ) @@ -28,6 +32,7 @@ from common.database.database_base import DatabaseBase from v4.common.services.team_service import TeamService +import time as _time from v4.callbacks.response_handlers import ( agent_response_callback, streaming_agent_response_callback, @@ -52,7 +57,7 @@ def _extract_response_text(self, data) -> str: Extract text content from various agent_framework response types. 
Handles: - - ChatMessage: Extract .text + - Message: Extract .text - AgentResponse: Extract .text - AgentExecutorResponse: Extract from agent_response.text or full_conversation[-1].text - List of any of the above @@ -60,8 +65,8 @@ def _extract_response_text(self, data) -> str: if data is None: return "" - # Direct ChatMessage - if isinstance(data, ChatMessage): + # Direct Message + if isinstance(data, Message): return data.text or "" # Has .text attribute directly (AgentResponse, etc.) @@ -77,7 +82,7 @@ def _extract_response_text(self, data) -> str: # Fallback to last message in full_conversation if hasattr(data, "full_conversation") and data.full_conversation: last_msg = data.full_conversation[-1] - if isinstance(last_msg, ChatMessage) and last_msg.text: + if isinstance(last_msg, Message) and last_msg.text: return last_msg.text # List of items - could be AgentExecutorResponse, ChatMessage, etc. @@ -140,15 +145,15 @@ async def init_orchestration( credential=credential, ) - # New API: Create a ChatAgent to wrap the chat client for the manager - manager_agent = ChatAgent( - chat_client=chat_client, + # New API: Create an Agent to wrap the chat client for the manager + manager_agent = Agent( + client=chat_client, name="MagenticManager", - default_options={"store": False}, # Client-managed conversation to avoid stale tool call IDs across rounds + default_options=ChatOptions(store=False), # Client-managed conversation to avoid stale tool call IDs across rounds ) cls.logger.info( - "Created AzureAIClient and manager ChatAgent for orchestration with model '%s' at endpoint '%s'", + "Created AzureAIClient and manager Agent for orchestration with model '%s' at endpoint '%s'", team_config.deployment_name, config.AZURE_AI_PROJECT_ENDPOINT, ) @@ -163,6 +168,8 @@ async def init_orchestration( user_id=user_id, agent=manager_agent, # New API: pass agent instead of chat_client max_round_count=orchestration_config.max_rounds, + max_stall_count=3, + max_reset_count=2 ) 
cls.logger.info( "Created HumanApprovalMagenticManager for user '%s' with max_rounds=%d", @@ -180,12 +187,12 @@ async def init_orchestration( if not name: name = f"agent_{len(participants) + 1}" - # Extract the inner ChatAgent for wrapper templates - # FoundryAgentTemplate wrap a ChatAgent in self._agent + # Extract the inner Agent for wrapper templates + # FoundryAgentTemplate wrap an Agent in self._agent # ProxyAgent directly extends BaseAgent and can be used as-is if hasattr(ag, "_agent") and ag._agent is not None: # This is a wrapper (FoundryAgentTemplate) - # Use the inner ChatAgent which implements AgentProtocol + # Use the inner Agent which implements AgentProtocol participants[name] = ag._agent cls.logger.debug("Added participant '%s' (extracted inner agent)", name) else: @@ -201,15 +208,13 @@ async def init_orchestration( participant_list = list(participants.values()) cls.logger.info("Participants for workflow: %s", list(participants.keys())) - builder = ( - MagenticBuilder() - .participants(participant_list) # New SDK: pass as list - .with_manager( - manager=manager, # Pass manager instance (extends StandardMagenticManager) - max_round_count=orchestration_config.max_rounds, - max_stall_count=0, # CRITICAL: Prevent re-calling agents when stalled (default is 3!) - ) - .with_checkpointing(storage) + builder = MagenticBuilder( + participants=participant_list, + manager=manager, + checkpoint_storage=storage, + max_round_count=orchestration_config.max_rounds, + max_stall_count=3, # CRITICAL: Prevent re-calling agents when stalled (default is 3!) 
+ intermediate_outputs=True, # Required: yield agent streaming output events, not just orchestrator output ) # Build workflow @@ -372,111 +377,123 @@ async def run_orchestration(self, user_id: str, input_task) -> None: # Track how many times each agent is called (for debugging duplicate calls) agent_call_counts: dict = {} + # Buffer streamed text per-agent so we can emit a complete AGENT_MESSAGE + agent_stream_buffers: dict[str, str] = {} try: - # Execute workflow using run_stream with task as positional parameter + # Execute workflow using run() with stream=True # The execution settings are configured in the manager/client final_output: str | None = None self.logger.info("Starting workflow execution...") last_message_id: str | None = None - async for event in workflow.run_stream(task_text): + async for event in workflow.run(task_text, stream=True): try: - # Only log non-streaming events (reduce noise) - event_type_name = type(event).__name__ - if event_type_name != "AgentRunUpdateEvent": - self.logger.info("[EVENT] %s", event_type_name) + # WorkflowEvent has a .type field (string) instead of specific event classes + event_type = event.type if hasattr(event, "type") else type(event).__name__ + if event_type not in ("status", "output"): + self.logger.info("[EVENT] type=%s", event_type) # Handle orchestrator events (plan, progress ledger) - if isinstance(event, MagenticOrchestratorEvent): + if event_type == "magentic_orchestrator": self.logger.info( - "[Magentic Orchestrator Event] Type: %s", - event.event_type.name + "[Magentic Orchestrator Event]" ) - if isinstance(event.data, ChatMessage): + if isinstance(event.data, Message): self.logger.info("Plan message: %s", event.data.text[:200] if event.data.text else "") elif isinstance(event.data, MagenticProgressLedger): self.logger.info("Progress ledger received") - # Handle agent streaming/updates (replaces MagenticAgentDeltaEvent and MagenticAgentMessageEvent) - elif isinstance(event, AgentRunUpdateEvent): - 
message_id = event.data.message_id if hasattr(event.data, 'message_id') else None - executor_id = event.executor_id - - # Stream the update - try: - await streaming_agent_response_callback( - executor_id, - event.data, # Pass the data object - False, # Not final yet - user_id, - ) - except Exception as e: - self.logger.error( - "Error in streaming callback for agent %s: %s", - executor_id, e + # Handle group chat request sent + elif event_type == "group_chat": + # Check if this is a request or response via the data type + if isinstance(event.data, GroupChatRequestSentEvent): + agent_name = event.data.participant_name + agent_call_counts[agent_name] = agent_call_counts.get(agent_name, 0) + 1 + call_num = agent_call_counts[agent_name] + + self.logger.info( + "[REQUEST SENT (round %d)] to agent: %s (call #%d)", + event.data.round_index, + agent_name, + call_num ) - # Track message for formatting - if message_id != last_message_id: - last_message_id = message_id + if call_num > 1: + self.logger.warning("Agent '%s' called %d times", agent_name, call_num) - # Handle group chat request sent - elif isinstance(event, GroupChatRequestSentEvent): - agent_name = event.participant_name - agent_call_counts[agent_name] = agent_call_counts.get(agent_name, 0) + 1 - call_num = agent_call_counts[agent_name] - - self.logger.info( - "[REQUEST SENT (round %d)] to agent: %s (call #%d)", - event.round_index, - agent_name, - call_num + elif isinstance(event.data, GroupChatResponseReceivedEvent): + agent_name = event.data.participant_name + self.logger.info( + "[RESPONSE RECEIVED (round %d)] from agent: %s", + event.data.round_index, + agent_name + ) + # Flush accumulated streaming content as a complete AGENT_MESSAGE + buffered = agent_stream_buffers.pop(agent_name, "") + if buffered: + from v4.callbacks.response_handlers import clean_citations + from v4.models.messages import AgentMessage + cleaned = clean_citations(buffered) + if cleaned.strip(): + agent_msg = AgentMessage( + 
agent_name=agent_name, + timestamp=str(_time.time()), + content=cleaned, + ) + await connection_config.send_status_update_async( + agent_msg, + user_id, + message_type=WebsocketMessageType.AGENT_MESSAGE, + ) + self.logger.info( + "Sent AGENT_MESSAGE for '%s' (%d chars)", + agent_name, len(cleaned) + ) + + # Handle executor completed - just log, don't send to UI + elif event_type == "executor_completed": + self.logger.debug( + "[EXECUTOR COMPLETED] agent: %s", + getattr(event, "executor_id", "unknown") ) + # Don't send to UI here - group_chat events already handle agent messages - if call_num > 1: - self.logger.warning("Agent '%s' called %d times", agent_name, call_num) - - # Handle group chat response received - THIS IS WHERE AGENT RESPONSES COME - elif isinstance(event, GroupChatResponseReceivedEvent): + # Handle workflow output event (streaming chunks AND final result) + elif event_type == "output": + executor_id = getattr(event, "executor_id", None) + output_data = event.data self.logger.info( - "[RESPONSE RECEIVED (round %d)] from agent: %s", - event.round_index, - event.participant_name + "[OUTPUT] executor=%s data_type=%s", + executor_id, type(output_data).__name__ ) - # Send the agent response to the UI - if event.data: - response_text = self._extract_response_text(event.data) - - if response_text: - self.logger.info("Sending agent response to UI from %s", event.participant_name) - agent_response_callback( - event.participant_name, - ChatMessage(role="assistant", text=response_text), + + # Streaming chunk from an agent executor + if isinstance(output_data, AgentResponseUpdate) and executor_id: + chunk_text = output_data.text or "" + if chunk_text: + agent_stream_buffers[executor_id] = agent_stream_buffers.get(executor_id, "") + chunk_text + try: + await streaming_agent_response_callback( + executor_id, + output_data, + False, user_id, ) - - # Handle executor completed - just log, don't send to UI (GroupChatResponseReceivedEvent handles that) - elif 
isinstance(event, ExecutorCompletedEvent): - self.logger.debug( - "[EXECUTOR COMPLETED] agent: %s", - event.executor_id - ) - # Don't send to UI here - GroupChatResponseReceivedEvent already handles agent messages - # This avoids duplicate messages - - # Handle workflow output event (captures final result) - elif isinstance(event, WorkflowOutputEvent): - output_data = event.data - # Handle different output formats - if isinstance(output_data, ChatMessage): + except Exception as e: + self.logger.error( + "Error in streaming callback for agent %s: %s", + executor_id, e + ) + # Final workflow output (list[Message] or Message) + elif isinstance(output_data, Message): final_output = output_data.text or "" elif isinstance(output_data, list): - # Handle list of ChatMessage objects + # Handle list of Message objects texts = [] for item in output_data: - if isinstance(item, ChatMessage): + if isinstance(item, Message): if item.text: texts.append(item.text) else: From c249025cd0a8ac1e5a7e312fea77cf0523f2e0e3 Mon Sep 17 00:00:00 2001 From: Dhruvkumar-Microsoft Date: Mon, 16 Mar 2026 16:25:22 +0530 Subject: [PATCH 3/4] resolved the human in the loop issue --- src/backend/v4/magentic_agents/proxy_agent.py | 9 ++-- .../orchestration/human_approval_manager.py | 41 ++++++++++++++++++- .../v4/orchestration/orchestration_manager.py | 4 -- src/frontend/src/services/PlanDataService.tsx | 17 ++++++++ 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/backend/v4/magentic_agents/proxy_agent.py b/src/backend/v4/magentic_agents/proxy_agent.py index cf1130047..64ba28292 100644 --- a/src/backend/v4/magentic_agents/proxy_agent.py +++ b/src/backend/v4/magentic_agents/proxy_agent.py @@ -147,16 +147,19 @@ async def _invoke_stream_internal( logger.debug("ProxyAgent: Message text: %s", message_text[:100]) clarification_req_text = f"{message_text}" + request_id = str(uuid.uuid4()) clarification_request = UserClarificationRequest( question=clarification_req_text, - 
request_id=str(uuid.uuid4()), + request_id=request_id, ) # Dispatch websocket event requesting clarification + # Serialize dataclass to a plain dict so json.dumps produces proper JSON + # instead of relying on str() repr which is fragile for the frontend parser. await connection_config.send_status_update_async( { - "type": WebsocketMessageType.USER_CLARIFICATION_REQUEST, - "data": clarification_request, + "question": clarification_req_text, + "request_id": request_id, }, user_id=self.user_id, message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST, diff --git a/src/backend/v4/orchestration/human_approval_manager.py b/src/backend/v4/orchestration/human_approval_manager.py index c2eeedb28..24236fa50 100644 --- a/src/backend/v4/orchestration/human_approval_manager.py +++ b/src/backend/v4/orchestration/human_approval_manager.py @@ -8,7 +8,7 @@ from typing import Any, Optional import v4.models.messages as messages -from agent_framework import Message +from agent_framework import AgentResponse, Message from agent_framework_orchestrations._magentic import ( MagenticContext, StandardMagenticManager, @@ -94,6 +94,45 @@ def __init__(self, user_id: str, agent, *args, **kwargs): # New API: StandardMagenticManager takes agent as first positional argument super().__init__(agent, *args, **kwargs) + async def _complete(self, messages: list[Message]) -> Message: + """Override to pass session=None, making each LLM call stateless. + + The base class passes session=self._session which triggers + InMemoryHistoryProvider auto-injection and previous_response_id + chaining in rc4. This causes message payloads to grow with every + internal call (facts, plan, progress ledger, etc.), burning through + TPM quota (429 errors) and confusing the orchestrator LLM's routing + decisions (e.g. skipping ProxyAgent for user clarification). + + Passing session=None restores the old stateless behavior where each + call only sends the messages explicitly provided. 
+ """ + from openai import RateLimitError + + max_retries = 5 + base_delay = 2.0 # seconds + + for attempt in range(max_retries): + try: + response: AgentResponse = await self._agent.run(messages, session=None) + if not response.messages: + raise RuntimeError("Agent returned no messages in response.") + if len(response.messages) > 1: + logger.warning("Agent returned multiple messages; using the last one.") + return response.messages[-1] + except Exception as exc: + inner = getattr(exc, "inner_exception", None) + is_rate_limit = isinstance(inner, RateLimitError) or "429" in str(exc) + if is_rate_limit and attempt < max_retries - 1: + delay = base_delay * (2 ** attempt) + logger.warning( + "Rate limit hit (attempt %d/%d). Retrying in %.1fs...", + attempt + 1, max_retries, delay, + ) + await asyncio.sleep(delay) + continue + raise + async def plan(self, magentic_context: MagenticContext) -> Any: """ Override the plan method to create the plan first, then ask for approval before execution. diff --git a/src/backend/v4/orchestration/orchestration_manager.py b/src/backend/v4/orchestration/orchestration_manager.py index 752df3039..039d74ea7 100644 --- a/src/backend/v4/orchestration/orchestration_manager.py +++ b/src/backend/v4/orchestration/orchestration_manager.py @@ -14,7 +14,6 @@ ChatOptions, Message, InMemoryCheckpointStorage, - WorkflowEvent, ) from agent_framework_orchestrations import MagenticBuilder from agent_framework_orchestrations._base_group_chat_orchestrator import ( @@ -22,7 +21,6 @@ GroupChatResponseReceivedEvent, ) from agent_framework_orchestrations._magentic import ( - MagenticOrchestratorEvent, MagenticProgressLedger, ) @@ -34,7 +32,6 @@ from v4.common.services.team_service import TeamService import time as _time from v4.callbacks.response_handlers import ( - agent_response_callback, streaming_agent_response_callback, ) from v4.config.settings import connection_config, orchestration_config @@ -387,7 +384,6 @@ async def run_orchestration(self, user_id: 
str, input_task) -> None:
         self.logger.info("Starting workflow execution...")
 
-        last_message_id: str | None = None
         async for event in workflow.run(task_text, stream=True):
             try:
                 # WorkflowEvent has a .type field (string) instead of specific event classes
diff --git a/src/frontend/src/services/PlanDataService.tsx b/src/frontend/src/services/PlanDataService.tsx
index 2960965af..9d249f377 100644
--- a/src/frontend/src/services/PlanDataService.tsx
+++ b/src/frontend/src/services/PlanDataService.tsx
@@ -765,6 +765,23 @@ export class PlanDataService {
    */
   static parseUserClarificationRequest(rawData: any): ParsedUserClarification | null {
     try {
+      // First try direct JSON extraction (clean dict format from backend)
+      const extractDirect = (val: any, depth = 0): ParsedUserClarification | null => {
+        if (depth > 10 || !val || typeof val !== 'object') return null;
+        if (typeof val.question === 'string' && typeof val.request_id === 'string') {
+          return {
+            type: WebsocketMessageType.USER_CLARIFICATION_REQUEST,
+            question: val.question.trim(),
+            request_id: val.request_id,
+          };
+        }
+        if (val.data !== undefined) return extractDirect(val.data, depth + 1);
+        return null;
+      };
+      const direct = extractDirect(rawData);
+      if (direct) return direct;
+
+      // Fallback: extract from Python repr string (legacy format)
       const extractString = (val: any, depth = 0): string | null => {
         if (depth > 15) return null;
         if (typeof val === 'string') {

From 63e2221b8a248f5cccb28244f6aae7f95057371d Mon Sep 17 00:00:00 2001
From: Dhruvkumar-Microsoft
Date: Tue, 17 Mar 2026 11:48:56 +0530
Subject: [PATCH 4/4] updated the comments

---
 src/backend/v4/orchestration/orchestration_manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/v4/orchestration/orchestration_manager.py b/src/backend/v4/orchestration/orchestration_manager.py
index 039d74ea7..42fb43dc5 100644
--- a/src/backend/v4/orchestration/orchestration_manager.py
+++ 
b/src/backend/v4/orchestration/orchestration_manager.py @@ -210,7 +210,7 @@ async def init_orchestration( manager=manager, checkpoint_storage=storage, max_round_count=orchestration_config.max_rounds, - max_stall_count=3, # CRITICAL: Prevent re-calling agents when stalled (default is 3!) + max_stall_count=3, # Allow up to 3 stalled rounds before stopping; set to 0 to strictly prevent re-calling stalled agents. intermediate_outputs=True, # Required: yield agent streaming output events, not just orchestrator output )