Skip to content

Commit d6db82d

Browse files
committed
CM-59844: add some basic test coverage to validate updates
1 parent 15b2da6 commit d6db82d

File tree

5 files changed

+926
-0
lines changed

5 files changed

+926
-0
lines changed

tests/cli/apps/__init__.py

Whitespace-only changes.

tests/cli/apps/mcp/__init__.py

Whitespace-only changes.
Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
import json
2+
import os
3+
import sys
4+
from unittest.mock import AsyncMock, patch
5+
6+
import pytest
7+
8+
if sys.version_info < (3, 10):
9+
pytest.skip('MCP requires Python 3.10+', allow_module_level=True)
10+
11+
from cycode.cli.apps.mcp.mcp_command import (
12+
_sanitize_file_path,
13+
_TempFilesManager,
14+
)
15+
16+
pytestmark = pytest.mark.anyio
17+
18+
19+
@pytest.fixture
20+
def anyio_backend() -> str:
21+
return 'asyncio'
22+
23+
# --- _sanitize_file_path input validation ---
24+
25+
26+
def test_sanitize_file_path_rejects_empty_string() -> None:
27+
with pytest.raises(ValueError, match='non-empty string'):
28+
_sanitize_file_path('')
29+
30+
31+
def test_sanitize_file_path_rejects_none() -> None:
32+
with pytest.raises(ValueError, match='non-empty string'):
33+
_sanitize_file_path(None)
34+
35+
36+
def test_sanitize_file_path_rejects_non_string() -> None:
37+
with pytest.raises(ValueError, match='non-empty string'):
38+
_sanitize_file_path(123)
39+
40+
41+
def test_sanitize_file_path_strips_null_bytes() -> None:
42+
result = _sanitize_file_path('foo/bar\x00baz.py')
43+
assert '\x00' not in result
44+
45+
46+
def test_sanitize_file_path_passes_valid_path_through() -> None:
47+
assert _sanitize_file_path('src/main.py') == 'src/main.py'
48+
49+
50+
# --- _TempFilesManager: path traversal prevention ---
51+
#
52+
# _sanitize_file_path delegates to pathvalidate which does NOT block
53+
# path traversal (../ passes through). The real security boundary is
54+
# the normpath containment check in _TempFilesManager.__enter__ (lines 136-139).
55+
# These tests verify that the two layers together prevent escaping the temp dir.
56+
57+
58+
def test_traversal_simple_dotdot_rejected() -> None:
59+
"""../../../etc/passwd must not escape the temp directory."""
60+
files = {
61+
'../../../etc/passwd': 'malicious',
62+
'safe.py': 'ok',
63+
}
64+
with _TempFilesManager(files, 'test-traversal') as temp_files:
65+
assert len(temp_files) == 1
66+
assert temp_files[0].endswith('safe.py')
67+
for tf in temp_files:
68+
assert '/etc/passwd' not in tf
69+
70+
71+
def test_traversal_backslash_dotdot_rejected() -> None:
72+
"""..\\..\\windows\\system32 must not escape the temp directory."""
73+
files = {
74+
'..\\..\\windows\\system32\\config': 'malicious',
75+
'safe.py': 'ok',
76+
}
77+
with _TempFilesManager(files, 'test-backslash') as temp_files:
78+
assert len(temp_files) == 1
79+
assert temp_files[0].endswith('safe.py')
80+
81+
82+
def test_traversal_embedded_dotdot_rejected() -> None:
83+
"""foo/../../../etc/passwd resolves outside temp dir and must be rejected."""
84+
files = {
85+
'foo/../../../etc/passwd': 'malicious',
86+
'safe.py': 'ok',
87+
}
88+
with _TempFilesManager(files, 'test-embedded') as temp_files:
89+
assert len(temp_files) == 1
90+
assert temp_files[0].endswith('safe.py')
91+
92+
93+
def test_traversal_absolute_path_rejected() -> None:
94+
"""Absolute paths must not be written outside the temp directory."""
95+
files = {
96+
'/etc/passwd': 'malicious',
97+
'safe.py': 'ok',
98+
}
99+
with _TempFilesManager(files, 'test-absolute') as temp_files:
100+
assert len(temp_files) == 1
101+
assert temp_files[0].endswith('safe.py')
102+
103+
104+
def test_traversal_dotdot_only_rejected() -> None:
105+
"""A bare '..' path must be rejected."""
106+
files = {
107+
'..': 'malicious',
108+
'safe.py': 'ok',
109+
}
110+
with _TempFilesManager(files, 'test-bare-dotdot') as temp_files:
111+
assert len(temp_files) == 1
112+
113+
114+
def test_traversal_all_malicious_raises() -> None:
115+
"""If every file path is a traversal attempt, no files are created and ValueError is raised."""
116+
files = {
117+
'../../../etc/passwd': 'malicious',
118+
'../../shadow': 'also malicious',
119+
}
120+
with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(files, 'test-all-malicious'):
121+
pass
122+
123+
124+
def test_all_created_files_are_inside_temp_dir() -> None:
125+
"""Every created file must be under the temp base directory."""
126+
files = {
127+
'a.py': 'aaa',
128+
'sub/b.py': 'bbb',
129+
'sub/deep/c.py': 'ccc',
130+
}
131+
manager = _TempFilesManager(files, 'test-containment')
132+
with manager as temp_files:
133+
base = os.path.normpath(manager.temp_base_dir)
134+
for tf in temp_files:
135+
normalized = os.path.normpath(tf)
136+
assert normalized.startswith(base + os.sep), f'{tf} escaped temp dir {base}'
137+
138+
139+
def test_mixed_valid_and_traversal_only_creates_valid() -> None:
140+
"""Valid files are created, traversal attempts are silently skipped."""
141+
files = {
142+
'../escape.py': 'bad',
143+
'legit.py': 'good',
144+
'foo/../../escape2.py': 'bad',
145+
'src/app.py': 'good',
146+
}
147+
manager = _TempFilesManager(files, 'test-mixed')
148+
with manager as temp_files:
149+
base = os.path.normpath(manager.temp_base_dir)
150+
assert len(temp_files) == 2
151+
for tf in temp_files:
152+
assert os.path.normpath(tf).startswith(base + os.sep)
153+
basenames = [os.path.basename(tf) for tf in temp_files]
154+
assert 'legit.py' in basenames
155+
assert 'app.py' in basenames
156+
157+
158+
# --- _TempFilesManager: general functionality ---
159+
160+
161+
def test_temp_files_manager_creates_files() -> None:
162+
files = {
163+
'test1.py': 'print("hello")',
164+
'subdir/test2.js': 'console.log("world")',
165+
}
166+
with _TempFilesManager(files, 'test-call-id') as temp_files:
167+
assert len(temp_files) == 2
168+
for tf in temp_files:
169+
assert os.path.exists(tf)
170+
171+
172+
def test_temp_files_manager_writes_correct_content() -> None:
173+
files = {'hello.py': 'print("hello world")'}
174+
with _TempFilesManager(files, 'test-content') as temp_files, open(temp_files[0]) as f:
175+
assert f.read() == 'print("hello world")'
176+
177+
178+
def test_temp_files_manager_cleans_up_on_exit() -> None:
179+
files = {'cleanup.py': 'code'}
180+
manager = _TempFilesManager(files, 'test-cleanup')
181+
with manager as temp_files:
182+
temp_dir = manager.temp_base_dir
183+
assert os.path.exists(temp_dir)
184+
assert len(temp_files) == 1
185+
assert not os.path.exists(temp_dir)
186+
187+
188+
def test_temp_files_manager_empty_path_raises() -> None:
189+
files = {'': 'empty path'}
190+
with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(files, 'test-empty-path'):
191+
pass
192+
193+
194+
def test_temp_files_manager_preserves_subdirectory_structure() -> None:
195+
files = {
196+
'src/main.py': 'main',
197+
'src/utils/helper.py': 'helper',
198+
}
199+
with _TempFilesManager(files, 'test-dirs') as temp_files:
200+
assert len(temp_files) == 2
201+
paths = [os.path.basename(tf) for tf in temp_files]
202+
assert 'main.py' in paths
203+
assert 'helper.py' in paths
204+
205+
206+
# --- _run_cycode_command (async) ---
207+
208+
209+
@pytest.mark.anyio
210+
async def test_run_cycode_command_returns_dict() -> None:
211+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
212+
213+
mock_process = AsyncMock()
214+
mock_process.communicate.return_value = (b'', b'error output')
215+
mock_process.returncode = 1
216+
217+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
218+
result = await _run_cycode_command('--invalid-flag-for-test')
219+
assert isinstance(result, dict)
220+
assert 'error' in result
221+
222+
223+
@pytest.mark.anyio
224+
async def test_run_cycode_command_parses_json_output() -> None:
225+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
226+
227+
mock_process = AsyncMock()
228+
mock_process.communicate.return_value = (b'{"status": "ok"}', b'')
229+
mock_process.returncode = 0
230+
231+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
232+
result = await _run_cycode_command('version')
233+
assert result == {'status': 'ok'}
234+
235+
236+
@pytest.mark.anyio
237+
async def test_run_cycode_command_handles_invalid_json() -> None:
238+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
239+
240+
mock_process = AsyncMock()
241+
mock_process.communicate.return_value = (b'not json{', b'')
242+
mock_process.returncode = 0
243+
244+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
245+
result = await _run_cycode_command('version')
246+
assert result['error'] == 'Failed to parse JSON output'
247+
248+
249+
@pytest.mark.anyio
250+
async def test_run_cycode_command_timeout() -> None:
251+
import asyncio
252+
253+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
254+
255+
async def slow_communicate() -> tuple[bytes, bytes]:
256+
await asyncio.sleep(10)
257+
return b'', b''
258+
259+
mock_process = AsyncMock()
260+
mock_process.communicate = slow_communicate
261+
262+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
263+
result = await _run_cycode_command('status', timeout=0.001)
264+
assert isinstance(result, dict)
265+
assert 'error' in result
266+
assert 'timeout' in result['error'].lower()
267+
268+
269+
# --- _cycode_scan_tool ---
270+
271+
272+
@pytest.mark.anyio
273+
async def test_cycode_scan_tool_no_files() -> None:
274+
from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
275+
from cycode.cli.cli_types import ScanTypeOption
276+
277+
result = await _cycode_scan_tool(ScanTypeOption.SECRET, {})
278+
parsed = json.loads(result)
279+
assert 'error' in parsed
280+
assert 'No files provided' in parsed['error']
281+
282+
283+
@pytest.mark.anyio
284+
async def test_cycode_scan_tool_invalid_files() -> None:
285+
from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
286+
from cycode.cli.cli_types import ScanTypeOption
287+
288+
result = await _cycode_scan_tool(ScanTypeOption.SECRET, {'': 'content'})
289+
parsed = json.loads(result)
290+
assert 'error' in parsed
291+
292+
293+
# --- _create_mcp_server ---
294+
295+
296+
def test_create_mcp_server() -> None:
297+
from cycode.cli.apps.mcp.mcp_command import _create_mcp_server
298+
299+
server = _create_mcp_server('127.0.0.1', 8000)
300+
assert server is not None
301+
assert server.name == 'cycode'
302+
303+
304+
def test_create_mcp_server_registers_tools() -> None:
305+
from cycode.cli.apps.mcp.mcp_command import _create_mcp_server
306+
307+
server = _create_mcp_server('127.0.0.1', 8000)
308+
tool_names = [t.name for t in server._tool_manager._tools.values()]
309+
assert 'cycode_status' in tool_names
310+
assert 'cycode_secret_scan' in tool_names
311+
assert 'cycode_sca_scan' in tool_names
312+
assert 'cycode_iac_scan' in tool_names
313+
assert 'cycode_sast_scan' in tool_names

0 commit comments

Comments
 (0)