-
Notifications
You must be signed in to change notification settings - Fork 3
Fast search parallel #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| from typing import List, Optional | ||
| import hashlib | ||
| from typing import Dict, List, Optional | ||
|
|
||
| from azure.ai.inference import EmbeddingsClient | ||
| from azure.core.credentials import AzureKeyCredential | ||
|
|
@@ -24,6 +25,9 @@ def __init__( | |
| self.model_name = model_name or "text-embedding-3-large" | ||
| self.token = token or settings.github_token | ||
|
|
||
| # Instance-level cache for query embeddings (saves API calls in reflexion cycles) | ||
| self._embedding_cache: Dict[str, List[float]] = {} | ||
|
|
||
| if not self.token: | ||
| raise EmbeddingException("GitHub token is required for Azure AI Inference") | ||
|
|
||
|
|
@@ -66,7 +70,14 @@ def _extract_embedding(self, embedding_data) -> List[float]: | |
| ) | ||
|
|
||
| async def embed_text(self, text: str) -> List[float]: | ||
| """Embed single text using Azure AI Inference""" | ||
| """Embed single text using Azure AI Inference with caching""" | ||
| # Check cache first (saves 300-900ms per repeated query) | ||
| cache_key = hashlib.md5(text.encode()).hexdigest() | ||
|
|
||
| if cache_key in self._embedding_cache: | ||
| logger.debug("Using cached query embedding", text_length=len(text)) | ||
| return self._embedding_cache[cache_key] | ||
|
|
||
| try: | ||
| response = self.client.embed(input=[text]) | ||
|
|
||
|
|
@@ -77,6 +88,9 @@ async def embed_text(self, text: str) -> List[float]: | |
| raw_embedding = response.data[0].embedding | ||
| embedding = self._extract_embedding(raw_embedding) | ||
|
|
||
| # Cache the embedding for future use | ||
| self._embedding_cache[cache_key] = embedding | ||
|
|
||
|
Comment on lines 28 to +93
|
||
| logger.debug( | ||
| "Text embedded successfully", | ||
| text_length=len(text), | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,21 +8,38 @@ | |||||
|
|
||||||
|
|
||||||
| class ReflexionMemoryCache: | ||||||
| """Memory cache for reflexion loops with LRU eviction""" | ||||||
| """Memory cache for reflexion loops with LRU eviction and TTL""" | ||||||
|
|
||||||
| def __init__(self, max_size: Optional[int] = None): | ||||||
| self.max_size = max_size or settings.max_cache_size | ||||||
| def __init__(self, max_size: Optional[int] = None, cache_ttl: int = 86400): | ||||||
| self.max_size = max_size or 500 # Increased from 100 to 500 | ||||||
|
||||||
| self.max_size = max_size or 500 # Increased from 100 to 500 | |
| self.max_size = max_size or settings.max_cache_size |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -150,24 +150,46 @@ async def query_with_reflexion_stream( | |||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| if should_perform_web_search: | ||||||||||||||
| # Use combined search with token limits | ||||||||||||||
| # TRUE PARALLEL execution - DB search + web search simultaneously | ||||||||||||||
| logger.info( | ||||||||||||||
| "Performing combined DB and web search", | ||||||||||||||
| "Performing parallel DB and web search", | ||||||||||||||
| cycle=cycle_number, | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| # Use the combined search method with proper limits | ||||||||||||||
| k_docs = min(k, 2) # Limit DB docs | ||||||||||||||
| k_web = min( | ||||||||||||||
| settings.web_search_retrieval_k, 2 | ||||||||||||||
| ) # Limit web docs | ||||||||||||||
|
|
||||||||||||||
| try: | ||||||||||||||
| retrieved_docs = ( | ||||||||||||||
| await self.vector_store.similarity_search_combined( | ||||||||||||||
| # Execute DB search and web search in parallel | ||||||||||||||
| db_task = asyncio.create_task( | ||||||||||||||
| self.vector_store.similarity_search_combined( | ||||||||||||||
| current_query, k_docs=k_docs, k_web=k_web | ||||||||||||||
| ) | ||||||||||||||
| ) | ||||||||||||||
| web_task = asyncio.create_task( | ||||||||||||||
| self._perform_web_search(current_query) | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| # Wait for both to complete | ||||||||||||||
| results = await asyncio.gather( | ||||||||||||||
| db_task, web_task, return_exceptions=True | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| # Handle results with explicit type assertions | ||||||||||||||
| retrieved_docs = [] | ||||||||||||||
| web_search_results = [] | ||||||||||||||
|
|
||||||||||||||
| if isinstance(results[0], Exception): | ||||||||||||||
| logger.error("DB retrieval error", error=str(results[0])) | ||||||||||||||
| elif isinstance(results[0], list): | ||||||||||||||
| retrieved_docs = results[0] | ||||||||||||||
|
|
||||||||||||||
| if isinstance(results[1], Exception): | ||||||||||||||
| logger.error("Web search error", error=str(results[1])) | ||||||||||||||
| elif isinstance(results[1], list): | ||||||||||||||
| web_search_results = results[1] | ||||||||||||||
|
|
||||||||||||||
| # Separate for reporting | ||||||||||||||
| db_docs = [ | ||||||||||||||
|
|
@@ -181,13 +203,8 @@ async def query_with_reflexion_stream( | |||||||||||||
| if d.metadata.get("source_type") == "web_search" | ||||||||||||||
| ] | ||||||||||||||
|
|
||||||||||||||
| # For web search results, we still need to perform the search for new content | ||||||||||||||
| web_search_results = await self._perform_web_search( | ||||||||||||||
| current_query | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| logger.info( | ||||||||||||||
| "Combined retrieval completed", | ||||||||||||||
| "Parallel retrieval completed", | ||||||||||||||
| db_docs=len(db_docs), | ||||||||||||||
| web_docs=len(web_docs), | ||||||||||||||
| new_web_results=len(web_search_results), | ||||||||||||||
|
|
@@ -257,8 +274,12 @@ async def query_with_reflexion_stream( | |||||||||||||
| current_query, truncated_context, cycle_number | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| # Step 4: Generate partial answer | ||||||||||||||
| # Step 4: Generate partial answer with PARALLEL EVALUATION & EARLY STOPPING | ||||||||||||||
| partial_answer_chunks = [] | ||||||||||||||
| eval_task = None | ||||||||||||||
| eval_started = False | ||||||||||||||
| early_stopped = False | ||||||||||||||
|
|
||||||||||||||
| try: | ||||||||||||||
| async for chunk in self.generation_llm.generate_stream( | ||||||||||||||
| generation_prompt | ||||||||||||||
|
|
@@ -276,6 +297,35 @@ async def query_with_reflexion_stream( | |||||||||||||
| "web_results_count": len(web_search_results), | ||||||||||||||
| }, | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| partial_content = "".join(partial_answer_chunks) | ||||||||||||||
|
|
||||||||||||||
| # HEURISTIC EARLY STOPPING: Check quick confidence at ~50% through expected answer | ||||||||||||||
| if len(partial_content) > 750 and cycle_number == 1: | ||||||||||||||
| quick_confidence = self.reflexion_evaluator.predict_quick_confidence( | ||||||||||||||
| partial_content | ||||||||||||||
| ) | ||||||||||||||
| if quick_confidence >= 0.92: | ||||||||||||||
| logger.info( | ||||||||||||||
| f"High confidence detected early ({quick_confidence:.2f}), stopping generation" | ||||||||||||||
| ) | ||||||||||||||
| early_stopped = True | ||||||||||||||
|
||||||||||||||
| early_stopped = True |
Copilot
AI
Feb 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| pass | |
| logger.debug( | |
| "Speculative followup task was cancelled as expected", | |
| cycle=cycle_number, | |
| ) |
Copilot
AI
Feb 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| pass | |
| # Speculative follow-up task was cancelled intentionally; safe to ignore. | |
| logger.debug( | |
| "Speculative follow-up task cancelled", | |
| cycle=cycle_number, | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unbounded cache may cause memory issues in long-running processes.
The `_embedding_cache` dictionary grows indefinitely without any eviction policy or size limit. For a long-running service processing many unique queries, this could lead to memory exhaustion.

Consider adding a max size with LRU eviction (similar to `ReflexionMemoryCache`) or use `functools.lru_cache` on the text hash.

🔧 Proposed fix using a bounded cache
Then in `embed_text`, after storing:

🤖 Prompt for AI Agents