Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# xAI API Key for AI Investment Analysis (required)
# Get your API key from: https://console.x.ai/
OPENAI_API_KEY=your_xai_api_key_here

# xAI Model Configuration
XAI_MODEL_S=grok-3-mini
XAI_MODEL_M=grok-3-fast
XAI_MODEL_L=grok-3-latest
XAI_MODEL_XL=grok-4-0709
39 changes: 39 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
from typing import List

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from fastapi_mcp import FastApiMCP

from app.models import CompanySearchResult
from app.services.ai_investment import investment_agent
from app.services.search import search_companies
from app.utils import lessthan_x

Expand All @@ -21,7 +24,43 @@
def search_company(
company_name: str = Query(..., description="Company name to search for")
) -> List[CompanySearchResult]:
print("company_name", company_name)
return search_companies(company_name)


@app.get("/investment-analysis")
async def get_investment_analysis(
ticker_symbol: str = Query(
..., description="Stock ticker symbol to analyze for investment"
)
):
"""
Stream AI-powered investment analysis for a given stock ticker.
"""

def investment_stream():
try:
analysis_generator = investment_agent(ticker_symbol)
for chunk in analysis_generator:
if chunk:
yield str(chunk)
except Exception as error:
yield f"Error during analysis: {str(error)}\n"
yield "Please try again or contact support.\n"

async def async_investment_stream():
for chunk in investment_stream():
await asyncio.sleep(0) # Yield control to event loop
yield chunk

return StreamingResponse(
async_investment_stream(),
media_type="text/plain",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)


mcp.setup_server()
170 changes: 170 additions & 0 deletions app/services/ai_investment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import os

from agno.agent import Agent
from agno.models.xai import xAI
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.yfinance import YFinanceTools
from dotenv import load_dotenv

from app.utils import validate_env_variables

# Load environment variables from .env file
load_dotenv()

# Validate all required environment variables
_, xai_model_s, xai_model_m, xai_model_l, xai_model_xl = validate_env_variables(
["OPENAI_API_KEY", "XAI_MODEL_S", "XAI_MODEL_M", "XAI_MODEL_L", "XAI_MODEL_XL"]
)

# Agent 1: Market Research Agent - Uses web search to gather market news and trends
market_research_agent = Agent(
name="Market Research Agent",
role="Gather latest market news, trends, and qualitative information from the web",
model=xAI(id=xai_model_s),
tools=[DuckDuckGoTools()],
instructions=[
"Always cite sources and provide summaries of key articles or reports",
"Keep tool calls simple and avoid complex JSON structures",
"Be precise with search queries",
"If tool calls fail, provide analysis based on general knowledge",
],
markdown=True,
show_tool_calls=True,
)

# Agent 2: Financial Data Agent - Retrieves quantitative financial data using YFinance
financial_data_agent = Agent(
name="Financial Data Agent",
role="Fetch stock prices, analyst recommendations, company info, and financial metrics",
model=xAI(id=xai_model_m),
tools=[
YFinanceTools(
stock_price=True,
analyst_recommendations=True,
company_info=True,
historical_prices=True,
)
],
instructions=[
"Present financial data in tables for clarity and include historical trends where relevant",
"Use precise stock symbols and avoid ambiguous queries",
"Keep tool calls simple and properly formatted",
"If tool calls fail, provide analysis based on general knowledge",
],
markdown=True,
show_tool_calls=True,
)

# Agent 3: Investment Analysis Agent - Analyzes data from other
# agents to provide insights and recommendations
investment_analysis_agent = Agent(
name="Investment Analysis Agent",
role="Analyze market research and financial data to provide investment insights,"
" risks, and recommendations",
model=xAI(id=xai_model_xl),
instructions="Synthesize information from team members, evaluate risks, and "
"suggest buy/sell/hold recommendations with reasoning",
markdown=True,
show_tool_calls=True,
)

# Create the main Investment Team Agent that coordinates the specialized agents
investment_team = Agent(
team=[market_research_agent, financial_data_agent, investment_analysis_agent],
model=xAI(id=xai_model_l),
instructions=[
"Coordinate between agents to gather comprehensive data",
"Ensure responses are well-structured with sections for research, data, and analysis",
"Always include sources and use tables for data",
"Keep tool calls simple and avoid complex JSON structures",
],
markdown=True,
show_tool_calls=False,
)


def investment_agent(ticker_symbol: str):
"""
Investment analysis using multi-agent approach.
Returns a generator for streaming responses.
"""

def generate_streaming_analysis():
"""Generate streaming analysis using the investment team"""
try:
yield f"🔍 Starting comprehensive investment analysis for {ticker_symbol}...\n\n"

# Use the run method to get the complete response
response = investment_team.run(
f"Provide a detailed investment analysis for {ticker_symbol} stock, including market outlook, financial performance, and recommendations." # noqa: E501
)

# Extract and stream the content
if hasattr(response, "content") and response.content:
content = response.content
# Stream in chunks for better user experience
chunk_size = 150
for i in range(0, len(content), chunk_size):
chunk = content[i : i + chunk_size] # noqa : E203
yield chunk
else:
yield f"Analysis completed for {ticker_symbol}"

except Exception as e:
yield f"❌ Error during analysis: {str(e)}\n"
yield "🔄 Attempting simplified analysis...\n"

try:
# Fallback to simple analysis
simplified_response = investment_analysis_agent.run(
f"Provide a basic investment analysis for {ticker_symbol} based on general knowledge." # noqa: E501
)
if (
hasattr(simplified_response, "content")
and simplified_response.content
):
yield simplified_response.content
else:
yield f"Basic analysis completed for {ticker_symbol}"
except Exception as fallback_error:
yield f"Unable to complete analysis: {fallback_error}\n"

return generate_streaming_analysis()


# To test the code through the command line instead of FastAPI
if __name__ == "__main__":
import sys

# Add the project root to Python path for imports
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
sys.path.insert(0, project_root)

# Check if company name is provided as command line argument
if len(sys.argv) < 2:
print("Usage: python -m app.services.ai_investment <stock ticker>")
print("Example: python -m app.services.ai_investment NVDA")
print("Example: python -m app.services.ai_investment TSLA")
sys.exit(1)

stock_ticker = " ".join(
sys.argv[1:]
) # Join all arguments in case company name has spaces
print(f"Starting investment analysis for: {stock_ticker}")
print("=" * 50)
print("Note: Using simplified approach to avoid JSON parsing issues")
print("=" * 50)

try:
# Run the investment analysis
analysis_gen = investment_agent(stock_ticker)
for chunk in analysis_gen:
print(chunk, end="")
print("\n" + "=" * 50)
print("Analysis completed successfully!")
except Exception as e:
print(f"Error: {e}")
print("Try using the stock symbol instead (e.g., TSLA instead of Tesla)")
sys.exit(1)
27 changes: 27 additions & 0 deletions app/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
import inspect
import os
from functools import wraps
from typing import List, Tuple

from fastapi import HTTPException


def validate_env_variables(env_var_names: List[str]) -> Tuple[str, ...]:
"""
Validate that multiple environment variables are set and return their values.

Args:
env_var_names (List[str]): List of environment variable names to check

Returns:
Tuple[str, ...]: Tuple of environment variable values in the same order as input

Raises:
ValueError: If any environment variable is not set
"""
values = []
for var_name in env_var_names:
value = os.getenv(var_name)
if not value:
raise ValueError(
f"{var_name} environment variable is required. Please set it in your .env file." # noqa: E501
)
values.append(value)

return tuple(values)


def lessthan_x(x: int, arg_name=None, message="Input is too short."):
"""
Decorator factory to validate the minimum length of a string argument for FastAPI endpoints.
Expand Down
Loading