-
Notifications
You must be signed in to change notification settings - Fork 62
CM-62381-add-session-start-hook #434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| """Reader for ~/.cursor/mcp.json configuration file. | ||
|
|
||
| Extracts MCP server definitions from the Cursor global config file | ||
| for use in AI guardrails data-flow reporting. | ||
| """ | ||
|
|
||
| import json | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| from cycode.logger import get_logger | ||
|
|
||
| logger = get_logger('AI Guardrails Cursor Config') | ||
|
|
||
| _CURSOR_MCP_CONFIG_PATH = Path.home() / '.cursor' / 'mcp.json' | ||
|
|
||
|
|
||
| def load_cursor_config(config_path: Optional[Path] = None) -> Optional[dict]: | ||
| """Load and parse ~/.cursor/mcp.json. | ||
|
|
||
| Args: | ||
| config_path: Override path for testing. Defaults to ~/.cursor/mcp.json. | ||
|
|
||
| Returns: | ||
| Parsed dict or None if file is missing or invalid. | ||
| """ | ||
| path = config_path or _CURSOR_MCP_CONFIG_PATH | ||
| if not path.exists(): | ||
| logger.debug('Cursor MCP config file not found', extra={'path': str(path)}) | ||
| return None | ||
| try: | ||
| content = path.read_text(encoding='utf-8') | ||
| return json.loads(content) | ||
| except Exception as e: | ||
| logger.debug('Failed to load Cursor MCP config file', exc_info=e) | ||
| return None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| import sys | ||
| from typing import Annotated | ||
|
|
||
| import typer | ||
|
|
||
| from cycode.cli.apps.ai_guardrails.consts import AIIDEType | ||
| from cycode.cli.apps.ai_guardrails.scan.claude_config import get_mcp_servers, get_user_email, load_claude_config | ||
| from cycode.cli.apps.ai_guardrails.scan.cursor_config import load_cursor_config | ||
| from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload | ||
| from cycode.cli.apps.ai_guardrails.scan.utils import safe_json_parse | ||
| from cycode.cli.apps.auth.auth_common import get_authorization_info | ||
| from cycode.cli.apps.auth.auth_manager import AuthManager | ||
| from cycode.cli.exceptions.handle_auth_errors import handle_auth_exception | ||
| from cycode.cli.utils.get_api_client import get_ai_security_manager_client | ||
| from cycode.logger import get_logger | ||
|
|
||
| logger = get_logger('AI Guardrails') | ||
|
|
||
|
|
||
| def _build_session_payload(payload: dict, ide: str) -> AIHookPayload: | ||
| """Build an AIHookPayload from a session-start stdin payload.""" | ||
| if ide == AIIDEType.CLAUDE_CODE: | ||
| claude_config = load_claude_config() | ||
| ide_user_email = get_user_email(claude_config) if claude_config else None | ||
|
|
||
| return AIHookPayload( | ||
| conversation_id=payload.get('session_id'), | ||
| ide_user_email=ide_user_email, | ||
| model=payload.get('model'), | ||
| ide_provider=AIIDEType.CLAUDE_CODE.value, | ||
| ide_version=None, | ||
| ) | ||
|
|
||
| # Cursor | ||
| return AIHookPayload( | ||
| conversation_id=payload.get('conversation_id'), | ||
| ide_user_email=payload.get('user_email'), | ||
| model=payload.get('model'), | ||
| ide_provider=AIIDEType.CURSOR.value, | ||
| ide_version=payload.get('cursor_version'), | ||
| ) | ||
|
|
||
|
|
||
| def _get_mcp_servers_for_ide(ide: str) -> dict: | ||
| """Return configured MCP servers for the given IDE, or empty dict.""" | ||
| if ide == AIIDEType.CLAUDE_CODE: | ||
| config = load_claude_config() | ||
| elif ide == AIIDEType.CURSOR: | ||
| config = load_cursor_config() | ||
| else: | ||
| return {} | ||
| return get_mcp_servers(config) or {} if config else {} | ||
|
|
||
|
|
||
| def _report_data_flow(ai_client, ide: str) -> None: | ||
|
Check failure on line 55 in cycode/cli/apps/ai_guardrails/session_start_command.py
|
||
| """Report IDE MCP servers to the AI security manager. Never raises.""" | ||
| mcp_servers = _get_mcp_servers_for_ide(ide) | ||
| if not mcp_servers: | ||
| return | ||
| try: | ||
| ai_client.report_data_flow(mcp_servers) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename |
||
| except Exception as e: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you already have try catch inside report_data_flow |
||
| logger.debug('Failed to report MCP servers', exc_info=e) | ||
|
|
||
|
|
||
| def session_start_command( | ||
| ctx: typer.Context, | ||
| ide: Annotated[ | ||
| str, | ||
| typer.Option( | ||
| '--ide', | ||
| help='IDE that triggered the session start.', | ||
| hidden=True, | ||
| ), | ||
| ] = AIIDEType.CURSOR.value, | ||
| ) -> None: | ||
| """Handle session start: ensure auth, create conversation, report data flow.""" | ||
| # Step 1: Ensure authentication | ||
| auth_info = get_authorization_info(ctx) | ||
| if auth_info is None: | ||
| logger.debug('Not authenticated, starting authentication') | ||
| try: | ||
| auth_manager = AuthManager() | ||
| auth_manager.authenticate() | ||
| except Exception as err: | ||
| handle_auth_exception(ctx, err) | ||
| return | ||
| else: | ||
| logger.debug('Already authenticated') | ||
|
|
||
| # Step 2: Read stdin payload (backward compat: old hooks pipe no stdin) | ||
| if sys.stdin.isatty(): | ||
| logger.debug('No stdin payload (TTY), skipping session initialization') | ||
| return | ||
|
|
||
|
Comment on lines
+91
to
+95
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you explain
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What it does: sys.stdin.isatty() returns True when stdin is attached to a terminal — i.e. nothing is piped in. If we didn't have this guard, sys.stdin.read() on the next line would block forever waiting Why it's needed:
If you want to make the intent clearer, the comment could be:
|
||
| stdin_data = sys.stdin.read().strip() | ||
| payload = safe_json_parse(stdin_data) | ||
| if not payload: | ||
| logger.debug('Empty or invalid stdin payload, skipping session initialization') | ||
| return | ||
|
|
||
| # Step 3: Build session payload and initialize API client | ||
| session_payload = _build_session_payload(payload, ide) | ||
|
|
||
| try: | ||
| ai_client = get_ai_security_manager_client(ctx) | ||
| except Exception as e: | ||
| logger.debug('Failed to initialize AI security client', exc_info=e) | ||
| return | ||
|
|
||
| # Step 4: Create conversation | ||
| try: | ||
| ai_client.create_conversation(session_payload) | ||
| except Exception as e: | ||
| logger.debug('Failed to create conversation during session start', exc_info=e) | ||
|
|
||
| # Step 5: Report data flow (MCP servers) | ||
| _report_data_flow(ai_client, ide) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still need to get the version