-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlocal_mcp_server.py
More file actions
260 lines (223 loc) · 10.6 KB
/
local_mcp_server.py
File metadata and controls
260 lines (223 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""应用入口,使用 FastMCP 框架实现 MCP Server,通过 HTTP 后台服务暴露接口"""
import argparse
import sys
import logging
import os
from pathlib import Path
from typing import Optional, Any, Dict
from config import get_server_config, get_database_config
from logging_config import setup_logging
from storage.db import get_db
from storage.scheduler import start_scheduler, stop_scheduler
from storage.models import (
save_before_edit,
get_before_edit,
delete_before_edit,
save_session_summary,
save_code_diff_lines,
)
from compute.diff_engine import extract_diff_lines
from utils.port_utils import find_available_port
# Set up logging (starts at INFO level; main() adjusts the level from the configuration)
logger = setup_logging(log_level="INFO", module_name="mcp_server")
def _import_fastmcp():
    """Load and return the ``FastMCP`` class from the installed MCP SDK.

    While the import runs, ``sys.path`` is temporarily stripped of the
    entries ``""``, ``"."`` and the directory containing this file, so a
    local ``mcp/`` package next to this file cannot shadow the SDK.
    The previous search path is put back afterwards, whether or not the
    import succeeded.

    Returns:
        The ``FastMCP`` class object.

    Raises:
        ImportError: If ``mcp.server.fastmcp`` cannot be imported from the
            remaining search path.
    """
    saved_search_path = sys.path[:]
    try:
        here = os.path.abspath(os.path.dirname(__file__))
        shadowing_entries = ("", ".", here)
        sys.path = [
            entry for entry in sys.path if entry not in shadowing_entries
        ]
        from mcp.server.fastmcp import FastMCP  # type: ignore

        return FastMCP
    finally:
        # Always restore the caller's search path, even on ImportError.
        sys.path = saved_search_path
def create_mcp_app(host: str = "127.0.0.1", port: int = 8000):
    """Build the FastMCP application and register its two recording tools.

    Args:
        host: Address passed to FastMCP as the listen host.
        port: Port passed to FastMCP as the listen port.

    Returns:
        The FastMCP instance with ``RecordBeforeEdit`` and
        ``RecordAfterEdit`` registered as tools.
    """
    fastmcp_cls = _import_fastmcp()

    # FastMCP's streamable_http_path defaults to "/mcp"; the root path is
    # used instead because Cursor may send its requests to "/".
    app = fastmcp_cls(
        "local-code-stat",
        host=host,
        port=port,
        streamable_http_path="/",
    )

    def _error(message: str) -> Dict[str, Any]:
        """Build the error payload shared by both tools."""
        return {"status": "error", "message": message}

    def _blank_identifier(session_id: str, file_path: str) -> Optional[str]:
        """Return the name of the first empty/whitespace-only identifier, else None."""
        for label, value in (("session_id", session_id), ("file_path", file_path)):
            if not value or not value.strip():
                return label
        return None

    # NOTE(review): the tool names, signatures and docstrings below are kept
    # verbatim -- FastMCP is expected to publish them to MCP clients as the
    # tool schema/description; confirm against the SDK before rewording.
    @app.tool()
    def RecordBeforeEdit(session_id: str, file_path: str, code_before: str) -> Dict[str, Any]:
        """
        Record the complete code content before file editing (temporary local storage, will be cleaned up automatically, no redundant retention)
        Args:
            session_id: Current session ID, generated by Agent, must match RecordAfterEdit's session_id
            file_path: Absolute path of the target file
            code_before: Complete code content before editing (preserve original format, blank lines, ensure accurate line numbers)
        Returns:
            Dictionary containing status and message
        """
        blank = _blank_identifier(session_id, file_path)
        if blank is not None:
            return _error(f"{blank} cannot be empty")
        if code_before is None or code_before == "":
            return _error("code_before cannot be empty")

        stored = save_before_edit(
            session_id=session_id,
            file_path=file_path,
            code_before=code_before,
        )
        if not stored:
            return _error("Failed to save code before editing")

        logger.info(f"RecordBeforeEdit: session={session_id}, file={file_path}")
        return {"status": "success", "message": "Code before editing recorded successfully"}

    @app.tool()
    def RecordAfterEdit(
        session_id: str,
        file_path: str,
        code_after: str,
        session_info: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Record the complete code content after file editing, extract specific diff lines (added/modified) with line numbers, clean up temporary data, and retain only diff information
        Args:
            session_id: Current session ID, must match RecordBeforeEdit's session_id to associate the same editing operation
            file_path: Absolute path of the target file, must match RecordBeforeEdit's file_path
            code_after: Complete code content after editing (preserve original format, blank lines, ensure accurate line numbers)
            session_info: Optional session supplementary information (e.g., user instructions, Agent type, operation time) for subsequent statistical filtering
        Returns:
            Dictionary containing status, message, and data
        """
        blank = _blank_identifier(session_id, file_path)
        if blank is not None:
            return _error(f"{blank} cannot be empty")
        if code_after is None or code_after == "":
            return _error("code_after cannot be empty")

        # The "before" snapshot must have been stored by RecordBeforeEdit
        # under the same (session_id, file_path) pair.
        code_before = get_before_edit(session_id, file_path)
        if code_before is None:
            return _error(
                f"Code record before editing not found, please call RecordBeforeEdit Tool first "
                f"(session_id={session_id}, file_path={file_path})"
            )

        diff_lines = extract_diff_lines(code_before, code_after)

        # Tally added and modified lines in a single pass.
        add_lines_count = 0
        modify_lines_count = 0
        for entry in diff_lines:
            kind = entry["diff_type"]
            if kind == "add":
                add_lines_count += 1
            elif kind == "modify":
                modify_lines_count += 1

        # Same value as len(code_after.split("\n")).
        total_lines_after = code_after.count("\n") + 1

        summary_saved = save_session_summary(
            session_id=session_id,
            file_path=file_path,
            add_lines_count=add_lines_count,
            modify_lines_count=modify_lines_count,
            total_lines_after=total_lines_after,
            session_info=session_info,
        )
        if not summary_saved:
            return _error("Failed to save session summary")

        # Per-line details are best effort: a failure here is only logged
        # because the summary row has already been written.
        if diff_lines:
            details_saved = save_code_diff_lines(
                session_id=session_id,
                file_path=file_path,
                diff_lines=diff_lines,
            )
            if not details_saved:
                logger.warning("Failed to save diff lines details, but session summary has been saved")

        # The temporary "before" snapshot is no longer needed.
        delete_before_edit(session_id, file_path)

        total_diff_lines = len(diff_lines)
        logger.info(
            f"RecordAfterEdit: session={session_id}, file={file_path}, "
            f"add={add_lines_count}, modify={modify_lines_count}, total_diff={total_diff_lines}"
        )
        return {
            "status": "success",
            "message": "Code after editing recorded successfully, diff lines extracted and stored",
            "data": {
                "add_lines_count": add_lines_count,
                "modify_lines_count": modify_lines_count,
                "total_diff_lines": total_diff_lines,
                "total_lines_after": total_lines_after,
            },
        }

    return app
def _resolve_listen_address(args, server_config):
    """Pick the listen host and a free port from CLI arguments and configuration.

    Command-line values take precedence over ``server_config``. Exits the
    process with status 1 when ``find_available_port`` raises RuntimeError.

    Returns:
        A ``(host, port)`` tuple.
    """
    host = args.host or server_config["host"]
    preferred_port = args.port or server_config["port"]
    try:
        port = find_available_port(host, preferred_port)
    except RuntimeError as e:
        logger.error(f"Failed to find available port: {e}")
        sys.exit(1)
    if port != preferred_port:
        logger.warning(f"Port {preferred_port} is in use, using port {port} instead")
    return host, port


def _attach_file_logging(log_file, level):
    """Add a UTF-8 file handler for ``log_file`` to the module logger."""
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setLevel(level)
    file_handler.setFormatter(
        logging.Formatter(
            fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
    )
    logger.addHandler(file_handler)
    logger.info(f"Logging to file: {log_file}")


def _spawn_detached_server(host, port, startup_grace=1.0):
    """Re-launch this script as a detached foreground server and return its PID.

    The child runs ``<python> <this file> start --host ... --port ...`` in
    its own session (POSIX) or as a detached process in a new process group
    (Windows), with its standard streams connected to the null device, so it
    keeps running after this process exits.

    Args:
        host: Listen address forwarded to the child.
        port: Listen port forwarded to the child.
        startup_grace: Seconds to wait for an early exit of the child. This
            only catches failures that happen within that window.

    Returns:
        The PID of the child process.
    """
    import subprocess  # local import: only needed for --daemon

    command = [
        sys.executable,
        os.path.abspath(__file__),
        "start",
        "--host", str(host),
        "--port", str(port),
    ]
    popen_kwargs = {
        "stdin": subprocess.DEVNULL,
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
    }
    if os.name == "nt":
        # These flags only exist in the subprocess module on Windows.
        popen_kwargs["creationflags"] = (
            subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
        )
    else:
        popen_kwargs["start_new_session"] = True

    child = subprocess.Popen(command, **popen_kwargs)
    try:
        exit_code = child.wait(timeout=startup_grace)
    except subprocess.TimeoutExpired:
        # Still running after the grace period: treat as started.
        return child.pid

    logger.error(f"Background server exited during startup (exit code: {exit_code})")
    sys.exit(1)


def main():
    """Entry point: parse the command line and start the FastMCP HTTP server.

    ``start`` runs the server in the foreground. ``start --daemon``
    re-launches this script as a detached process and returns. The previous
    implementation used a daemonic ``multiprocessing.Process``, which is
    terminated as soon as the launching process exits, and whose nested
    target function cannot be pickled under the ``spawn`` start method.

    In the foreground path the scheduler is stopped in a ``finally`` block,
    so it is shut down whether the server returns normally, is interrupted,
    or raises.
    """
    parser = argparse.ArgumentParser(description="本地MCP Server AI代码统计系统(基于FastMCP)")
    parser.add_argument(
        "command",
        choices=["start"],
        help="启动服务器"
    )
    parser.add_argument(
        "--host",
        type=str,
        help="服务器监听地址(默认从config.json读取)"
    )
    parser.add_argument(
        "--port",
        type=int,
        help="服务器监听端口(默认从config.json读取)"
    )
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="后台运行(Windows下使用简单后台进程,Unix下建议使用nohup或systemd)"
    )
    args = parser.parse_args()
    if args.command != "start":
        return

    if args.daemon:
        # Background mode: the detached child initialises the database and
        # scheduler itself, so nothing is initialised in this process.
        logger.warning("Daemon mode is a simple implementation. For production, use systemd/supervisor/nohup.")
        host, port = _resolve_listen_address(args, get_server_config())
        pid = _spawn_detached_server(host, port)
        logger.info(f"Server started in background (PID: {pid})")
        logger.info(f"Background server will listen on {host}:{port}")
        logger.info("To stop the server, use: kill <PID>")
        return

    # Initialise the database and the scheduled jobs.
    try:
        db = get_db()
        db.initialize()
        logger.info("Database initialized successfully")
        start_scheduler()
        logger.info("Scheduler started (auto backup and cleanup)")
    except Exception as e:
        logger.error(f"Failed to initialize: {e}", exc_info=True)
        sys.exit(1)

    # Read the configuration; a busy preferred port is replaced by a free one.
    server_config = get_server_config()
    host, port = _resolve_listen_address(args, server_config)
    log_level = server_config.get("log_level", "info")
    log_file = server_config.get("log_file")

    # Unknown level names fall back to INFO.
    level = getattr(logging, log_level.upper(), logging.INFO)
    if log_file:
        _attach_file_logging(log_file, level)
    logger.setLevel(level)

    # Create the FastMCP application bound to the resolved host and port.
    mcp_app = create_mcp_app(host=host, port=port)
    logger.info(f"Starting FastMCP Server on {host}:{port} (transport: streamable-http)")
    logger.info("MCP Tools available:")
    logger.info("  - RecordBeforeEdit: Record code before file editing")
    logger.info("  - RecordAfterEdit: Record code after file editing and extract diff lines")

    # Run the FastMCP HTTP server in the foreground.
    try:
        mcp_app.run(transport="streamable-http")
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    finally:
        stop_scheduler()
        logger.info("Scheduler stopped")


if __name__ == "__main__":
    main()