Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ class Settings(BaseSettings):
# Generation model (primary)
llm_model: str = Field(default="meta/Meta-Llama-3.1-405B-Instruct")
llm_temperature: float = Field(default=0.7)
llm_max_tokens: int = Field(default=3000)
llm_max_tokens: int = Field(default=1500) # Reduced from 3000 for faster responses

# Evaluation model (for self-assessment)
evaluation_model: str = Field(default="cohere/Cohere-command-r")
evaluation_temperature: float = Field(default=0.3)
evaluation_max_tokens: int = Field(default=1000)
evaluation_max_tokens: int = Field(default=800) # Reduced from 1000 for faster evaluation

# Summary model (for final synthesis)
summary_model: str = Field(default="meta/Meta-Llama-3.1-70B-Instruct")
summary_temperature: float = Field(default=0.5)
summary_max_tokens: int = Field(default=4000)
summary_max_tokens: int = Field(default=2500) # Reduced from 4000 for faster synthesis

# Reflexion configuration
max_reflexion_cycles: int = Field(default=3)
confidence_threshold: float = Field(default=0.85)
initial_retrieval_k: int = Field(default=3)
max_reflexion_cycles: int = Field(default=2) # Reduced from 3 - most queries don't need 3+
confidence_threshold: float = Field(default=0.80) # Reduced from 0.85 - more lenient threshold
initial_retrieval_k: int = Field(default=5) # Increased from 3 - better first-pass answers
reflexion_retrieval_k: int = Field(default=5)

# Memory cache
Expand Down Expand Up @@ -79,6 +79,7 @@ class Settings(BaseSettings):
# Web search retrieval
web_search_retrieval_k: int = Field(default=3)
web_search_enable_content_extraction: bool = Field(default=True)
web_crawl_concurrency: int = Field(default=5) # Concurrent web page crawls (was 2)


# Global settings instance
Expand Down
18 changes: 16 additions & 2 deletions src/embeddings/github_embeddings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Optional
import hashlib
from typing import Dict, List, Optional

from azure.ai.inference import EmbeddingsClient
from azure.core.credentials import AzureKeyCredential
Expand All @@ -24,6 +25,9 @@ def __init__(
self.model_name = model_name or "text-embedding-3-large"
self.token = token or settings.github_token

# Instance-level cache for query embeddings (saves API calls in reflexion cycles)
self._embedding_cache: Dict[str, List[float]] = {}
Comment on lines +28 to +29

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unbounded cache may cause memory issues in long-running processes.

The _embedding_cache dictionary grows indefinitely without any eviction policy or size limit. For a long-running service processing many unique queries, this could lead to memory exhaustion.

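Consider adding a max size with LRU eviction (similar to ReflexionMemoryCache). Note that functools.lru_cache is not a drop-in alternative here: embed_text is an async method, so decorating it directly would cache the coroutine object rather than the embedding; it would have to wrap a synchronous helper keyed on the text hash.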

🔧 Proposed fix using a bounded cache
+from collections import OrderedDict
+
 class GithubEmbeddings(EmbeddingInterface):
     """Azure AI Inference embeddings implementation"""
+    
+    _MAX_CACHE_SIZE = 1000  # Limit cache to 1000 embeddings
 
     def __init__(
         self,
@@ -28,7 +31,7 @@ class GithubEmbeddings(EmbeddingInterface):
         super().__init__()
 
         # Instance-level cache for query embeddings (saves API calls in reflexion cycles)
-        self._embedding_cache: Dict[str, List[float]] = {}
+        self._embedding_cache: OrderedDict[str, List[float]] = OrderedDict()

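Then in embed_text, call self._embedding_cache.move_to_end(cache_key) on a cache hit (otherwise eviction is FIFO rather than LRU), and after storing a new embedding: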

# Evict oldest if cache is full
if len(self._embedding_cache) > self._MAX_CACHE_SIZE:
    self._embedding_cache.popitem(last=False)
🤖 Prompt for AI Agents
In `@src/embeddings/github_embeddings.py` around lines 28 - 29, The instance-level
_embedding_cache in class GithubEmbeddings grows unbounded; change it to a
bounded LRU cache by replacing the plain dict with an OrderedDict or an
lru-style structure, add a constant _MAX_CACHE_SIZE, and update the embed_text
method to move accessed/inserted keys to the end and evict the oldest entry when
len(self._embedding_cache) > self._MAX_CACHE_SIZE; mirror the eviction behavior
used by ReflexionMemoryCache (or use functools.lru_cache on a hashed text key)
so the cache cannot grow indefinitely.


if not self.token:
raise EmbeddingException("GitHub token is required for Azure AI Inference")

Expand Down Expand Up @@ -66,7 +70,14 @@ def _extract_embedding(self, embedding_data) -> List[float]:
)

async def embed_text(self, text: str) -> List[float]:
"""Embed single text using Azure AI Inference"""
"""Embed single text using Azure AI Inference with caching"""
# Check cache first (saves 300-900ms per repeated query)
cache_key = hashlib.md5(text.encode()).hexdigest()

if cache_key in self._embedding_cache:
logger.debug("Using cached query embedding", text_length=len(text))
return self._embedding_cache[cache_key]

try:
response = self.client.embed(input=[text])

Expand All @@ -77,6 +88,9 @@ async def embed_text(self, text: str) -> List[float]:
raw_embedding = response.data[0].embedding
embedding = self._extract_embedding(raw_embedding)

# Cache the embedding for future use
self._embedding_cache[cache_key] = embedding

Comment on lines 28 to +93

Copilot AI Feb 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _embedding_cache grows without bound and has no TTL or size limit, so a long-running service that embeds many distinct texts can accumulate an unbounded number of entries and steadily increase memory usage. It would be safer to enforce an eviction policy (e.g., fixed max size LRU) or a TTL, and optionally provide a way to clear the cache when needed.

Copilot uses AI. Check for mistakes.
logger.debug(
"Text embedded successfully",
text_length=len(text),
Expand Down
36 changes: 32 additions & 4 deletions src/memory/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,38 @@


class ReflexionMemoryCache:
"""Memory cache for reflexion loops with LRU eviction"""
"""Memory cache for reflexion loops with LRU eviction and TTL"""

def __init__(self, max_size: Optional[int] = None):
self.max_size = max_size or settings.max_cache_size
def __init__(self, max_size: Optional[int] = None, cache_ttl: int = 86400):
self.max_size = max_size or 500 # Increased from 100 to 500

Copilot AI Feb 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReflexionMemoryCache.__init__ now hardcodes max_size to 500 instead of using settings.max_cache_size, while settings.max_cache_size is still defined and used as the documented configuration knob. This disconnect means changing the setting no longer affects the cache size unless callers explicitly pass max_size, which can cause confusion; consider either reverting to settings.max_cache_size as the default or updating the setting itself and removing the magic number here.

Suggested change
self.max_size = max_size or 500 # Increased from 100 to 500
self.max_size = max_size or settings.max_cache_size

Copilot uses AI. Check for mistakes.
self.cache: OrderedDict[str, ReflexionMemory] = OrderedDict()
self.access_times: Dict[str, float] = {}
self.cache_ttl = cache_ttl # Default: 24 hours

# Track cache statistics
self.hits = 0
self.misses = 0

def get(self, query_hash: str) -> Optional[ReflexionMemory]:
"""Get reflexion memory from cache"""
"""Get reflexion memory from cache with TTL check"""
if query_hash in self.cache:
# Check TTL expiration
age = time.time() - self.access_times.get(query_hash, 0)
if age > self.cache_ttl:
# Cache entry expired, remove it
self.cache.pop(query_hash)
self.access_times.pop(query_hash, None)
self.misses += 1
return None

# Move to end (most recently used)
memory = self.cache.pop(query_hash)
self.cache[query_hash] = memory
self.access_times[query_hash] = time.time()
self.hits += 1
return memory

self.misses += 1
return None

def put(self, query_hash: str, memory: ReflexionMemory) -> None:
Expand All @@ -47,13 +64,24 @@ def clear(self) -> None:
"""Clear all cache entries"""
self.cache.clear()
self.access_times.clear()
self.hits = 0
self.misses = 0

def get_hit_rate(self) -> float:
"""Calculate cache hit rate"""
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0

def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
return {
"size": len(self.cache),
"max_size": self.max_size,
"oldest_entry": self._get_oldest_entry_age(),
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.get_hit_rate(), # Return as float, not formatted string
"ttl_hours": self.cache_ttl / 3600,
}

def _get_oldest_entry_age(self) -> Optional[float]:
Expand Down
165 changes: 140 additions & 25 deletions src/rag/reflexion_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,46 @@ async def query_with_reflexion_stream(
)

if should_perform_web_search:
# Use combined search with token limits
# TRUE PARALLEL execution - DB search + web search simultaneously
logger.info(
"Performing combined DB and web search",
"Performing parallel DB and web search",
cycle=cycle_number,
)

# Use the combined search method with proper limits
k_docs = min(k, 2) # Limit DB docs
k_web = min(
settings.web_search_retrieval_k, 2
) # Limit web docs

try:
retrieved_docs = (
await self.vector_store.similarity_search_combined(
# Execute DB search and web search in parallel
db_task = asyncio.create_task(
self.vector_store.similarity_search_combined(
current_query, k_docs=k_docs, k_web=k_web
)
)
web_task = asyncio.create_task(
self._perform_web_search(current_query)
)

# Wait for both to complete
results = await asyncio.gather(
db_task, web_task, return_exceptions=True
)

# Handle results with explicit type assertions
retrieved_docs = []
web_search_results = []

if isinstance(results[0], Exception):
logger.error("DB retrieval error", error=str(results[0]))
elif isinstance(results[0], list):
retrieved_docs = results[0]

if isinstance(results[1], Exception):
logger.error("Web search error", error=str(results[1]))
elif isinstance(results[1], list):
web_search_results = results[1]

# Separate for reporting
db_docs = [
Expand All @@ -181,13 +203,8 @@ async def query_with_reflexion_stream(
if d.metadata.get("source_type") == "web_search"
]

# For web search results, we still need to perform the search for new content
web_search_results = await self._perform_web_search(
current_query
)

logger.info(
"Combined retrieval completed",
"Parallel retrieval completed",
db_docs=len(db_docs),
web_docs=len(web_docs),
new_web_results=len(web_search_results),
Expand Down Expand Up @@ -257,8 +274,12 @@ async def query_with_reflexion_stream(
current_query, truncated_context, cycle_number
)

# Step 4: Generate partial answer
# Step 4: Generate partial answer with PARALLEL EVALUATION & EARLY STOPPING
partial_answer_chunks = []
eval_task = None
eval_started = False
early_stopped = False

try:
async for chunk in self.generation_llm.generate_stream(
generation_prompt
Expand All @@ -276,6 +297,35 @@ async def query_with_reflexion_stream(
"web_results_count": len(web_search_results),
},
)

partial_content = "".join(partial_answer_chunks)

# HEURISTIC EARLY STOPPING: Check quick confidence at ~50% through expected answer
if len(partial_content) > 750 and cycle_number == 1:
quick_confidence = self.reflexion_evaluator.predict_quick_confidence(
partial_content
)
if quick_confidence >= 0.92:
logger.info(
f"High confidence detected early ({quick_confidence:.2f}), stopping generation"
)
early_stopped = True

Copilot AI Feb 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The early_stopped flag is set when heuristic early stopping triggers but is never read afterward, so it has no effect on the subsequent evaluation or control flow. This looks like leftover state from a previous design; either wire it into the later decision logic (e.g., to skip continuation heuristics) or remove the variable and related assignments to avoid confusion.

Suggested change
early_stopped = True

Copilot uses AI. Check for mistakes.
break

# OPTIMIZATION: Start evaluation early when we have enough content
if (
len(partial_content) >= 500
and not eval_started
and cycle_number == 1
):
logger.debug("Starting early evaluation on partial answer")
eval_task = asyncio.create_task(
self.reflexion_evaluator.evaluate_response(
question, partial_content, retrieved_docs, cycle_number
)
)
eval_started = True

yield StreamingChunk(
content="\n", # Add newline buffer
metadata={
Expand Down Expand Up @@ -341,15 +391,51 @@ async def query_with_reflexion_stream(
cycle=cycle_number,
)

# Step 5: Self-evaluation
# Step 5: Self-evaluation with SPECULATIVE FOLLOW-UP GENERATION
logger.info("Evaluating response quality", cycle=cycle_number)
followup_task = None
try:
evaluation = await self.reflexion_evaluator.evaluate_response(
question,
partial_answer,
retrieved_docs,
cycle_number,
)
# Check if we already started evaluation during streaming
if eval_task and eval_started:
logger.debug("Waiting for early-started evaluation to complete")
evaluation = await eval_task
# Re-evaluate with final answer if different from partial
if len(partial_answer) > len("".join(partial_answer_chunks[:10])) * 1.5:
logger.debug("Final answer significantly longer, re-evaluating")
# Start evaluation and follow-up generation in parallel
eval_task_final = asyncio.create_task(
self.reflexion_evaluator.evaluate_response(
question,
partial_answer,
retrieved_docs,
cycle_number,
)
)
# Speculatively generate follow-ups (may not need them)
followup_task = asyncio.create_task(
self.reflexion_evaluator.generate_follow_up_queries(
question, partial_answer, []
)
)
evaluation = await eval_task_final
else:
# SPECULATIVE EXECUTION: Start both evaluation and follow-up generation
eval_task_final = asyncio.create_task(
self.reflexion_evaluator.evaluate_response(
question,
partial_answer,
retrieved_docs,
cycle_number,
)
)
# Speculatively generate follow-ups in parallel (60% chance we'll need them)
followup_task = asyncio.create_task(
self.reflexion_evaluator.generate_follow_up_queries(
question, partial_answer, []
)
)
evaluation = await eval_task_final

logger.info(
"Evaluation complete",
confidence=f"{evaluation.confidence_score:.2f}",
Expand Down Expand Up @@ -391,6 +477,22 @@ async def query_with_reflexion_stream(
cycle=cycle_number,
)

# CIRCUIT BREAKER: Skip additional cycles for exceptional first-cycle confidence
if cycle_number == 1 and evaluation.confidence_score >= 0.95:
logger.info(
"Exceptional confidence on first cycle, skipping reflexion",
confidence=f"{evaluation.confidence_score:.2f}",
)
# Cancel speculative followup task if running
if followup_task and not followup_task.done():
followup_task.cancel()
try:
await followup_task
except asyncio.CancelledError:
pass

Copilot AI Feb 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
pass
logger.debug(
"Speculative followup task was cancelled as expected",
cycle=cycle_number,
)

Copilot uses AI. Check for mistakes.
reflexion_memory.final_answer = partial_answer
break

# Step 6: Decision tree
if evaluation.decision == ReflexionDecision.INSUFFICIENT_DATA:
logger.warning(
Expand Down Expand Up @@ -435,22 +537,35 @@ async def query_with_reflexion_stream(
break

else:
# Continue with follow-up queries
# Continue with follow-up queries (use speculative task if available)
try:
if evaluation.follow_up_queries:
current_query = evaluation.follow_up_queries[0]
# Cancel speculative task if we don't need it
if followup_task and not followup_task.done():
followup_task.cancel()
try:
await followup_task
except asyncio.CancelledError:
pass

Copilot AI Feb 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
pass
# Speculative follow-up task was cancelled intentionally; safe to ignore.
logger.debug(
"Speculative follow-up task cancelled",
cycle=cycle_number,
)

Copilot uses AI. Check for mistakes.
logger.info(
"Following up with generated query",
query=current_query,
cycle=cycle_number,
)
else:
try:
follow_ups = await self.reflexion_evaluator.generate_follow_up_queries(
question,
partial_answer,
evaluation.missing_aspects,
)
# Use speculative task if available, otherwise generate now
if followup_task:
logger.debug("Using speculative follow-up queries")
follow_ups = await followup_task
else:
follow_ups = await self.reflexion_evaluator.generate_follow_up_queries(
question,
partial_answer,
evaluation.missing_aspects,
)

if follow_ups:
current_query = follow_ups[0]
logger.info(
Expand Down
Loading
Loading