diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..5122f9e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,9 @@ +{ + "chat.tools.terminal.autoApprove": { + "/^cd H:\\\\Works\\\\Code-Migration\\\\Container-Migration-Solution-Accelerator\\\\src\\\\backend-api ; python -m ruff check src/ --fix 2>&1$/": { + "approve": true, + "matchCommandLine": true + }, + "npx eslint": true + } +} \ No newline at end of file diff --git a/infra/main.bicep b/infra/main.bicep index b4efa0f..16a2440 100644 --- a/infra/main.bicep +++ b/infra/main.bicep @@ -78,6 +78,16 @@ param aiModelVersion string = '2025-04-16' @description('Optional. AI model deployment token capacity. Lower this if initial provisioning fails due to capacity. Defaults to 50K tokens per minute to improve regional success rate.') param aiModelCapacity int = 500 +@minLength(1) +@description('Optional. Name of the embedding model to deploy. Defaults to text-embedding-3-large.') +param aiEmbeddingModelName string = 'text-embedding-3-large' + +@description('Optional. Version of the embedding model. Defaults to 1.') +param aiEmbeddingModelVersion string = '1' + +@description('Optional. Embedding model deployment token capacity. Defaults to 500.') +param aiEmbeddingModelCapacity int = 500 + @description('Optional. The tags to apply to all deployed Azure resources.') param tags resourceInput<'Microsoft.Resources/resourceGroups@2025-04-01'>.tags = {} @@ -761,6 +771,18 @@ module existingAiFoundryAiServicesDeployments 'modules/ai-services-deployments.b capacity: aiModelCapacity } } + { + name: aiEmbeddingModelName + model: { + format: 'OpenAI' + name: aiEmbeddingModelName + version: aiEmbeddingModelVersion + } + sku: { + name: 'Standard' + capacity: aiEmbeddingModelCapacity + } + } ] roleAssignments: [ // Service Principal permissions @@ -857,6 +879,18 @@ module aiFoundry 'br/public:avm/ptn/ai-ml/ai-foundry:0.4.0' = if(!useExistingAiF capacity: aiModelCapacity } } + { + name: aiEmbeddingModelName + model: { + format: 'OpenAI' + name: aiEmbeddingModelName + version: aiEmbeddingModelVersion + } + sku: { + name: 'Standard' + capacity: aiEmbeddingModelCapacity + } + } ] tags: allTags enableTelemetry: enableTelemetry @@ -905,6 +939,10 @@ module appConfiguration 'br/public:avm/res/app-configuration/configuration-store name: 'AZURE_OPENAI_CHAT_DEPLOYMENT_NAME' value: aiModelDeploymentName } + { + name: 'AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME' + value: aiEmbeddingModelName + } { name: 'AZURE_OPENAI_ENDPOINT' value: 'https://${aiServicesName}.cognitiveservices.azure.com/' diff --git a/src/processor/src/libs/agent_framework/azure_openai_response_retry.py b/src/processor/src/libs/agent_framework/azure_openai_response_retry.py index 251e029..46943b2 100644 --- a/src/processor/src/libs/agent_framework/azure_openai_response_retry.py +++ b/src/processor/src/libs/agent_framework/azure_openai_response_retry.py @@ -246,14 +246,14 @@ class ContextTrimConfig: """ enabled: bool = True - # GPT-5.1 supports 272K input tokens (~800K chars). These defaults stay well - # within that budget while guarding against accidental large blob injection. - # Progressive trimming on retry will reduce these further if needed. - max_total_chars: int = 600_000 - max_message_chars: int = 40_000 - keep_last_messages: int = 50 - keep_head_chars: int = 15_000 - keep_tail_chars: int = 5_000 + # GPT-5.1 supports 272K input tokens (~800K chars). With workspace context + # injected into system instructions (never trimmed) and Qdrant shared memory + # providing cross-step context, we can keep fewer conversation messages. + max_total_chars: int = 400_000 + max_message_chars: int = 30_000 + keep_last_messages: int = 30 + keep_head_chars: int = 12_000 + keep_tail_chars: int = 4_000 keep_system_messages: bool = True retry_on_context_error: bool = True diff --git a/src/processor/src/libs/agent_framework/mem0_async_memory.py b/src/processor/src/libs/agent_framework/mem0_async_memory.py index 35b18c7..5740790 100644 --- a/src/processor/src/libs/agent_framework/mem0_async_memory.py +++ b/src/processor/src/libs/agent_framework/mem0_async_memory.py @@ -3,6 +3,8 @@ """Lazy-initialized async wrapper around the Mem0 vector-store memory backend.""" +import os + from mem0 import AsyncMemory @@ -17,6 +19,13 @@ async def get_memory(self): return self._memory_instance async def _create_memory(self): + endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "") + chat_deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME", "gpt-5.1") + embedding_deployment = os.getenv( + "AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", "text-embedding-3-large" + ) + api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-12-01-preview") + config = { "vector_store": { "provider": "redis", @@ -29,27 +38,24 @@ async def _create_memory(self): "llm": { "provider": "azure_openai", "config": { - "model": "gpt-5.1", + "model": chat_deployment, "temperature": 0.1, - "max_tokens": 100000, + "max_tokens": 4000, "azure_kwargs": { - "azure_deployment": "gpt-5.1", - "api_version": "2024-12-01-preview", - "azure_endpoint": "https://aifappframework.cognitiveservices.azure.com/", + "azure_deployment": chat_deployment, + "api_version": api_version, + "azure_endpoint": endpoint, }, }, }, "embedder": { "provider": "azure_openai", "config": { - "model": "text-embedding-3-large", + "model": embedding_deployment, "azure_kwargs": { - "api_version": "2024-02-01", - "azure_deployment": "text-embedding-3-large", - "azure_endpoint": "https://aifappframework.openai.azure.com/", - "default_headers": { - "CustomHeader": "container migration", - }, + "api_version": api_version, + "azure_deployment": embedding_deployment, + "azure_endpoint": endpoint, }, }, }, diff --git a/src/processor/src/libs/agent_framework/qdrant_memory_store.py b/src/processor/src/libs/agent_framework/qdrant_memory_store.py new file mode 100644 index 0000000..b314f25 --- /dev/null +++ b/src/processor/src/libs/agent_framework/qdrant_memory_store.py @@ -0,0 +1,285 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Qdrant-backed shared memory store for multi-agent context sharing. + +This module provides a vector memory store using Qdrant (in-process embedded mode) +that enables agents to share relevant context without carrying full conversation +history. Each migration process gets its own isolated collection. + +Usage: + store = QdrantMemoryStore(process_id="abc-123") + await store.initialize(embedding_client) + + # Store a memory + await store.add("AKS supports node auto-provisioning via Karpenter", + agent_name="AKS Expert", step="analysis", turn=3) + + # Retrieve relevant memories + memories = await store.search("How should we handle node scaling?", top_k=5) + + # Cleanup when process completes + await store.close() +""" + +from __future__ import annotations + +import logging +import time +import uuid +from dataclasses import dataclass + +from openai import AsyncAzureOpenAI +from qdrant_client import AsyncQdrantClient, models + +logger = logging.getLogger(__name__) + +# Qdrant collection settings +EMBEDDING_DIM = 3072 # text-embedding-3-large dimension +DISTANCE_METRIC = models.Distance.COSINE + + +@dataclass +class MemoryEntry: + """A single memory retrieved from the store.""" + + content: str + agent_name: str + step: str + turn: int + score: float + memory_id: str + + +class QdrantMemoryStore: + """Qdrant-backed vector memory store for sharing context across agents. + + Uses Qdrant embedded (in-process) mode — no external server needed. + Each migration process gets its own collection for isolation. + """ + + def __init__(self, process_id: str): + self.process_id = process_id + self.collection_name = f"migration_{process_id.replace('-', '_')}" + self._client: AsyncQdrantClient | None = None + self._embedding_client: AsyncAzureOpenAI | None = None + self._embedding_deployment: str | None = None + self._initialized = False + self._turn_counter = 0 + + async def initialize( + self, + embedding_client: AsyncAzureOpenAI, + embedding_deployment: str, + ) -> None: + """Initialize the Qdrant client and create the collection. + + Args: + embedding_client: Azure OpenAI async client for generating embeddings. + embedding_deployment: Deployment name for the embedding model. + """ + if self._initialized: + return + + self._embedding_client = embedding_client + self._embedding_deployment = embedding_deployment + + # In-memory Qdrant — no server, no persistence, auto-cleanup + self._client = AsyncQdrantClient(":memory:") + + await self._client.create_collection( + collection_name=self.collection_name, + vectors_config=models.VectorParams( + size=EMBEDDING_DIM, + distance=DISTANCE_METRIC, + ), + ) + + self._initialized = True + logger.info( + "[MEMORY] QdrantMemoryStore initialized for process %s (collection: %s)", + self.process_id, + self.collection_name, + ) + + async def add( + self, + content: str, + *, + agent_name: str, + step: str, + turn: int | None = None, + metadata: dict | None = None, + ) -> str: + """Store a memory entry with its embedding. + + Args: + content: The text content to store. + agent_name: Name of the agent that produced this content. + step: Migration step (analysis, design, convert, documentation). + turn: Conversation turn number (auto-incremented if None). + metadata: Optional additional metadata. + + Returns: + The unique ID of the stored memory. + """ + if not self._initialized: + raise RuntimeError("QdrantMemoryStore not initialized. Call initialize() first.") + + if not content or not content.strip(): + return "" + + if turn is None: + self._turn_counter += 1 + turn = self._turn_counter + + # Generate embedding + embedding = await self._embed(content) + if embedding is None: + logger.warning("[MEMORY] Failed to generate embedding, skipping store") + return "" + + memory_id = str(uuid.uuid4()) + payload = { + "content": content, + "agent_name": agent_name, + "step": step, + "turn": turn, + "process_id": self.process_id, + "timestamp": time.time(), + } + if metadata: + payload["metadata"] = metadata + + await self._client.upsert( + collection_name=self.collection_name, + points=[ + models.PointStruct( + id=memory_id, + vector=embedding, + payload=payload, + ) + ], + ) + + logger.debug( + "[MEMORY] Stored memory from %s (step=%s, turn=%d, %d chars)", + agent_name, + step, + turn, + len(content), + ) + return memory_id + + async def search( + self, + query: str, + *, + top_k: int = 10, + step_filter: str | None = None, + agent_filter: str | None = None, + score_threshold: float = 0.3, + ) -> list[MemoryEntry]: + """Search for relevant memories using semantic similarity. + + Args: + query: The search query text. + top_k: Maximum number of results to return. + step_filter: Optional filter by migration step. + agent_filter: Optional filter by agent name. + score_threshold: Minimum similarity score (0-1). + + Returns: + List of MemoryEntry objects sorted by relevance. + """ + if not self._initialized: + return [] + + embedding = await self._embed(query) + if embedding is None: + return [] + + # Build optional filters + conditions = [] + if step_filter: + conditions.append( + models.FieldCondition( + key="step", + match=models.MatchValue(value=step_filter), + ) + ) + if agent_filter: + conditions.append( + models.FieldCondition( + key="agent_name", + match=models.MatchValue(value=agent_filter), + ) + ) + + query_filter = models.Filter(must=conditions) if conditions else None + + results = await self._client.query_points( + collection_name=self.collection_name, + query=embedding, + query_filter=query_filter, + limit=top_k, + score_threshold=score_threshold, + ) + + memories = [] + for point in results.points: + payload = point.payload or {} + memories.append( + MemoryEntry( + content=payload.get("content", ""), + agent_name=payload.get("agent_name", ""), + step=payload.get("step", ""), + turn=payload.get("turn", 0), + score=point.score, + memory_id=str(point.id), + ) + ) + + logger.debug( + "[MEMORY] Search returned %d results (query: %.80s...)", + len(memories), + query, + ) + return memories + + async def get_count(self) -> int: + """Return the number of memories stored.""" + if not self._initialized: + return 0 + info = await self._client.get_collection(self.collection_name) + return info.points_count + + async def close(self) -> None: + """Close the Qdrant client and release resources.""" + if self._client: + try: + await self._client.delete_collection(self.collection_name) + except Exception: + pass + await self._client.close() + self._client = None + self._initialized = False + logger.info("[MEMORY] QdrantMemoryStore closed for process %s", self.process_id) + + async def _embed(self, text: str) -> list[float] | None: + """Generate an embedding vector for the given text.""" + if not self._embedding_client or not self._embedding_deployment: + return None + + # Truncate to avoid embedding API limits (8191 tokens ≈ ~30K chars) + truncated = text[:30_000] if len(text) > 30_000 else text + + try: + response = await self._embedding_client.embeddings.create( + input=truncated, + model=self._embedding_deployment, + ) + return response.data[0].embedding + except Exception as e: + logger.warning("[MEMORY] Embedding call failed: %s", e) + return None diff --git a/src/processor/src/libs/agent_framework/shared_memory_context_provider.py b/src/processor/src/libs/agent_framework/shared_memory_context_provider.py new file mode 100644 index 0000000..5984f9f --- /dev/null +++ b/src/processor/src/libs/agent_framework/shared_memory_context_provider.py @@ -0,0 +1,278 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""ContextProvider that injects shared Qdrant-backed memories into agent context. + +This provider is attached to each agent in a GroupChat. Before each LLM call, +it queries the shared QdrantMemoryStore for relevant memories and injects them +as additional context. After each LLM response, it stores the agent's response +back into the shared memory for other agents to discover. + +This enables agents to share knowledge without carrying the full conversation +history in their context window. +""" + +from __future__ import annotations + +import logging +from collections.abc import MutableSequence, Sequence +from typing import TYPE_CHECKING + +from agent_framework import ChatMessage, Context, ContextProvider + +if TYPE_CHECKING: + from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore + +logger = logging.getLogger(__name__) + +# Maximum characters of memory context to inject (prevents context bloat) +MAX_MEMORY_CONTEXT_CHARS = 15_000 + +# Minimum content length to store (skip trivial messages) +MIN_CONTENT_LENGTH_TO_STORE = 50 + + +# Step order for determining cross-step queries +_STEP_ORDER = ["analysis", "design", "convert", "documentation"] + + +class SharedMemoryContextProvider(ContextProvider): + """ContextProvider that reads/writes shared memory via Qdrant. + + Attached to each agent individually, but all agents share the same + QdrantMemoryStore instance, enabling cross-agent knowledge sharing. + + Optimized for cross-step memory sharing: + - invoking(): only searches memories from PREVIOUS steps (within-step context + is already available via GroupChat conversation broadcast) + - invoked(): only stores the LAST response per agent per step (avoids + redundant embedding calls for intermediate turns) + """ + + def __init__( + self, + memory_store: QdrantMemoryStore, + agent_name: str, + step: str, + top_k: int = 10, + score_threshold: float = 0.3, + ): + """Initialize the shared memory context provider. + + Args: + memory_store: Shared QdrantMemoryStore instance (same across all agents). + agent_name: Name of the agent this provider is attached to. + step: Current migration step (analysis, design, convert, documentation). + top_k: Number of relevant memories to retrieve per turn. + score_threshold: Minimum similarity score for memory retrieval. + """ + self._memory_store = memory_store + self._agent_name = agent_name + self._step = step + self._top_k = top_k + self._score_threshold = score_threshold + self._turn_counter = 0 + self._last_content: str | None = None # Track last response for deferred storage + + # Determine which prior steps to search (skip current step) + step_lower = step.lower() + step_idx = None + for i, s in enumerate(_STEP_ORDER): + if s == step_lower: + step_idx = i + break + self._prior_steps = _STEP_ORDER[:step_idx] if step_idx else [] + + async def invoking( + self, + messages: ChatMessage | MutableSequence[ChatMessage], + **kwargs, + ) -> Context: + """Called before the agent's LLM call. Injects relevant shared memories. + + Only searches memories from PREVIOUS steps. Within the current step, + agents already see all messages via GroupChat broadcast. + """ + # Skip if this is the first step (no prior memories exist) + if not self._prior_steps: + return Context() + + # Extract query from the most recent messages + query = self._extract_query(messages) + if not query: + return Context() + + try: + memories = await self._memory_store.search( + query=query, + top_k=self._top_k, + score_threshold=self._score_threshold, + ) + except Exception as e: + logger.warning( + "[MEMORY] Failed to search memories for %s: %s", + self._agent_name, + e, + ) + return Context() + + if not memories: + return Context() + + # Format memories into context instructions + formatted = self._format_memories(memories) + if not formatted: + return Context() + + instructions = ( + f"{self.DEFAULT_CONTEXT_PROMPT}\n\n{formatted}" + ) + + logger.info( + "[MEMORY] Injecting %d memories for %s (step=%s, %d chars)", + len(memories), + self._agent_name, + self._step, + len(instructions), + ) + + return Context(instructions=instructions) + + async def invoked( + self, + request_messages: ChatMessage | Sequence[ChatMessage], + response_messages: ChatMessage | Sequence[ChatMessage] | None = None, + invoke_exception: Exception | None = None, + **kwargs, + ) -> None: + """Called after the agent's LLM response. Buffers the response for storage. + + Instead of storing every turn (expensive), we buffer the latest response + and only store it when the next invocation happens or the step ends. + This means only the agent's last response per step gets stored, + which is the most complete and useful summary. + """ + if invoke_exception is not None: + return + + if response_messages is None: + return + + # Extract text from response + content = self._extract_text(response_messages) + if not content or len(content) < MIN_CONTENT_LENGTH_TO_STORE: + return + + # Store previous buffered content before replacing + if self._last_content is not None: + await self._flush_memory() + + self._last_content = content + self._turn_counter += 1 + + async def flush(self) -> None: + """Flush any buffered memory to the store. + + Called at step completion to ensure the last agent response is stored. + """ + if self._last_content is not None: + await self._flush_memory() + + async def _flush_memory(self) -> None: + """Store the buffered content into the memory store.""" + content = self._last_content + self._last_content = None + if not content: + return + + # Guard: skip if memory store is no longer available + if not getattr(self._memory_store, "_initialized", False): + return + + try: + await self._memory_store.add( + content=content, + agent_name=self._agent_name, + step=self._step, + turn=self._turn_counter, + ) + logger.info( + "[MEMORY] Stored memory from %s (step=%s, turn=%d, %d chars)", + self._agent_name, + self._step, + self._turn_counter, + len(content), + ) + except Exception as e: + logger.warning( + "[MEMORY] Failed to store memory for %s: %s", + self._agent_name, + e, + ) + + def _extract_query( + self, messages: ChatMessage | MutableSequence[ChatMessage] + ) -> str: + """Extract a search query from the input messages. + + Uses the last non-system message as the query, truncated for embedding. + """ + # Single message (not a list/sequence) + if not isinstance(messages, (list, MutableSequence)): + return self._get_text(messages)[:2000] + + if not messages: + return "" + + # Search from the end for the most recent substantive message + for msg in reversed(messages): + text = self._get_text(msg) + if text and len(text) > 20: + return text[:2000] + + return "" + + def _format_memories(self, memories: list) -> str: + """Format retrieved memories into a readable context block.""" + if not memories: + return "" + + lines = [] + total_chars = 0 + + for mem in memories: + # Truncate individual memories to prevent a single one from dominating + content = mem.content[:3000] if len(mem.content) > 3000 else mem.content + entry = f"- [{mem.agent_name} / {mem.step}] {content}" + + if total_chars + len(entry) > MAX_MEMORY_CONTEXT_CHARS: + break + + lines.append(entry) + total_chars += len(entry) + + return "\n".join(lines) + + @staticmethod + def _get_text(message: ChatMessage) -> str: + """Extract text content from a ChatMessage.""" + if hasattr(message, "text") and message.text: + return message.text + if hasattr(message, "content"): + return str(message.content) if message.content else "" + return str(message) if message else "" + + @staticmethod + def _extract_text( + messages: ChatMessage | Sequence[ChatMessage], + ) -> str: + """Extract text content from response message(s).""" + if not isinstance(messages, (list, Sequence)) or isinstance(messages, str): + return SharedMemoryContextProvider._get_text(messages) + + parts = [] + for msg in messages: + text = SharedMemoryContextProvider._get_text(msg) + if text: + parts.append(text) + return "\n".join(parts) diff --git a/src/processor/src/libs/base/orchestrator_base.py b/src/processor/src/libs/base/orchestrator_base.py index c58ebb3..6888c56 100644 --- a/src/processor/src/libs/base/orchestrator_base.py +++ b/src/processor/src/libs/base/orchestrator_base.py @@ -20,6 +20,10 @@ AgentResponseStream, OrchestrationResult, ) +from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore +from libs.agent_framework.shared_memory_context_provider import ( + SharedMemoryContextProvider, +) from utils.agent_telemetry import TelemetryManager from utils.console_util import format_agent_message @@ -29,10 +33,15 @@ ResultT = TypeVar("ResultT") +logger = logging.getLogger(__name__) + + class OrchestratorBase(AgentBase, Generic[TaskParamT, ResultT]): def __init__(self, app_context=None): super().__init__(app_context) self.initialized = False + self.memory_store: QdrantMemoryStore | None = None + self.step_name: str = "" def is_console_summarization_enabled(self) -> bool: """Return True if console summarization (extra LLM call per turn) is enabled. @@ -57,9 +66,46 @@ async def initialize(self, process_id: str): | Sequence[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]] ) = await self.prepare_mcp_tools() self.agentinfos = await self.prepare_agent_infos() + + # Resolve workflow-level shared memory store from AppContext (if registered) + if self.app_context.is_registered(QdrantMemoryStore): + try: + self.memory_store = self.app_context.get_service(QdrantMemoryStore) + logger.info( + "[MEMORY] Resolved memory store for step=%s, initialized=%s, id=%s", + self.step_name, + getattr(self.memory_store, "_initialized", "?"), + id(self.memory_store), + ) + except Exception: + self.memory_store = None + self.agents = await self.create_agents(self.agentinfos, process_id=process_id) self.initialized = True + async def flush_agent_memories(self) -> None: + """Flush buffered memories from all agent context providers. + + Called at step completion to ensure each agent's last response + is stored in the shared memory before the next step begins. + """ + for agent in (self.agents or {}).values(): + # ChatAgent stores providers in agent.context_provider (AggregateContextProvider) + # which has a .providers list of individual ContextProvider instances + agg_provider = getattr(agent, "context_provider", None) + if agg_provider is None: + continue + inner_providers = getattr(agg_provider, "providers", None) + if not inner_providers: + continue + for provider in inner_providers: + flush = getattr(provider, "flush", None) + if callable(flush): + try: + await flush() + except Exception as e: + logger.warning("[MEMORY] flush failed: %s", e) + def load_platform_registry(self, registry_path: str) -> list[dict[str, Any]]: with open(registry_path, "r", encoding="utf-8") as f: data = json.load(f) @@ -101,11 +147,25 @@ async def create_agents( ) -> list[ChatAgent]: agents = dict[str, ChatAgent]() agent_client = await self.get_client(thread_id=process_id) + + # Workspace context — injected into every agent's system instructions + # so it survives context trimming (system messages are never trimmed) + workspace_context = ( + f"\n\n## WORKSPACE CONTEXT\n" + f"- Process ID: {process_id}\n" + f"- Container: processes\n" + f"- Source folder: {process_id}/source\n" + f"- Output folder: {process_id}/converted\n" + ) + for agent_info in agent_infos: + # Append workspace context to every agent's instruction + instruction = agent_info.agent_instruction + workspace_context + builder = ( AgentBuilder(agent_client) .with_name(agent_info.agent_name) - .with_instructions(agent_info.agent_instruction) + .with_instructions(instruction) ) # Only attach tools when provided. (Coordinator should typically have none.) @@ -134,6 +194,20 @@ async def create_agents( .with_max_tokens(12_000) .with_tool_choice("none") ) + + # Attach shared memory context provider to expert agents + # (not Coordinator, not ResultGenerator — they don't need memory) + if ( + self.memory_store is not None + and agent_info.agent_name not in ("Coordinator", "ResultGenerator") + ): + memory_provider = SharedMemoryContextProvider( + memory_store=self.memory_store, + agent_name=agent_info.agent_name, + step=self.step_name, + ) + builder = builder.with_context_providers(memory_provider) + agent = builder.build() agents[agent_info.agent_name] = agent diff --git a/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py b/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py index fde7dc7..7652ca7 100644 --- a/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py +++ b/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py @@ -373,7 +373,7 @@ def _mermaid_render_check(code: str, timeout: int = 10) -> tuple[bool, str]: # Extract just the error message lines = stderr.split("\n") error_line = next( - (l for l in lines if "Error" in l or "error" in l), lines[0] + (line for line in lines if "Error" in line or "error" in line), lines[0] ) return False, error_line[:200] return True, "" diff --git a/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py b/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py index 40aeba0..93f8f2f 100644 --- a/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py +++ b/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py @@ -92,6 +92,7 @@ async def execute( on_agent_response_stream=self.on_agent_response_stream, on_workflow_complete=self.on_orchestration_complete, ) + await self.flush_agent_memories() return orchestration_result async def prepare_mcp_tools( @@ -209,9 +210,9 @@ async def prepare_agent_infos(self) -> list[AgentInfo]: # Render coordinator prompt with the current participant list. participant_names = [ai.agent_name for ai in agent_infos] - valid_participants_block = "\n".join( - [f'- "{name}"' for name in participant_names] - ) + valid_participants_block = "\n".join([ + f'- "{name}"' for name in participant_names + ]) coordinator_agent_info.render( **self.task_param.model_dump(), current_timestamp=get_current_timestamp_utc(), @@ -223,34 +224,9 @@ async def prepare_agent_infos(self) -> list[AgentInfo]: agent_infos.append(coordinator_agent_info) # ResultGenerator: serializes the completed conversation into the output schema. - result_generator_instruction = """ - You are a Result Generator. - - ROLE & RESPONSIBILITY (do not exceed scope): - - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. - - The step outcome has already happened via stakeholder discussion and coordinator termination. - - Your only job is to serialize the final outcome into the required schema exactly. - - STRICT JSON RULES: - - Output MUST be valid JSON only (no markdown, no prose). - - Do NOT call tools. - - Do NOT verify file existence. - - Do NOT invent termination codes or blockers. - - HARD-TERMINATION SERIALIZATION RULE (IMPORTANT): - - Set `is_hard_terminated=true` ONLY if a participant explicitly provided a hard-termination decision with a termination code - from this exact set: NO_YAML_FILES, NO_KUBERNETES_CONTENT, ALL_CORRUPTED, SECURITY_POLICY_VIOLATION, RAI_POLICY_VIOLATION, PROFANITY_DETECTED, MIXED_PLATFORM_DETECTED. - - If hard-terminated, `blocking_issues` must be a list of those exact codes ONLY (no extra explanation text inside the list). - - EVIDENCE PRESERVATION (when hard-terminated): - - The `reason` MUST include a short **Evidence** section listing which file(s) triggered the termination and what was detected. - - NEVER include secret values (tokens/passwords/private keys/base64 blobs). For Secrets, include only key names + resource metadata. - - WHAT TO DO: - 1) Review the conversation (excluding the Coordinator). - 2) Extract the final agreed facts and any explicit PASS/FAIL sign-offs exactly as stated. - 3) Emit JSON that conforms exactly to `Analysis_ExtendedBooleanResult`. - """ + result_generator_instruction = self.read_prompt_file( + registry_dir / "prompt_resultgenerator.txt" + ) result_generator_info = AgentInfo( agent_name="ResultGenerator", agent_instruction=result_generator_instruction, diff --git a/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt new file mode 100644 index 0000000..a82f201 --- /dev/null +++ b/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt @@ -0,0 +1,26 @@ +You are a Result Generator. + +ROLE & RESPONSIBILITY (do not exceed scope): +- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. +- The step outcome has already happened via stakeholder discussion and coordinator termination. +- Your only job is to serialize the final outcome into the required schema exactly. + +STRICT JSON RULES: +- Output MUST be valid JSON only (no markdown, no prose). +- Do NOT call tools. +- Do NOT verify file existence. +- Do NOT invent termination codes or blockers. + +HARD-TERMINATION SERIALIZATION RULE (IMPORTANT): +- Set `is_hard_terminated=true` ONLY if a participant explicitly provided a hard-termination decision with a termination code + from this exact set: NO_YAML_FILES, NO_KUBERNETES_CONTENT, ALL_CORRUPTED, SECURITY_POLICY_VIOLATION, RAI_POLICY_VIOLATION, PROFANITY_DETECTED, MIXED_PLATFORM_DETECTED. +- If hard-terminated, `blocking_issues` must be a list of those exact codes ONLY (no extra explanation text inside the list). + + EVIDENCE PRESERVATION (when hard-terminated): + - The `reason` MUST include a short **Evidence** section listing which file(s) triggered the termination and what was detected. + - NEVER include secret values (tokens/passwords/private keys/base64 blobs). For Secrets, include only key names + resource metadata. + +WHAT TO DO: +1) Review the conversation (excluding the Coordinator). +2) Extract the final agreed facts and any explicit PASS/FAIL sign-offs exactly as stated. +3) Emit JSON that conforms exactly to `Analysis_ExtendedBooleanResult`. diff --git a/src/processor/src/steps/analysis/workflow/analysis_executor.py b/src/processor/src/steps/analysis/workflow/analysis_executor.py index 35bd355..e9b2777 100644 --- a/src/processor/src/steps/analysis/workflow/analysis_executor.py +++ b/src/processor/src/steps/analysis/workflow/analysis_executor.py @@ -50,7 +50,7 @@ async def handle_execute( TelemetryManager ) await telemetry.transition_to_phase( - process_id=message.process_id, step="analysis", phase="Analysis" + process_id=message.process_id, step="analysis", phase="Initializing Analysis" ) print(text2art("Analysis")) diff --git a/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt b/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt index 8378023..4a510a1 100644 --- a/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt +++ b/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt @@ -67,7 +67,9 @@ STATE-AWARE ROUTING: - Chief Architect MUST merge/de-duplicate blockers into one prioritized fix plan 5) Else if there are any Open blockers → select YAML Expert - YAML Expert MUST apply merged fix plan in one pass, re-save YAMLs, update file_converting_result.md -6) Else select Chief Architect to finalize +6) Else if YAML Expert's sign-off is PENDING → select YAML Expert + - Instruction: "All reviewers have provided PASS. Update your own SIGN-OFF from PENDING to PASS in file_converting_result.md and populate the ## References section with validated Microsoft Learn URLs." +7) Else select Chief Architect to finalize PHASE 3: CONFLICT RESOLUTION (Chief Architect, if any FAIL) - Chief Architect reconciles conflicts and de-duplicates blockers diff --git a/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt new file mode 100644 index 0000000..8393f43 --- /dev/null +++ b/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt @@ -0,0 +1,19 @@ +You are a Result Generator. + +ROLE & RESPONSIBILITY (do not exceed scope): +- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. +- The step outcome has already happened via stakeholder discussion and coordinator termination. +- Your only job is to serialize the final outcome into the required schema exactly. + +RULES: +- Output MUST be valid JSON only. +- Do NOT call tools. +- Do NOT verify file existence. +- Do NOT invent missing files, blockers, or metrics. +- Only summarize what participants explicitly stated. +- Keep `reason` short (one sentence). + +WHAT TO DO: +1) Review the conversation (excluding the Coordinator). +2) Extract conversion results (converted files, concerns, metrics, and report path) as stated. +3) Emit JSON that conforms exactly to `Yaml_ExtendedBooleanResult`. diff --git a/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py b/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py index 6ecf5b2..c096cc7 100644 --- a/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py +++ b/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py @@ -98,6 +98,7 @@ async def execute( on_workflow_complete=self.on_orchestration_complete, on_agent_response_stream=self.on_agent_response_stream, ) + await self.flush_agent_memories() return orchestration_result async def prepare_mcp_tools( @@ -212,27 +213,9 @@ async def prepare_agent_infos(self) -> list[Any]: agent_infos.append(coordinator_agent_info) # Result generator - result_generator_instruction = """ - You are a Result Generator. - - ROLE & RESPONSIBILITY (do not exceed scope): - - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. - - The step outcome has already happened via stakeholder discussion and coordinator termination. - - Your only job is to serialize the final outcome into the required schema exactly. - - RULES: - - Output MUST be valid JSON only. - - Do NOT call tools. - - Do NOT verify file existence. - - Do NOT invent missing files, blockers, or metrics. - - Only summarize what participants explicitly stated. - - Keep `reason` short (one sentence). - - WHAT TO DO: - 1) Review the conversation (excluding the Coordinator). - 2) Extract conversion results (converted files, concerns, metrics, and report path) as stated. - 3) Emit JSON that conforms exactly to `Yaml_ExtendedBooleanResult`. -""" + result_generator_instruction = self.read_prompt_file( + str(Path(__file__).parent / "prompt_resultgenerator.txt") + ) result_generator_info = AgentInfo( agent_name="ResultGenerator", agent_instruction=result_generator_instruction, diff --git a/src/processor/src/steps/design/orchestration/design_orchestrator.py b/src/processor/src/steps/design/orchestration/design_orchestrator.py index 379b0a1..d2dd47f 100644 --- a/src/processor/src/steps/design/orchestration/design_orchestrator.py +++ b/src/processor/src/steps/design/orchestration/design_orchestrator.py @@ -92,6 +92,7 @@ async def execute( on_workflow_complete=self.on_orchestration_complete, on_agent_response_stream=self.on_agent_response_stream, ) + await self.flush_agent_memories() return orchestration_result async def prepare_mcp_tools( @@ -225,27 +226,9 @@ async def prepare_agent_infos(self) -> list[Any]: agent_infos.append(coordinator_agent_info) # ResultGenerator: Generates structured Design_ExtendedBooleanResult AFTER GroupChat completes - result_generator_instruction = """ - You are a Result Generator. - - ROLE & RESPONSIBILITY (do not exceed scope): - - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. - - The step outcome has already happened via stakeholder discussion and coordinator termination. - - Your only job is to serialize the final outcome into the required schema exactly. - - RULES: - - Output MUST be valid JSON only (no markdown, no prose). - - Do NOT call tools. - - Do NOT verify file existence. - - Do NOT add new requirements. - - Only summarize what participants explicitly said/did. - - Keep `reason` short (one sentence). - - WHAT TO DO: - 1) Review the conversation (excluding the Coordinator). - 2) Extract the final, agreed design summary, key decisions, and the expected output artifact paths. - 3) Emit JSON that conforms exactly to `Design_ExtendedBooleanResult`. -""" + result_generator_instruction = self.read_prompt_file( + str(Path(__file__).parent / "prompt_resultgenerator.txt") + ) result_generator_info = AgentInfo( agent_name="ResultGenerator", agent_instruction=result_generator_instruction, diff --git a/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt new file mode 100644 index 0000000..62481fb --- /dev/null +++ b/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt @@ -0,0 +1,19 @@ +You are a Result Generator. + +ROLE & RESPONSIBILITY (do not exceed scope): +- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. +- The step outcome has already happened via stakeholder discussion and coordinator termination. +- Your only job is to serialize the final outcome into the required schema exactly. + +RULES: +- Output MUST be valid JSON only (no markdown, no prose). +- Do NOT call tools. +- Do NOT verify file existence. +- Do NOT add new requirements. +- Only summarize what participants explicitly said/did. +- Keep `reason` short (one sentence). + +WHAT TO DO: +1) Review the conversation (excluding the Coordinator). +2) Extract the final, agreed design summary, key decisions, and the expected output artifact paths. +3) Emit JSON that conforms exactly to `Design_ExtendedBooleanResult`. diff --git a/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py b/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py index 5470d13..3037aa7 100644 --- a/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py +++ b/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py @@ -103,6 +103,7 @@ async def execute( on_workflow_complete=self.on_orchestration_complete, on_agent_response_stream=self.on_agent_response_stream, ) + await self.flush_agent_memories() return orchestration_result async def prepare_mcp_tools( @@ -250,27 +251,9 @@ async def prepare_agent_infos(self) -> list[Any]: ) agent_infos.append(coordinator_info) - result_generator_instruction = """ - You are a Result Generator. - - ROLE & RESPONSIBILITY (do not exceed scope): - - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. - - The step outcome has already happened via stakeholder discussion and coordinator termination. - - Your only job is to serialize the final outcome into the required schema exactly. - - RULES: - - Output MUST be valid JSON only. - - Do NOT call tools. - - Do NOT verify file existence. - - Do NOT invent metrics, blockers, or sign-offs. - - Only summarize what participants explicitly stated. - - Keep `reason` short (one sentence). - - WHAT TO DO: - 1) Review the conversation (excluding the Coordinator). - 2) Extract roll-up metrics, expert collaboration/consensus notes, and generated file references as stated. - 3) Emit JSON that conforms exactly to `Documentation_ExtendedBooleanResult`. - """ + result_generator_instruction = self.read_prompt_file( + str(Path(__file__).parent / "prompt_resultgenerator.txt") + ) result_generator_info = AgentInfo( agent_name="ResultGenerator", agent_instruction=result_generator_instruction, diff --git a/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt new file mode 100644 index 0000000..11df6bf --- /dev/null +++ b/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt @@ -0,0 +1,19 @@ +You are a Result Generator. + +ROLE & RESPONSIBILITY (do not exceed scope): +- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers. +- The step outcome has already happened via stakeholder discussion and coordinator termination. +- Your only job is to serialize the final outcome into the required schema exactly. + +RULES: +- Output MUST be valid JSON only. +- Do NOT call tools. +- Do NOT verify file existence. +- Do NOT invent metrics, blockers, or sign-offs. +- Only summarize what participants explicitly stated. +- Keep `reason` short (one sentence). + +WHAT TO DO: +1) Review the conversation (excluding the Coordinator). +2) Extract roll-up metrics, expert collaboration/consensus notes, and generated file references as stated. +3) Emit JSON that conforms exactly to `Documentation_ExtendedBooleanResult`. diff --git a/src/processor/src/steps/migration_processor.py b/src/processor/src/steps/migration_processor.py index c7bf859..6d759f3 100644 --- a/src/processor/src/steps/migration_processor.py +++ b/src/processor/src/steps/migration_processor.py @@ -26,6 +26,8 @@ """ import json +import logging +import os import time from datetime import datetime from typing import Any @@ -41,7 +43,9 @@ WorkflowStartedEvent, ) from art import text2art +from openai import AsyncAzureOpenAI +from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore from libs.application.application_context import AppContext from libs.reporting import ( MigrationReportCollector, @@ -50,6 +54,7 @@ ) from libs.reporting.models.failure_context import FailureType from utils.agent_telemetry import TelemetryManager +from utils.credential_util import get_bearer_token_provider from .analysis.models.step_param import Analysis_TaskParam from .analysis.workflow.analysis_executor import AnalysisExecutor @@ -57,6 +62,8 @@ from .design.workflow.design_executor import DesignExecutor from .documentation.workflow.documentation_executor import DocumentationExecutor +logger = logging.getLogger(__name__) + class WorkflowExecutorFailedException(Exception): """Raised when an executor fails, preserving WorkflowErrorDetails payload.""" @@ -187,6 +194,62 @@ def _init_workflow(self) -> Workflow: return workflow + async def _create_memory_store( + self, process_id: str + ) -> QdrantMemoryStore | None: + """Create a workflow-scoped shared memory store. + + The memory store lives for the entire workflow (analysis → design → convert + → documentation) so memories from earlier steps are available to later steps. + Returns None if disabled or misconfigured (workflow proceeds without memory). + """ + enabled = os.getenv("SHARED_MEMORY_ENABLED", "true").strip().lower() + if enabled not in ("1", "true", "yes", "on"): + logger.info("[MEMORY] Shared memory disabled via SHARED_MEMORY_ENABLED") + return None + + try: + from libs.agent_framework.agent_framework_helper import AgentFrameworkHelper + + helper: AgentFrameworkHelper = self.app_context.get_service( + AgentFrameworkHelper + ) + service_config = helper.settings.get_service_config("default") + if not service_config: + logger.warning("[MEMORY] No default service config — skipping memory") + return None + + embedding_deployment = service_config.embedding_deployment_name + if not embedding_deployment: + logger.warning( + "[MEMORY] No embedding deployment configured — skipping memory. " + "Set AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME to enable." + ) + return None + + token_provider = get_bearer_token_provider() + embedding_client = AsyncAzureOpenAI( + azure_endpoint=service_config.endpoint, + azure_ad_token_provider=token_provider, + api_version=service_config.api_version, + ) + + store = QdrantMemoryStore(process_id=process_id) + await store.initialize( + embedding_client=embedding_client, + embedding_deployment=embedding_deployment, + ) + logger.info( + "[MEMORY] Workflow-level shared memory store initialized (process=%s)", + process_id, + ) + return store + except Exception as e: + logger.warning( + "[MEMORY] Failed to create memory store: %s — continuing without", e + ) + return None + async def run(self, input_data: Analysis_TaskParam) -> Any: """Run the migration workflow. @@ -224,6 +287,11 @@ async def run(self, input_data: Analysis_TaskParam) -> Any: report_generator = MigrationReportGenerator(report_collector) step_start_perf: dict[str, float] = {} + # Initialize shared memory store at workflow level (shared across all 4 steps) + memory_store = await self._create_memory_store(input_data.process_id) + if memory_store is not None: + self.app_context.add_singleton(QdrantMemoryStore, memory_store) + try: telemetry: TelemetryManager = await self.app_context.get_service_async( TelemetryManager @@ -574,18 +642,20 @@ async def _generate_report_summary( telemetry: TelemetryManager = ( await self.app_context.get_service_async(TelemetryManager) ) - # Map executor IDs to human-readable phase names - phase_names = { + # Map executor IDs to human-readable step names + step_display_names = { "design": "Design", "yaml_conversion": "YAML", + "yaml": "YAML", "documentation": "Documentation", } + step_display = step_display_names.get( + event.executor_id, event.executor_id.capitalize() + ) await telemetry.transition_to_phase( process_id=event.data.process_id, step=event.executor_id, - phase=phase_names.get( - event.executor_id, event.executor_id.capitalize() - ), + phase=f"Initializing {step_display}", ) print(f"Executor invoked ({event.executor_id})") print(text2art(event.executor_id.capitalize())) @@ -602,6 +672,18 @@ async def _generate_report_summary( elif isinstance(event, ExecutorCompletedEvent): # print(f"Executor completed ({event.executor_id}): {event.data}") + # Log shared memory stats after each step + if memory_store is not None: + try: + mem_count = await memory_store.get_count() + logger.info( + "[MEMORY] Step '%s' completed — %d total memories in store", + event.executor_id, + mem_count, + ) + except Exception: + pass + # step name -> executor_id # output result -> event.data => if event.data is not None if event.data: @@ -626,6 +708,18 @@ async def _generate_report_summary( # print(f"{event.__class__.__name__} ({event.origin.value}): {event}") pass finally: + # Clean up shared memory store + if memory_store is not None: + try: + count = await memory_store.get_count() + logger.info( + "[MEMORY] Workflow complete — closing memory store (%d memories)", + count, + ) + await memory_store.close() + except Exception as e: + logger.warning("[MEMORY] Error closing memory store: %s", e) + elapsed_seconds = time.perf_counter() - start_perf end_dt = datetime.now() elapsed_mins, elapsed_secs = divmod(int(elapsed_seconds), 60) diff --git a/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py b/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py new file mode 100644 index 0000000..a891cca --- /dev/null +++ b/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py @@ -0,0 +1,385 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Unit tests for QdrantMemoryStore.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore + + +def _make_embedding_client(): + """Create a mock Azure OpenAI embedding client.""" + client = AsyncMock() + embedding_obj = MagicMock() + embedding_obj.embedding = [0.1] * 3072 + response = MagicMock() + response.data = [embedding_obj] + client.embeddings.create = AsyncMock(return_value=response) + return client + + +def _make_failing_embedding_client(): + """Create a mock embedding client that fails.""" + client = AsyncMock() + client.embeddings.create = AsyncMock(side_effect=Exception("API error")) + return client + + +# --------------------------------------------------------------------------- +# Initialization & Lifecycle +# --------------------------------------------------------------------------- + + +def test_initialize_creates_collection(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="test-001") + assert not store._initialized + + await store.initialize( + embedding_client=client, embedding_deployment="text-embedding-3-large" + ) + assert store._initialized + assert store._client is not None + assert await store.get_count() == 0 + + await store.close() + + asyncio.run(_run()) + + +def test_initialize_idempotent(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="test-002") + await store.initialize(embedding_client=client, embedding_deployment="emb") + qdrant_before = store._client + + await store.initialize(embedding_client=client, embedding_deployment="emb") + assert store._client is qdrant_before + + await store.close() + + asyncio.run(_run()) + + +def test_close_releases_resources(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="test-003") + await store.initialize(embedding_client=client, embedding_deployment="emb") + await store.close() + + assert store._client is None + assert not store._initialized + + asyncio.run(_run()) + + +def test_close_idempotent(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="test-004") + await store.initialize(embedding_client=client, embedding_deployment="emb") + await store.close() + await store.close() # Should not raise + + asyncio.run(_run()) + + +def test_collection_name_from_process_id(): + store = QdrantMemoryStore(process_id="abc-def-123") + assert store.collection_name == "migration_abc_def_123" + + +# --------------------------------------------------------------------------- +# Add +# --------------------------------------------------------------------------- + + +def test_add_stores_memory(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="add-001") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + memory_id = await store.add( + "AKS supports Karpenter", agent_name="AKS Expert", step="analysis", turn=1 + ) + assert memory_id + assert await store.get_count() == 1 + + await store.close() + + asyncio.run(_run()) + + +def test_add_multiple_memories(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="add-002") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + await store.add("Mem 1", agent_name="A", step="analysis", turn=1) + await store.add("Mem 2", agent_name="B", step="analysis", turn=2) + await store.add("Mem 3", agent_name="C", step="design", turn=3) + assert await store.get_count() == 3 + + await store.close() + + asyncio.run(_run()) + + +def test_add_empty_content_skipped(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="add-003") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + result = await store.add("", agent_name="A", step="analysis") + assert result == "" + assert await store.get_count() == 0 + + await store.close() + + asyncio.run(_run()) + + +def test_add_whitespace_content_skipped(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="add-004") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + result = await store.add(" ", agent_name="A", step="analysis") + assert result == "" + + await store.close() + + asyncio.run(_run()) + + +def test_add_auto_increments_turn(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="add-005") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + await store.add("First", agent_name="A", step="analysis") + await store.add("Second", agent_name="B", step="analysis") + assert store._turn_counter == 2 + + await store.close() + + asyncio.run(_run()) + + +def test_add_without_initialization_raises(): + async def _run(): + store = QdrantMemoryStore(process_id="add-006") + try: + await store.add("test", agent_name="A", step="analysis") + assert False, "Should have raised RuntimeError" + except RuntimeError as e: + assert "not initialized" in str(e) + + asyncio.run(_run()) + + +def test_add_with_embedding_failure_returns_empty(): + async def _run(): + client = _make_failing_embedding_client() + store = QdrantMemoryStore(process_id="add-007") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + result = await store.add("content", agent_name="A", step="analysis") + assert result == "" + + await store.close() + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# Search +# --------------------------------------------------------------------------- + + +def test_search_returns_results(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="search-001") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + await store.add("GKE Filestore CSI", agent_name="GKE", step="analysis", turn=1) + await store.add("AKS Azure Files", agent_name="AKS", step="analysis", turn=2) + + results = await store.search("storage drivers", top_k=5) + assert len(results) == 2 + assert all(r.content for r in results) + assert all(r.score > 0 for r in results) + + await store.close() + + asyncio.run(_run()) + + +def test_search_empty_store(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="search-002") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + results = await store.search("anything") + assert results == [] + + await store.close() + + asyncio.run(_run()) + + +def test_search_respects_top_k(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="search-003") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + for i in range(5): + await store.add(f"Entry {i}", agent_name="A", step="analysis", turn=i) + + results = await store.search("entry", top_k=3) + assert len(results) <= 3 + + await store.close() + + asyncio.run(_run()) + + +def test_search_uninitialized_returns_empty(): + async def _run(): + store = QdrantMemoryStore(process_id="search-004") + results = await store.search("anything") + assert results == [] + + asyncio.run(_run()) + + +def test_search_with_embedding_failure(): + async def _run(): + embedding_obj = MagicMock() + embedding_obj.embedding = [0.1] * 3072 + ok_response = MagicMock() + ok_response.data = [embedding_obj] + + client = AsyncMock() + client.embeddings.create = AsyncMock( + side_effect=[ok_response, Exception("API error")] + ) + + store = QdrantMemoryStore(process_id="search-005") + await store.initialize(embedding_client=client, embedding_deployment="emb") + await store.add("content", agent_name="A", step="analysis", turn=1) + + results = await store.search("query") + assert results == [] + + await store.close() + + asyncio.run(_run()) + + +def test_search_result_fields(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="search-006") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + await store.add( + "Karpenter for scaling", agent_name="AKS Expert", step="design", turn=5 + ) + + results = await store.search("scaling") + assert len(results) == 1 + entry = results[0] + assert entry.content == "Karpenter for scaling" + assert entry.agent_name == "AKS Expert" + assert entry.step == "design" + assert entry.turn == 5 + assert entry.memory_id + assert isinstance(entry.score, float) + + await store.close() + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# Workflow Lifecycle +# --------------------------------------------------------------------------- + + +def test_memories_persist_across_steps(): + """Analysis adds memories, design reads them — simulating workflow scope.""" + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="lifecycle-001") + await store.initialize(embedding_client=client, embedding_deployment="emb") + + # Analysis step + await store.add("GKE 3 node pools", agent_name="GKE", step="analysis", turn=1) + await store.add("GPU training nodes", agent_name="AKS", step="analysis", turn=2) + + # Design step reads analysis + results = await store.search("node pools", top_k=5) + assert len(results) == 2 + + # Design adds its own + await store.add("Use NC6s_v3 for GPU", agent_name="Arch", step="design", turn=3) + assert await store.get_count() == 3 + + # Convert step sees all + results = await store.search("GPU", top_k=10) + assert len(results) == 3 + + await store.close() + + asyncio.run(_run()) + + +def test_fresh_store_per_process(): + """Different process IDs get independent stores.""" + async def _run(): + client = _make_embedding_client() + s1 = QdrantMemoryStore(process_id="proc-1") + s2 = QdrantMemoryStore(process_id="proc-2") + + await s1.initialize(embedding_client=client, embedding_deployment="emb") + await s2.initialize(embedding_client=client, embedding_deployment="emb") + + await s1.add("Only in proc 1", agent_name="A", step="analysis") + assert await s1.get_count() == 1 + assert await s2.get_count() == 0 + + await s1.close() + await s2.close() + + asyncio.run(_run()) + + +def test_close_disposes_all_memories(): + async def _run(): + client = _make_embedding_client() + store = QdrantMemoryStore(process_id="dispose-001") + await store.initialize(embedding_client=client, embedding_deployment="emb") + await store.add("content", agent_name="A", step="analysis") + assert await store.get_count() == 1 + + await store.close() + assert await store.get_count() == 0 + assert await store.search("anything") == [] + + asyncio.run(_run()) diff --git a/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py b/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py new file mode 100644 index 0000000..1d75ee7 --- /dev/null +++ b/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py @@ -0,0 +1,306 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Unit tests for SharedMemoryContextProvider.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +from libs.agent_framework.qdrant_memory_store import MemoryEntry +from libs.agent_framework.shared_memory_context_provider import ( + MAX_MEMORY_CONTEXT_CHARS, + MIN_CONTENT_LENGTH_TO_STORE, + SharedMemoryContextProvider, +) + + +def _make_chat_message(text: str, role: str = "assistant") -> MagicMock: + msg = MagicMock() + msg.text = text + msg.content = text + msg.role = MagicMock() + msg.role.value = role + return msg + + +def _make_memory_entry( + content: str, + agent_name: str = "Agent", + step: str = "analysis", + turn: int = 1, + score: float = 0.9, +) -> MemoryEntry: + return MemoryEntry( + content=content, + agent_name=agent_name, + step=step, + turn=turn, + score=score, + memory_id="test-id", + ) + + +def _make_mock_store(): + store = AsyncMock() + store.search = AsyncMock(return_value=[]) + store.add = AsyncMock(return_value="test-id") + return store + + +def _make_provider(store=None): + if store is None: + store = _make_mock_store() + return SharedMemoryContextProvider( + memory_store=store, + agent_name="AKS Expert", + step="design", + top_k=5, + score_threshold=0.3, + ), store + + +# --------------------------------------------------------------------------- +# invoking() — Pre-LLM memory injection +# --------------------------------------------------------------------------- + + +def test_invoking_injects_memories(): + async def _run(): + provider, store = _make_provider() + store.search.return_value = [ + _make_memory_entry("GKE Filestore CSI", agent_name="GKE Expert"), + _make_memory_entry("Azure Files for AKS", agent_name="AKS Expert"), + ] + messages = [_make_chat_message("How should we handle storage configuration?")] + + context = await provider.invoking(messages) + + assert context.instructions is not None + assert "GKE Filestore CSI" in context.instructions + assert "Azure Files for AKS" in context.instructions + store.search.assert_called_once() + + asyncio.run(_run()) + + +def test_invoking_empty_messages_returns_empty(): + async def _run(): + provider, _ = _make_provider() + context = await provider.invoking([]) + assert context.instructions is None + assert context.messages == [] + + asyncio.run(_run()) + + +def test_invoking_no_memories_returns_empty(): + async def _run(): + provider, store = _make_provider() + store.search.return_value = [] + messages = [_make_chat_message("What is the overall migration plan for AKS?")] + + context = await provider.invoking(messages) + assert context.instructions is None + + asyncio.run(_run()) + + +def test_invoking_search_failure_graceful(): + async def _run(): + provider, store = _make_provider() + store.search.side_effect = Exception("search failed") + messages = [_make_chat_message("What is the networking plan for AKS?")] + + context = await provider.invoking(messages) + assert context.instructions is None + + asyncio.run(_run()) + + +def test_invoking_truncates_long_query(): + async def _run(): + provider, store = _make_provider() + long_text = "x" * 5000 + messages = [_make_chat_message(long_text)] + + await provider.invoking(messages) + + query = store.search.call_args.kwargs["query"] + assert len(query) <= 2000 + + asyncio.run(_run()) + + +def test_invoking_uses_last_message_as_query(): + async def _run(): + provider, store = _make_provider() + messages = [ + _make_chat_message("First"), + _make_chat_message("Second"), + _make_chat_message("Latest question about storage"), + ] + + await provider.invoking(messages) + + query = store.search.call_args.kwargs["query"] + assert "Latest question about storage" in query + + asyncio.run(_run()) + + +def test_invoking_respects_max_context_chars(): + async def _run(): + provider, store = _make_provider() + large_memories = [ + _make_memory_entry("x" * 4000, agent_name=f"Agent{i}") for i in range(10) + ] + store.search.return_value = large_memories + messages = [_make_chat_message("What storage configuration should we use for persistent volumes?")] + + context = await provider.invoking(messages) + + assert context.instructions is not None + assert len(context.instructions) <= MAX_MEMORY_CONTEXT_CHARS + 200 + + asyncio.run(_run()) + + +def test_invoking_formats_with_agent_and_step(): + async def _run(): + provider, store = _make_provider() + store.search.return_value = [ + _make_memory_entry("Use Premium SSD", agent_name="Chief Architect", step="design"), + ] + messages = [_make_chat_message("What storage class should we choose for the cluster?")] + + context = await provider.invoking(messages) + + assert "Chief Architect" in context.instructions + assert "design" in context.instructions + + asyncio.run(_run()) + + +def test_invoking_with_single_message(): + async def _run(): + provider, store = _make_provider() + store.search.return_value = [_make_memory_entry("some memory")] + single = _make_chat_message("What about networking configuration for AKS?") + + context = await provider.invoking(single) + + assert context.instructions is not None + store.search.assert_called_once() + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# invoked() — Post-LLM memory storage +# --------------------------------------------------------------------------- + + +def test_invoked_stores_response(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("What is the networking plan for AKS?")] + response = [_make_chat_message("We should use Azure CNI for networking configuration in the AKS cluster")] + + await provider.invoked(request, response) + await provider.flush() + + store.add.assert_called_once() + kwargs = store.add.call_args + assert kwargs.kwargs["agent_name"] == "AKS Expert" + assert kwargs.kwargs["step"] == "design" + + asyncio.run(_run()) + + +def test_invoked_skips_on_exception(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("Q")] + response = [_make_chat_message("A" * 100)] + + await provider.invoked(request, response, invoke_exception=Exception("fail")) + store.add.assert_not_called() + + asyncio.run(_run()) + + +def test_invoked_skips_none_response(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("Q")] + + await provider.invoked(request, None) + store.add.assert_not_called() + + asyncio.run(_run()) + + +def test_invoked_skips_short_response(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("Q")] + short = [_make_chat_message("x" * (MIN_CONTENT_LENGTH_TO_STORE - 1))] + + await provider.invoked(request, short) + store.add.assert_not_called() + + asyncio.run(_run()) + + +def test_invoked_stores_long_response(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("Q")] + long_resp = [_make_chat_message("x" * (MIN_CONTENT_LENGTH_TO_STORE + 1))] + + await provider.invoked(request, long_resp) + await provider.flush() + store.add.assert_called_once() + + asyncio.run(_run()) + + +def test_invoked_increments_turn_counter(): + async def _run(): + provider, store = _make_provider() + request = [_make_chat_message("Q")] + response = [_make_chat_message("A" * 100)] + + await provider.invoked(request, response) + await provider.invoked(request, response) + assert provider._turn_counter == 2 + + asyncio.run(_run()) + + +def test_invoked_store_failure_does_not_raise(): + async def _run(): + provider, store = _make_provider() + store.add.side_effect = Exception("store failed") + request = [_make_chat_message("Q")] + response = [_make_chat_message("A" * 100)] + + await provider.invoked(request, response) + await provider.flush() # Should not raise + + asyncio.run(_run()) + + +def test_invoked_with_single_message(): + async def _run(): + provider, store = _make_provider() + request = _make_chat_message("What is the question about networking?") + response = _make_chat_message("We should use Azure CNI Overlay for the networking configuration in AKS") + + await provider.invoked(request, response) + await provider.flush() + store.add.assert_called_once() + + asyncio.run(_run()) diff --git a/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py b/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py index 0cf18a1..c0f2691 100644 --- a/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py +++ b/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py @@ -82,7 +82,7 @@ async def execute(self, task_param=None): await executor.handle_execute(message, ctx) # type: ignore[arg-type] - assert telemetry.transitions == [("p1", "analysis", "Analysis")] + assert telemetry.transitions == [("p1", "analysis", "Initializing Analysis")] assert len(ctx.sent) == 1 assert len(ctx.yielded) == 0 assert isinstance(ctx.sent[0], Analysis_BooleanExtendedResult) @@ -133,7 +133,7 @@ async def execute(self, task_param=None): await executor.handle_execute(message, ctx) # type: ignore[arg-type] - assert telemetry.transitions == [("p1", "analysis", "Analysis")] + assert telemetry.transitions == [("p1", "analysis", "Initializing Analysis")] assert len(ctx.sent) == 0 assert len(ctx.yielded) == 1 assert isinstance(ctx.yielded[0], Analysis_BooleanExtendedResult)