-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathmcp_deep_research.py
More file actions
279 lines (238 loc) · 9.52 KB
/
mcp_deep_research.py
File metadata and controls
279 lines (238 loc) · 9.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#!/usr/bin/env python3
"""
MCP Deep Research Tool
Provides web research capabilities through Model Context Protocol
"""
import os
import json
import asyncio
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
# Load environment variables
try:
from dotenv import load_dotenv
project_root = Path(__file__).parent
env_path = project_root / ".env.local"
load_dotenv(env_path)
except ImportError:
print("Warning: python-dotenv not installed")
# Import the deep research functionality
from src.deep_research import deep_research, write_final_answer, write_final_report
from src.feedback import generate_feedback
# Import MCP with proper error handling
try:
import mcp
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
except ImportError as e:
print(f"Error importing MCP: {e}")
print("Please install MCP with: pip install mcp")
sys.exit(1)
# Create the MCP server instance
server = Server("deep-research")
@server.call_tool()
async def deep_web_research(_name: str, arguments: dict) -> Sequence[TextContent]:
"""
Perform deep web research on any topic using iterative search and analysis.
This tool conducts comprehensive research by:
1. Generating targeted search queries
2. Scraping and analyzing web content
3. Iteratively diving deeper based on findings
4. Producing detailed reports or specific answers
"""
# Extract arguments with defaults
query = arguments.get("query", "")
breadth = arguments.get("breadth", 3)
depth = arguments.get("depth", 2)
output_type = arguments.get("output_type", "report")
generate_followup = arguments.get("generate_followup", True)
# Validate arguments
if not query:
return [TextContent(
type="text",
text="Error: Query parameter is required for research."
)]
# Validate numeric parameters
breadth = max(1, min(10, int(breadth)))
depth = max(1, min(5, int(depth)))
if output_type not in ["report", "answer"]:
output_type = "report"
try:
combined_query = query
# Generate follow-up questions if requested
if generate_followup and output_type == "report":
try:
follow_up_questions = await generate_feedback(query)
if follow_up_questions:
questions_text = "\n".join([f"- {q}" for q in follow_up_questions])
combined_query = f"{query}\n\nAdditional research directions:\n{questions_text}"
except Exception as e:
# Continue without follow-up questions if there's an error
pass
# Perform deep research
result = await deep_research(
query=combined_query,
breadth=breadth,
depth=depth
)
# Generate appropriate output
if output_type == "answer":
# Generate concise answer
answer = await write_final_answer(
prompt=query,
learnings=result.learnings
)
response_text = f"**Research Answer:**\n\n{answer}\n\n"
# Add key findings
if result.learnings:
response_text += "**Key Findings:**\n"
for i, learning in enumerate(result.learnings[:5], 1):
response_text += f"{i}. {learning}\n"
response_text += "\n"
# Add sources
if result.visited_urls:
response_text += f"**Sources:** {len(result.visited_urls)} URLs researched\n"
for url in result.visited_urls[:5]:
response_text += f"- {url}\n"
if len(result.visited_urls) > 5:
response_text += f"... and {len(result.visited_urls) - 5} more sources\n"
else:
# Generate detailed report
report = await write_final_report(
prompt=query,
learnings=result.learnings,
visited_urls=result.visited_urls
)
response_text = f"**Deep Research Report**\n\n{report}\n\n"
response_text += f"**Research Statistics:**\n"
response_text += f"- Breadth: {breadth} queries per iteration\n"
response_text += f"- Depth: {depth} research iterations\n"
response_text += f"- Total findings: {len(result.learnings)}\n"
response_text += f"- Sources analyzed: {len(result.visited_urls)}\n"
return [TextContent(
type="text",
text=response_text
)]
except Exception as e:
error_msg = f"Error during research: {str(e)}"
return [TextContent(
type="text",
text=error_msg
)]
@server.call_tool()
async def generate_research_questions(_name: str, arguments: dict) -> Sequence[TextContent]:
"""
Generate clarifying follow-up questions for a research topic.
This tool helps refine research direction by generating relevant
follow-up questions that can help narrow down or expand the research scope.
"""
query = arguments.get("query", "")
num_questions = arguments.get("num_questions", 3)
if not query:
return [TextContent(
type="text",
text="Error: Query parameter is required."
)]
# Validate num_questions
num_questions = max(1, min(10, int(num_questions)))
try:
questions = await generate_feedback(query, num_questions)
if questions:
response_text = f"**Follow-up Research Questions for:** {query}\n\n"
for i, question in enumerate(questions, 1):
response_text += f"{i}. {question}\n"
response_text += f"\n*These questions can help refine your research direction and ensure comprehensive coverage of the topic.*"
else:
response_text = f"The research query '{query}' appears to be sufficiently clear and specific. No additional clarifying questions are needed."
return [TextContent(
type="text",
text=response_text
)]
except Exception as e:
error_msg = f"Error generating follow-up questions: {str(e)}"
return [TextContent(
type="text",
text=error_msg
)]
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""List available tools"""
return [
Tool(
name="deep_web_research",
description="Perform comprehensive web research on any topic using iterative search and analysis",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The research question or topic to investigate"
},
"breadth": {
"type": "integer",
"description": "Number of parallel search queries per iteration (1-10, default: 3)",
"minimum": 1,
"maximum": 10,
"default": 3
},
"depth": {
"type": "integer",
"description": "Number of research iterations to perform (1-5, default: 2)",
"minimum": 1,
"maximum": 5,
"default": 2
},
"output_type": {
"type": "string",
"description": "Type of output - 'report' for detailed analysis or 'answer' for concise response",
"enum": ["report", "answer"],
"default": "report"
},
"generate_followup": {
"type": "boolean",
"description": "Whether to generate clarifying follow-up questions (default: True)",
"default": True
}
},
"required": ["query"]
}
),
Tool(
name="generate_research_questions",
description="Generate clarifying follow-up questions for a research topic",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The initial research question or topic"
},
"num_questions": {
"type": "integer",
"description": "Number of questions to generate (1-10, default: 3)",
"minimum": 1,
"maximum": 10,
"default": 3
}
},
"required": ["query"]
}
)
]
async def main():
"""Main entry point for the MCP server"""
try:
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
except Exception as e:
print(f"Error starting MCP server: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())