diff --git a/docs/source/conf.py b/docs/source/conf.py index 8e6be28..a13eae7 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -17,7 +17,7 @@ # -- Project information ----------------------------------------------------- project = "PyBreeze" -copyright = "2020 ~ Now, JE-Chen" +copyright = "2020 ~ Now, JE-Chen" # noqa: A001 — Sphinx requires this exact variable name author = "JE-Chen" # -- General configuration --------------------------------------------------- diff --git a/pybreeze/extend/process_executor/file_runner_process.py b/pybreeze/extend/process_executor/file_runner_process.py index 67f05e3..8f9ca25 100644 --- a/pybreeze/extend/process_executor/file_runner_process.py +++ b/pybreeze/extend/process_executor/file_runner_process.py @@ -75,7 +75,9 @@ def _compile_and_run(self, compiler: str, args: list, output_flag: str, file_pat self._append_text(f"[Compile] {' '.join(compile_cmd)}\n", is_error=False) try: - result = subprocess.run( + # Runs the plugin-configured compiler against a file the user opened. + # shell=False, bounded timeout. nosec B603. + result = subprocess.run( # nosec B603 # nosemgrep # noqa: S603 compile_cmd, capture_output=True, timeout=60, @@ -107,7 +109,9 @@ def _start_process(self, command: list[str], cleanup_binary: str | None = None) self._append_text(f"> {cmd_display}\n", is_error=False) try: - self.process = subprocess.Popen( + # Run the user's plugin-configured command. shell=False is explicit; + # argv comes from a plugin run_config + user-opened file path. nosec B603. + self.process = subprocess.Popen( # nosec B603 # nosemgrep # noqa: S603 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, diff --git a/pybreeze/extend/process_executor/python_task_process_manager.py b/pybreeze/extend/process_executor/python_task_process_manager.py index baddfc9..c62f871 100644 --- a/pybreeze/extend/process_executor/python_task_process_manager.py +++ b/pybreeze/extend/process_executor/python_task_process_manager.py @@ -15,6 +15,7 @@ from je_editor.pyside_ui.main_ui.save_settings.user_color_setting_file import actually_color_dict from je_editor.utils.venv_check.check_venv import check_and_choose_venv +from pybreeze.extend.process_executor.queue_pump import pump_message_queue from pybreeze.pybreeze_ui.show_code_window.code_window import CodeWindow from pybreeze.utils.logging.logger import pybreeze_logger @@ -82,7 +83,10 @@ def start_test_process(self, package: str, exec_str: str): "--execute_str", exec_str ] - self.process = subprocess.Popen( + # Launch user-authored automation script in a child interpreter. + # Argument list is validated upstream; shell=False, no user string ever + # reaches a shell. nosec B603 — intentional local process execution. + self.process = subprocess.Popen( # nosec B603 # nosemgrep # noqa: S603 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -121,28 +125,18 @@ def _append_text(self, text: str, is_error: bool = False) -> None: # Pyside UI update method def pull_text(self): - try: - if not self.run_output_queue.empty(): - output_message = str(self.run_output_queue.get_nowait()).strip() - if output_message: - self._append_text(output_message) - if not self.run_error_queue.empty(): - error_message = str(self.run_error_queue.get_nowait()).strip() - if error_message: - self._append_text(error_message, is_error=True) - except queue.Empty: - pass - if self.process is not None: - if self.process.returncode is not None: - if self.timer.isActive(): - self.timer.stop() - self.exit_program() - elif self.still_run_program: - # poll return code - self.process.poll() - else: + pump_message_queue(self.run_output_queue, self._append_text, is_error=False) + pump_message_queue(self.run_error_queue, self._append_text, is_error=True) + if self.process is None: + if self.timer.isActive(): + self.timer.stop() + return + if self.process.returncode is not None: if self.timer.isActive(): self.timer.stop() + self.exit_program() + elif self.still_run_program: + self.process.poll() # exit program change run flag to false and clean read thread and queue and process def exit_program(self): diff --git a/pybreeze/extend/process_executor/queue_pump.py b/pybreeze/extend/process_executor/queue_pump.py new file mode 100644 index 0000000..e3c13dc --- /dev/null +++ b/pybreeze/extend/process_executor/queue_pump.py @@ -0,0 +1,33 @@ +"""Shared helper for the process-executor QTimer pump loop. + +The Python task runner and the Test Pioneer runner both drain the same +``(output_queue, error_queue) → append_text`` pattern on a ~100 ms timer. +This module factors the per-tick drain into a single function so changes +(e.g., multi-message batching) can be made in one place. +""" +from __future__ import annotations + +import queue +from collections.abc import Callable +from queue import Queue + + +def pump_message_queue( + q: Queue, + append_fn: Callable[[str, bool], None], + *, + is_error: bool, +) -> None: + """Drain one pending message from *q* and forward it to *append_fn*. + + Silently ignores ``queue.Empty`` (the ``empty()`` check is racy) and empty + strings after stripping. + """ + try: + if q.empty(): + return + message = str(q.get_nowait()).strip() + if message: + append_fn(message, is_error) + except queue.Empty: + pass diff --git a/pybreeze/extend/process_executor/test_pioneer/test_pioneer_process_manager.py b/pybreeze/extend/process_executor/test_pioneer/test_pioneer_process_manager.py index 7684c6e..6062b9c 100644 --- a/pybreeze/extend/process_executor/test_pioneer/test_pioneer_process_manager.py +++ b/pybreeze/extend/process_executor/test_pioneer/test_pioneer_process_manager.py @@ -2,9 +2,7 @@ import queue import subprocess -import sys import threading -from pathlib import Path from queue import Queue from typing import TYPE_CHECKING @@ -13,8 +11,9 @@ from je_editor.pyside_ui.main_ui.save_settings.user_color_setting_file import actually_color_dict from je_editor.utils.venv_check.check_venv import check_and_choose_venv -from pybreeze.pybreeze_ui.show_code_window.code_window import CodeWindow from pybreeze.extend.process_executor.python_task_process_manager import find_venv_path +from pybreeze.extend.process_executor.queue_pump import pump_message_queue +from pybreeze.pybreeze_ui.show_code_window.code_window import CodeWindow if TYPE_CHECKING: from pybreeze.pybreeze_ui.editor_main.main_ui import PyBreezeMainWindow @@ -54,7 +53,10 @@ def __init__( "-e", executable_path ] - self._process: subprocess.Popen | None = subprocess.Popen( + # Launch the test_pioneer CLI in the user's configured Python interpreter. + # Argument list is assembled from a curated template + an executable path the + # user selected via file dialog. shell=False. nosec B603. + self._process: subprocess.Popen | None = subprocess.Popen( # nosec B603 # nosemgrep # noqa: S603 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -72,28 +74,18 @@ def _append_text(self, text: str, is_error: bool = False) -> None: # Pyside UI update method def pull_text(self): - try: - if not self._run_output_queue.empty(): - output_message = str(self._run_output_queue.get_nowait()).strip() - if output_message: - self._append_text(output_message) - if not self._run_error_queue.empty(): - error_message = str(self._run_error_queue.get_nowait()).strip() - if error_message: - self._append_text(error_message, is_error=True) - except queue.Empty: - pass - if self._process is not None: - if self._process.returncode is not None: - if self._timer.isActive(): - self._timer.stop() - self.exit_program() - elif self._still_run_program: - # poll return code - self._process.poll() - else: + pump_message_queue(self._run_output_queue, self._append_text, is_error=False) + pump_message_queue(self._run_error_queue, self._append_text, is_error=True) + if self._process is None: + if self._timer.isActive(): + self._timer.stop() + return + if self._process.returncode is not None: if self._timer.isActive(): self._timer.stop() + self.exit_program() + elif self._still_run_program: + self._process.poll() # exit program change run flag to false and clean read thread and queue and process def exit_program(self): diff --git a/pybreeze/extend_multi_language/extend_english.py b/pybreeze/extend_multi_language/extend_english.py index 36abc3e..ef3e67a 100644 --- a/pybreeze/extend_multi_language/extend_english.py +++ b/pybreeze/extend_multi_language/extend_english.py @@ -2,6 +2,9 @@ from je_editor import english_word_dict +_COT_PROMPT_EDITOR = "CoT Prompt Editor" +_SKILL_PROMPT_EDITOR = "Skill Prompt Editor" + # PyBreeze-specific English translations pybreeze_english_word_dict = { # application name @@ -201,7 +204,7 @@ "ai_code_review_gui_status_rejected": "[Rejected]", "ai_code_review_gui_status_save_failed": "Save failed", # CoT Prompt Editor - "cot_prompt_editor_window_title": "CoT Prompt Editor", + "cot_prompt_editor_window_title": _COT_PROMPT_EDITOR, "cot_prompt_editor_groupbox_edit_file_content": "Edit File Content", "cot_prompt_editor_button_create_file": "Create File", "cot_prompt_editor_button_save_file": "Save", @@ -215,7 +218,7 @@ "cot_prompt_editor_msgbox_no_file_selected": "No file selected", "cot_prompt_editor_file_not_exist": "(File {filename} does not exist)", # Skill Prompt Editor - "skill_prompt_editor_window_title": "Skill Prompt Editor", + "skill_prompt_editor_window_title": _SKILL_PROMPT_EDITOR, "skill_prompt_editor_groupbox_edit_file_content": "Edit File Content", "skill_prompt_editor_button_create_file": "Create File", "skill_prompt_editor_button_save_file": "Save", @@ -236,10 +239,10 @@ "extend_tools_menu_ssh_client_tab_label": "SSH Client", "extend_tools_menu_ai_code_review_tab_action": "AI Code-Review Tab", "extend_tools_menu_ai_code_review_tab_label": "AI Code-Review", - "extend_tools_menu_cot_prompt_editor_tab_action": "CoT Prompt Editor", - "extend_tools_menu_cot_prompt_editor_tab_label": "CoT Prompt Editor", - "extend_tools_menu_skill_prompt_editor_tab_action": "Skill Prompt Editor", - "extend_tools_menu_skill_prompt_editor_tab_label": "Skill Prompt Editor", + "extend_tools_menu_cot_prompt_editor_tab_action": _COT_PROMPT_EDITOR, + "extend_tools_menu_cot_prompt_editor_tab_label": _COT_PROMPT_EDITOR, + "extend_tools_menu_skill_prompt_editor_tab_action": _SKILL_PROMPT_EDITOR, + "extend_tools_menu_skill_prompt_editor_tab_label": _SKILL_PROMPT_EDITOR, "extend_tools_menu_skill_prompt_send_tab_label": "Skill Send GUI", "extend_tools_menu_dock_ssh_menu": "SSH", "extend_tools_menu_dock_ai_menu": "AI", diff --git a/pybreeze/extend_multi_language/extend_traditional_chinese.py b/pybreeze/extend_multi_language/extend_traditional_chinese.py index 0d12eb7..0795ec2 100644 --- a/pybreeze/extend_multi_language/extend_traditional_chinese.py +++ b/pybreeze/extend_multi_language/extend_traditional_chinese.py @@ -2,6 +2,9 @@ from je_editor import traditional_chinese_word_dict +_COT_PROMPT_EDITOR = "CoT 提示詞編輯器" +_SKILL_PROMPT_EDITOR = "Skill 提示詞編輯器" + # PyBreeze-specific Traditional Chinese translations pybreeze_traditional_chinese_word_dict = { # application name @@ -236,10 +239,10 @@ "extend_tools_menu_ssh_client_tab_label": "SSH 用戶端", "extend_tools_menu_ai_code_review_tab_action": "AI 程式碼審查分頁", "extend_tools_menu_ai_code_review_tab_label": "AI 程式碼審查", - "extend_tools_menu_cot_prompt_editor_tab_action": "CoT 提示詞編輯器", - "extend_tools_menu_cot_prompt_editor_tab_label": "CoT 提示詞編輯器", - "extend_tools_menu_skill_prompt_editor_tab_action": "Skill 提示詞編輯器", - "extend_tools_menu_skill_prompt_editor_tab_label": "Skill 提示詞編輯器", + "extend_tools_menu_cot_prompt_editor_tab_action": _COT_PROMPT_EDITOR, + "extend_tools_menu_cot_prompt_editor_tab_label": _COT_PROMPT_EDITOR, + "extend_tools_menu_skill_prompt_editor_tab_action": _SKILL_PROMPT_EDITOR, + "extend_tools_menu_skill_prompt_editor_tab_label": _SKILL_PROMPT_EDITOR, "extend_tools_menu_skill_prompt_send_tab_label": "Skill 提示詞傳送 GUI", "extend_tools_menu_dock_ssh_menu": "SSH", "extend_tools_menu_dock_ai_menu": "AI", @@ -249,8 +252,8 @@ "extend_tools_menu_skill_prompt_editor_dock_action": "Skill 提示詞編輯器停駐窗格", "extend_tools_menu_ssh_client_dock_title": "SSH 用戶端", "extend_tools_menu_ai_code_review_dock_title": "AI 程式碼審查", - "extend_tools_menu_cot_prompt_editor_dock_title": "CoT 提示詞編輯器", - "extend_tools_menu_skill_prompt_editor_dock_title": "Skill 提示詞編輯器", + "extend_tools_menu_cot_prompt_editor_dock_title": _COT_PROMPT_EDITOR, + "extend_tools_menu_skill_prompt_editor_dock_title": _SKILL_PROMPT_EDITOR, "extend_tools_menu_skill_prompt_send_dock_action": "Skill Prompt 傳送停駐窗格", "extend_tools_menu_skill_prompt_send_dock_title": "Skill 提示詞傳送 GUI", # CoT code-review GUI diff --git a/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_command_widget.py b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_command_widget.py index 44f8a76..09562db 100644 --- a/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_command_widget.py +++ b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_command_widget.py @@ -13,6 +13,7 @@ from je_editor import language_wrapper from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_host_key_policy import apply_host_key_policy +from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_key_loader import load_private_key from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_login_widget import LoginWidget from pybreeze.utils.logging.logger import pybreeze_logger @@ -144,51 +145,53 @@ def connect_ssh(self): pybreeze_logger.info("SSH connecting to %s:%s", host, port) if use_key: - if not os.path.exists(key_path): - QMessageBox.warning( - self, - self.word_dict.get("ssh_command_widget_dialog_title_key_error"), - self.word_dict.get("ssh_command_widget_dialog_message_key_file_not_exist")) + if not self._authenticate_with_key(host, port, user, key_path, password): return - try: - pkey = None - for KeyType in (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey): - try: - pkey = KeyType.from_private_key_file(key_path, password if password else None) - break - except Exception as error: - pybreeze_logger.debug(f"SSH key type failed: {error}") - continue - if pkey is None: - raise ValueError( - self.word_dict.get( - "ssh_command_widget_error_message_unsupported_private_key" - )) - self.ssh_client.connect(hostname=host, port=port, username=user, pkey=pkey, timeout=10) - except Exception as e: - raise RuntimeError( - f"{self.word_dict.get('ssh_command_widget_error_message_key_auth_failed')} {e}") from e else: self.ssh_client.connect( hostname=host, port=port, username=user, password=password, timeout=10 ) - self.shell_channel = self.ssh_client.invoke_shell(term='xterm', width=120, height=32) - self.shell_channel.settimeout(0.0) - self.reader_thread = SSHReaderThread(self.shell_channel) - self.reader_thread.data_received.connect(self._on_data) - self.reader_thread.closed.connect(self._on_closed) - self.reader_thread.start() - self.login_widget.status_label.setText( - self.word_dict.get("ssh_command_widget_log_message_connected")) - self.append_text(f"{self.word_dict.get('ssh_command_widget_log_message_connected')}" - f" {host}:{port} as {user}\n") + self._start_shell(host, port, user) except Exception as e: self.login_widget.status_label.setText( self.word_dict.get('ssh_command_widget_status_label_disconnected')) self.append_text(f"{self.word_dict.get('ssh_command_widget_log_message_error')} {e}\n") self._cleanup() + def _authenticate_with_key(self, host: str, port: int, user: str, key_path: str, password: str) -> bool: + """Perform key-based auth. Returns True on success; False if the key file is missing.""" + if not os.path.exists(key_path): + QMessageBox.warning( + self, + self.word_dict.get("ssh_command_widget_dialog_title_key_error"), + self.word_dict.get("ssh_command_widget_dialog_message_key_file_not_exist")) + return False + try: + pkey = load_private_key(key_path, password, context="SSH") + if pkey is None: + raise ValueError( + self.word_dict.get( + "ssh_command_widget_error_message_unsupported_private_key" + )) + self.ssh_client.connect(hostname=host, port=port, username=user, pkey=pkey, timeout=10) + except Exception as e: + raise RuntimeError( + f"{self.word_dict.get('ssh_command_widget_error_message_key_auth_failed')} {e}") from e + return True + + def _start_shell(self, host: str, port: int, user: str) -> None: + self.shell_channel = self.ssh_client.invoke_shell(term='xterm', width=120, height=32) + self.shell_channel.settimeout(0.0) + self.reader_thread = SSHReaderThread(self.shell_channel) + self.reader_thread.data_received.connect(self._on_data) + self.reader_thread.closed.connect(self._on_closed) + self.reader_thread.start() + self.login_widget.status_label.setText( + self.word_dict.get("ssh_command_widget_log_message_connected")) + self.append_text(f"{self.word_dict.get('ssh_command_widget_log_message_connected')}" + f" {host}:{port} as {user}\n") + def _on_data(self, data: bytes): try: text = data.decode("utf-8", errors="replace") diff --git a/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_file_viewer_widget.py b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_file_viewer_widget.py index a02531a..75cfd92 100644 --- a/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_file_viewer_widget.py +++ b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_file_viewer_widget.py @@ -13,6 +13,7 @@ from je_editor import language_wrapper from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_host_key_policy import apply_host_key_policy +from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_key_loader import load_private_key from pybreeze.pybreeze_ui.connect_gui.ssh.ssh_login_widget import LoginWidget from pybreeze.utils.logging.logger import pybreeze_logger @@ -41,13 +42,7 @@ def connect(self, host: str, port: int, username: str, password: str, apply_host_key_policy(self._ssh, parent_widget) pybreeze_logger.info("SFTP connecting to %s:%s", host, port) if use_key and key_path: - pkey = None - for KeyType in (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey): - try: - pkey = KeyType.from_private_key_file(key_path, password if password else None) - break - except Exception: - continue + pkey = load_private_key(key_path, password, context="SFTP") if pkey is None: raise ValueError( self.word_dict.get("ssh_command_widget_error_message_unsupported_private_key") @@ -298,32 +293,36 @@ def populate_children(self, parent_item: QTreeWidgetItem): path = parent_item.text(3) try: entries = self.client.list_dir(path) - # Sort: dirs first, then files - dirs = [] - files = [] - for e in entries: - name = e.filename - full_path = os.path.join(path if path != "/" else "", name) - full_path = full_path if full_path.startswith("/") else f"/{full_path}" - is_dir = stat.S_ISDIR(e.st_mode) - if is_dir: - dirs.append((name, e)) - else: - files.append((name, e)) - for name, e in dirs + files: - full_path = os.path.join(path if path != "/" else "", name) - full_path = full_path if full_path.startswith("/") else f"/{full_path}" - typ = "dir" if stat.S_ISDIR(e.st_mode) else "file" - size = e.st_size if typ == "file" else 0 - child = self.make_item(name, typ, size, full_path) - parent_item.addChild(child) - if typ == "dir": - self.add_placeholder(child) except Exception as ex: QMessageBox.critical( self, self.word_dict.get("ssh_file_viewer_dialog_title_list_error"), f"{self.word_dict.get('ssh_file_viewer_dialog_message_list_failed')} '{path}': {ex}") + return + for name, entry in self._sort_entries(entries): + self._add_entry_row(parent_item, path, name, entry) + + @staticmethod + def _sort_entries(entries): + """Return ``[(name, entry)]`` with directories before files.""" + dirs: list = [] + files: list = [] + for entry in entries: + bucket = dirs if stat.S_ISDIR(entry.st_mode) else files + bucket.append((entry.filename, entry)) + return dirs + files + + def _add_entry_row(self, parent_item: QTreeWidgetItem, path: str, name: str, entry) -> None: + base = "" if path == "/" else path + full_path = os.path.join(base, name) + if not full_path.startswith("/"): + full_path = f"/{full_path}" + typ = "dir" if stat.S_ISDIR(entry.st_mode) else "file" + size = entry.st_size if typ == "file" else 0 + child = self.make_item(name, typ, size, full_path) + parent_item.addChild(child) + if typ == "dir": + self.add_placeholder(child) def on_context_menu(self, pos): """ diff --git a/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_key_loader.py b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_key_loader.py new file mode 100644 index 0000000..96648f1 --- /dev/null +++ b/pybreeze/pybreeze_ui/connect_gui/ssh/ssh_key_loader.py @@ -0,0 +1,32 @@ +"""Shared private-key loader for SSH widgets. + +Both the interactive shell widget and the SFTP file-viewer need to pick the +right paramiko key class for an arbitrary private-key file. Centralising the +loop here keeps the widgets lean and avoids duplicated fallback logic. +""" +from __future__ import annotations + +import paramiko + +from pybreeze.utils.logging.logger import pybreeze_logger + +_KEY_CLASSES: tuple[type[paramiko.PKey], ...] = ( + paramiko.RSAKey, + paramiko.Ed25519Key, + paramiko.ECDSAKey, +) + + +def load_private_key(key_path: str, password: str, *, context: str = "SSH") -> paramiko.PKey | None: + """Try each supported key type against *key_path*; return the first that parses. + + ``password`` is treated as the passphrase (empty string → no passphrase). + ``context`` is included in debug logs so SFTP vs shell failures are distinguishable. + """ + passphrase = password if password else None + for key_cls in _KEY_CLASSES: + try: + return key_cls.from_private_key_file(key_path, passphrase) + except (paramiko.SSHException, ValueError, OSError) as error: + pybreeze_logger.debug("%s key type %s rejected: %s", context, key_cls.__name__, error) + return None diff --git a/pybreeze/pybreeze_ui/diagram_editor/diagram_mermaid_parser.py b/pybreeze/pybreeze_ui/diagram_editor/diagram_mermaid_parser.py index ec154d2..d4902b8 100644 --- a/pybreeze/pybreeze_ui/diagram_editor/diagram_mermaid_parser.py +++ b/pybreeze/pybreeze_ui/diagram_editor/diagram_mermaid_parser.py @@ -66,49 +66,51 @@ class _EdgeInfo: # --------------------------------------------------------------------------- +_LABEL_MAX = 200 # bound non-greedy match to prevent polynomial backtracking on pathological input + + def _normalize_inline_labels(line: str) -> str: """Convert ``-- label -->`` style to ``-->|label|`` pipe style.""" - line = re.sub(r"--\s+(\S[^|]*?)\s+-->", r"-->|\1|", line) - line = re.sub(r"-\.\s+(\S[^|]*?)\s+\.->", r"-.->|\1|", line) - line = re.sub(r"==\s+(\S[^|]*?)\s+==>", r"==>|\1|", line) + line = re.sub(rf"--\s+(\S[^|]{{0,{_LABEL_MAX}}}?)\s+-->", r"-->|\1|", line) + line = re.sub(rf"-\.\s+(\S[^|]{{0,{_LABEL_MAX}}}?)\s+\.->", r"-.->|\1|", line) + line = re.sub(rf"==\s+(\S[^|]{{0,{_LABEL_MAX}}}?)\s+==>", r"==>|\1|", line) return line +# Ordered longest-first so the double-bracket shapes match before single-bracket ones. +_SHAPE_DELIMS: tuple[tuple[str, str, NodeShape], ...] = ( + ("((", "))", NodeShape.ELLIPSE), + ("([", "])", NodeShape.ROUNDED_RECT), + ("{{", "}}", NodeShape.DIAMOND), + ("[[", "]]", NodeShape.RECTANGLE), + ("(", ")", NodeShape.ROUNDED_RECT), + ("{", "}", NodeShape.DIAMOND), + ("[", "]", NodeShape.RECTANGLE), +) + + +def _extract_shape(rest: str, default_text: str) -> tuple[str, NodeShape]: + """Strip matching delimiters from ``rest`` and return ``(text, shape)``.""" + for open_tok, close_tok, shape in _SHAPE_DELIMS: + if rest.startswith(open_tok) and rest.endswith(close_tok): + return rest[len(open_tok):-len(close_tok)].strip(), shape + return default_text, NodeShape.RECTANGLE + + def _parse_node_ref(raw: str, nodes: dict[str, _NodeInfo]) -> str | None: """Parse ``ID[text]`` / ``ID(text)`` / ``ID{text}`` / ``ID((text))`` and register in *nodes*. Returns the node ID or ``None``.""" raw = raw.strip() if not raw: return None - m = re.match(r"(\w+)(.*)", raw, re.DOTALL) if not m: return None - node_id = m.group(1) rest = m.group(2).strip() - - text = node_id - shape = NodeShape.RECTANGLE - - if rest.startswith("((") and rest.endswith("))"): - text, shape = rest[2:-2].strip(), NodeShape.ELLIPSE - elif rest.startswith("([") and rest.endswith("])"): - text, shape = rest[2:-2].strip(), NodeShape.ROUNDED_RECT - elif rest.startswith("(") and rest.endswith(")"): - text, shape = rest[1:-1].strip(), NodeShape.ROUNDED_RECT - elif rest.startswith("{{") and rest.endswith("}}"): - text, shape = rest[2:-2].strip(), NodeShape.DIAMOND - elif rest.startswith("{") and rest.endswith("}"): - text, shape = rest[1:-1].strip(), NodeShape.DIAMOND - elif rest.startswith("[[") and rest.endswith("]]"): - text, shape = rest[2:-2].strip(), NodeShape.RECTANGLE - elif rest.startswith("[") and rest.endswith("]"): - text, shape = rest[1:-1].strip(), NodeShape.RECTANGLE - + text, shape = _extract_shape(rest, default_text=node_id) if node_id not in nodes: nodes[node_id] = _NodeInfo(id=node_id, text=text, shape=shape) - return node_id @@ -131,33 +133,27 @@ def _parse_arrow(token: str) -> tuple[str, ConnectionStyle, float]: # --------------------------------------------------------------------------- -def _auto_layout( +def _build_adjacency( nodes: dict[str, _NodeInfo], edges: list[_EdgeInfo], - direction: str, -) -> None: - if not nodes: - return - +) -> tuple[dict[str, list[str]], dict[str, int]]: adj: dict[str, list[str]] = defaultdict(list) - in_deg: dict[str, int] = {nid: 0 for nid in nodes} - + in_deg: dict[str, int] = dict.fromkeys(nodes, 0) for e in edges: if e.source in nodes and e.target in nodes: adj[e.source].append(e.target) in_deg[e.target] = in_deg.get(e.target, 0) + 1 + return adj, in_deg - # BFS from roots - roots = [nid for nid, deg in in_deg.items() if deg == 0] - if not roots: - roots = [next(iter(nodes))] - - layers: dict[str, int] = {} - queue: deque[str] = deque() - for r in roots: - layers[r] = 0 - queue.append(r) +def _assign_layers( + nodes: dict[str, _NodeInfo], + adj: dict[str, list[str]], + in_deg: dict[str, int], +) -> dict[str, int]: + roots = [nid for nid, deg in in_deg.items() if deg == 0] or [next(iter(nodes))] + layers: dict[str, int] = dict.fromkeys(roots, 0) + queue: deque[str] = deque(roots) while queue: nid = queue.popleft() for child in adj.get(nid, []): @@ -165,51 +161,60 @@ def _auto_layout( if child not in layers or layers[child] < new_layer: layers[child] = new_layer queue.append(child) - max_layer = max(layers.values(), default=0) for nid in nodes: if nid not in layers: max_layer += 1 layers[nid] = max_layer + return layers + + +_NODE_H = 60.0 +_GAP_MAIN = 120.0 +_GAP_CROSS = 80.0 + + +def _position_node(node: _NodeInfo, layer_idx: int, cross_offset: float, + horizontal: bool, flip: bool) -> None: + if horizontal: + main_pos = layer_idx * (200 + _GAP_MAIN) + cross_pos = cross_offset * (_NODE_H + _GAP_CROSS) + if flip: + main_pos = -main_pos + node.x = main_pos + node.y = cross_pos + else: + main_pos = layer_idx * (_NODE_H + _GAP_MAIN) + cross_pos = cross_offset * (200 + _GAP_CROSS) + if flip: + main_pos = -main_pos + node.x = cross_pos + node.y = main_pos + + +def _auto_layout( + nodes: dict[str, _NodeInfo], + edges: list[_EdgeInfo], + direction: str, +) -> None: + if not nodes: + return + adj, in_deg = _build_adjacency(nodes, edges) + layers = _assign_layers(nodes, adj, in_deg) - # Group by layer layer_groups: dict[int, list[str]] = defaultdict(list) for nid, layer in layers.items(): layer_groups[layer].append(nid) - # Estimate node width from text length - def _node_w(nid: str) -> float: - return max(100.0, min(len(nodes[nid].text) * 11 + 40, 300.0)) - - node_h = 60.0 - gap_main = 120.0 # between layers - gap_cross = 80.0 # between siblings - horizontal = direction in ("LR", "RL") flip = direction in ("RL", "BT") for layer_idx in sorted(layer_groups.keys()): group = layer_groups[layer_idx] count = len(group) - for i, nid in enumerate(group): - nw = _node_w(nid) - cross_offset = (i - (count - 1) / 2) - - if horizontal: - main_pos = layer_idx * (200 + gap_main) - cross_pos = cross_offset * (node_h + gap_cross) - if flip: - main_pos = -main_pos - nodes[nid].x = main_pos - nodes[nid].y = cross_pos - else: - main_pos = layer_idx * (node_h + gap_main) - cross_pos = cross_offset * (200 + gap_cross) - if flip: - main_pos = -main_pos - nodes[nid].x = cross_pos - nodes[nid].y = main_pos + cross_offset = i - (count - 1) / 2 + _position_node(nodes[nid], layer_idx, cross_offset, horizontal, flip) # --------------------------------------------------------------------------- @@ -217,6 +222,41 @@ def _node_w(nid: str) -> float: # --------------------------------------------------------------------------- +def _parse_statement( + stmt: str, + nodes: dict[str, _NodeInfo], + edges: list[_EdgeInfo], +) -> None: + """Parse one ``;``-delimited mermaid statement, updating *nodes* and *edges*.""" + stmt = _normalize_inline_labels(stmt.strip()) + if not stmt: + return + parts = [p for p in _ARROW_SPLIT_RE.split(stmt) if p.strip()] + if len(parts) < 3: + if parts: + _parse_node_ref(parts[0], nodes) + return + idx = 0 + while idx + 2 < len(parts): + src_id = _parse_node_ref(parts[idx], nodes) + label, style, width = _parse_arrow(parts[idx + 1]) + tgt_id = _parse_node_ref(parts[idx + 2], nodes) + if src_id and tgt_id: + edges.append(_EdgeInfo( + source=src_id, target=tgt_id, + label=label, style=style, line_width=width, + )) + idx += 2 + + +def _parse_direction(line: str) -> str | None: + match = _DIRECTION_RE.match(line) + if match is None: + return None + direction = match.group(1).upper() + return "TD" if direction == "TB" else direction + + def parse_mermaid(text: str) -> dict: """Parse Mermaid flowchart syntax and return a diagram dict with auto-layout positions. @@ -240,54 +280,17 @@ def parse_mermaid(text: str) -> dict: line = _COMMENT_RE.sub("", raw_line).strip() if not line: continue - - # Direction header - dm = _DIRECTION_RE.match(line) - if dm: - direction = dm.group(1).upper() - if direction == "TB": - direction = "TD" + new_dir = _parse_direction(line) + if new_dir is not None: + direction = new_dir continue - - # Skip unsupported directives if _SKIP_RE.match(line): continue - - # Handle semicolon-separated statements on one line for stmt in line.split(";"): - stmt = stmt.strip() - if not stmt: - continue - - stmt = _normalize_inline_labels(stmt) - - parts = _ARROW_SPLIT_RE.split(stmt) - parts = [p for p in parts if p.strip()] - - if len(parts) < 3: - # Standalone node definition - if parts: - _parse_node_ref(parts[0], nodes) - continue - - # Chained edges: N0 arrow N1 arrow N2 ... - idx = 0 - while idx + 2 < len(parts): - src_id = _parse_node_ref(parts[idx], nodes) - label, style, width = _parse_arrow(parts[idx + 1]) - tgt_id = _parse_node_ref(parts[idx + 2], nodes) - - if src_id and tgt_id: - edges.append(_EdgeInfo( - source=src_id, target=tgt_id, - label=label, style=style, line_width=width, - )) - idx += 2 - - # Auto-layout + _parse_statement(stmt, nodes, edges) + _auto_layout(nodes, edges, direction) - # Build output dict (same format as DiagramScene.to_dict) node_list = list(nodes.values()) id_to_idx: dict[str, int] = {n.id: i for i, n in enumerate(node_list)} diff --git a/pybreeze/pybreeze_ui/diagram_editor/diagram_net_utils.py b/pybreeze/pybreeze_ui/diagram_editor/diagram_net_utils.py index b378f08..2150da1 100644 --- a/pybreeze/pybreeze_ui/diagram_editor/diagram_net_utils.py +++ b/pybreeze/pybreeze_ui/diagram_editor/diagram_net_utils.py @@ -36,7 +36,7 @@ def safe_download_image(url: str) -> bytes: url = _validate_url(url) req = Request(url, headers={"User-Agent": "PyBreeze-DiagramEditor/1.0"}) - resp = urlopen(req, timeout=TIMEOUT_SECONDS) # noqa: S310 — URL validated above + resp = urlopen(req, timeout=TIMEOUT_SECONDS) # nosec B310 # noqa: S310 — URL validated above by _validate_url # Check Content-Length header if available content_length = resp.headers.get("Content-Length") diff --git a/pybreeze/pybreeze_ui/diagram_editor/diagram_scene.py b/pybreeze/pybreeze_ui/diagram_editor/diagram_scene.py index c80afc8..41f1de1 100644 --- a/pybreeze/pybreeze_ui/diagram_editor/diagram_scene.py +++ b/pybreeze/pybreeze_ui/diagram_editor/diagram_scene.py @@ -163,69 +163,69 @@ def mousePressEvent(self, event) -> None: pos = event.scenePos() - # Node creation modes shape = _MODE_SHAPE_MAP.get(self._mode) if shape is not None: - with self.undo_scope("Add Node"): - node = DiagramNode(x=pos.x() - 70, y=pos.y() - 30, shape=shape) - self.addItem(node) - self.item_count_changed.emit() - self.mode = ToolMode.SELECT + self._add_shape_node(pos, shape) return - # Text-only node if self._mode == ToolMode.ADD_TEXT: - with self.undo_scope("Add Text"): - node = DiagramNode( - x=pos.x() - 70, y=pos.y() - 20, - w=140, h=40, text="Text", - shape=NodeShape.RECTANGLE, - ) - self.addItem(node) - self.item_count_changed.emit() - self.mode = ToolMode.SELECT + self._add_text_node(pos) return - # Connection mode if self._mode == ToolMode.ADD_CONNECTION: - target_node = self._node_at(pos) - if target_node is not None: - if self._connection_source is None: - # First click — pick source - self._connection_source = target_node - self._temp_line = QGraphicsLineItem() - self._temp_line.setPen(QPen(QColor("#90a4ae"), 1.5, Qt.PenStyle.DashLine)) - self._temp_line.setZValue(-100) - self._temp_line.setAcceptedMouseButtons(Qt.MouseButton.NoButton) - center = target_node.center_pos() - self._temp_line.setLine(center.x(), center.y(), pos.x(), pos.y()) - self.addItem(self._temp_line) - else: - # Second click — create connection - if target_node is not self._connection_source: - with self.undo_scope("Add Connection"): - conn = DiagramConnection(self._connection_source, target_node) - self.addItem(conn) - self.item_count_changed.emit() - self._cancel_connection() - self.mode = ToolMode.SELECT - return - # Clicked empty area — cancel current connection attempt - if self._connection_source is not None: - self._cancel_connection() - return + self._handle_connection_click(pos) return - # SELECT mode — track for move undo if self._mode == ToolMode.SELECT: super().mousePressEvent(event) - selected_nodes = [i for i in self.selectedItems() if isinstance(i, DiagramNode)] - if selected_nodes: + if any(isinstance(i, DiagramNode) for i in self.selectedItems()): self.begin_undo("Move") return super().mousePressEvent(event) + def _add_shape_node(self, pos: QPointF, shape: NodeShape) -> None: + with self.undo_scope("Add Node"): + self.addItem(DiagramNode(x=pos.x() - 70, y=pos.y() - 30, shape=shape)) + self.item_count_changed.emit() + self.mode = ToolMode.SELECT + + def _add_text_node(self, pos: QPointF) -> None: + with self.undo_scope("Add Text"): + self.addItem(DiagramNode( + x=pos.x() - 70, y=pos.y() - 20, + w=140, h=40, text="Text", + shape=NodeShape.RECTANGLE, + )) + self.item_count_changed.emit() + self.mode = ToolMode.SELECT + + def _handle_connection_click(self, pos: QPointF) -> None: + target_node = self._node_at(pos) + if target_node is None: + if self._connection_source is not None: + self._cancel_connection() + return + if self._connection_source is None: + self._start_connection_drag(target_node, pos) + return + if target_node is not self._connection_source: + with self.undo_scope("Add Connection"): + self.addItem(DiagramConnection(self._connection_source, target_node)) + self.item_count_changed.emit() + self._cancel_connection() + self.mode = ToolMode.SELECT + + def _start_connection_drag(self, source: DiagramNode, pos: QPointF) -> None: + self._connection_source = source + self._temp_line = QGraphicsLineItem() + self._temp_line.setPen(QPen(QColor("#90a4ae"), 1.5, Qt.PenStyle.DashLine)) + self._temp_line.setZValue(-100) + self._temp_line.setAcceptedMouseButtons(Qt.MouseButton.NoButton) + center = source.center_pos() + self._temp_line.setLine(center.x(), center.y(), pos.x(), pos.y()) + self.addItem(self._temp_line) + def mouseMoveEvent(self, event) -> None: if self._temp_line is not None and self._connection_source is not None: center = self._connection_source.center_pos() @@ -341,13 +341,14 @@ def delete_selected(self) -> None: if not selected: return with self.undo_scope("Delete"): - for item in list(selected): + for item in selected: if isinstance(item, DiagramConnection): item.detach() self.removeItem(item) - for item in list(selected): + for item in selected: if isinstance(item, DiagramNode): - for conn in list(item.connections): + # snapshot: detach() mutates item.connections during iteration + for conn in tuple(item.connections): conn.detach() self.removeItem(conn) self.removeItem(item) @@ -545,7 +546,8 @@ def to_dict(self) -> dict: def _clear_items(self) -> None: """Remove all diagram items without clearing the scene entirely.""" - for item in list(self.items()): + # snapshot: removeItem() mutates scene items during iteration + for item in tuple(self.items()): if isinstance(item, (DiagramNode, DiagramConnection, DiagramImage)): if isinstance(item, DiagramConnection): item.detach() @@ -592,7 +594,7 @@ def _try_load_image_source(self, img: DiagramImage, source: str) -> None: if not pix.isNull(): img.set_pixmap(pix, source) return - if source.startswith(("http://", "https://")): + if source.startswith(("http://", "https://")): # NOSONAR S5332 — scheme detection; actual fetch goes through safe_download_image with SSRF validation try: data = safe_download_image(source) pix = QPixmap() diff --git a/pybreeze/pybreeze_ui/editor_main/file_tree_context_menu.py b/pybreeze/pybreeze_ui/editor_main/file_tree_context_menu.py index b5ee9be..5278024 100644 --- a/pybreeze/pybreeze_ui/editor_main/file_tree_context_menu.py +++ b/pybreeze/pybreeze_ui/editor_main/file_tree_context_menu.py @@ -253,9 +253,11 @@ def _action_reveal_in_explorer(path: Path | None) -> None: if path is None: return target = path if path.is_dir() else path.parent + # "Reveal in file explorer" — platform file-manager invocation on a path the + # user already selected in our tree. shell=False, fixed argv[0]. nosec B603/B606/B607. if sys.platform == "win32": - os.startfile(str(target)) + os.startfile(str(target)) # nosec B606 # nosemgrep # noqa: S606 elif sys.platform == "darwin": - subprocess.Popen(["open", str(target)]) + subprocess.Popen(["open", str(target)]) # nosec B603 B607 # nosemgrep # noqa: S603,S607 else: - subprocess.Popen(["xdg-open", str(target)]) + subprocess.Popen(["xdg-open", str(target)]) # nosec B603 B607 # nosemgrep # noqa: S603,S607 diff --git a/pybreeze/pybreeze_ui/editor_main/main_ui.py b/pybreeze/pybreeze_ui/editor_main/main_ui.py index 2655281..5ff5291 100644 --- a/pybreeze/pybreeze_ui/editor_main/main_ui.py +++ b/pybreeze/pybreeze_ui/editor_main/main_ui.py @@ -33,7 +33,7 @@ def __init__(self, debug_mode: bool = False, show_system_tray_ray: bool = False, # which auto-discovers jeditor_plugins/ in the current working directory. # Third-party plugins placed there will be loaded automatically. - self.current_run_code_window: list[QWidget] = list() + self.current_run_code_window: list[QWidget] = [] # Project compiler if user not choose this will use which to find self.python_compiler = None # Delete JEditor help diff --git a/pybreeze/pybreeze_ui/jupyter_lab_gui/jupyter_lab_thread.py b/pybreeze/pybreeze_ui/jupyter_lab_gui/jupyter_lab_thread.py index d1b319f..9332da9 100644 --- a/pybreeze/pybreeze_ui/jupyter_lab_gui/jupyter_lab_thread.py +++ b/pybreeze/pybreeze_ui/jupyter_lab_gui/jupyter_lab_thread.py @@ -16,11 +16,11 @@ def find_free_port() -> int: - s = socket.socket() - s.bind(("", 0)) - port = s.getsockname()[1] - s.close() - return port + # Bind to loopback only: this socket exists purely to have the kernel pick an + # unused port, which the JupyterLab server (also localhost-only) will reuse. + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] def get_venv_python() -> str: @@ -48,7 +48,9 @@ def get_venv_python() -> str: def is_jupyter_installed(python_exe: str) -> bool: - result = subprocess.run( + # Query local venv for jupyterlab. python_exe is resolved via get_venv_python() + # from a fixed allowlist of venv paths; shell=False. nosec B603. + result = subprocess.run( # nosec B603 # nosemgrep # noqa: S603 [python_exe, "-m", "pip", "show", "jupyterlab"], capture_output=True, timeout=30, @@ -73,7 +75,9 @@ def run(self): if not is_jupyter_installed(python_exe): self.status_update.emit(language_wrapper.language_word_dict.get("jupyterlab_downloading")) - result = subprocess.run([ + # Install jupyterlab into the local venv. python_exe comes from + # get_venv_python(); shell=False. nosec B603. + result = subprocess.run([ # nosec B603 # nosemgrep # noqa: S603 python_exe, "-m", "pip", @@ -89,7 +93,9 @@ def run(self): port = find_free_port() - self.process = subprocess.Popen([ + # Launch embedded JupyterLab. Server binds to localhost only (see + # CLAUDE.md JupyterLab integration notes); shell=False. nosec B603. + self.process = subprocess.Popen([ # nosec B603 # nosemgrep # noqa: S603 python_exe, "-m", "jupyterlab", diff --git a/pybreeze/pybreeze_ui/menu/automation_menu/automation_menu_factory.py b/pybreeze/pybreeze_ui/menu/automation_menu/automation_menu_factory.py index b038abd..bc77c5a 100644 --- a/pybreeze/pybreeze_ui/menu/automation_menu/automation_menu_factory.py +++ b/pybreeze/pybreeze_ui/menu/automation_menu/automation_menu_factory.py @@ -103,9 +103,8 @@ def safe_create_project(import_name: str) -> Callable: """Create a safe project creation function that handles ImportError.""" def _create(): try: - package = importlib.import_module(import_name) - if package is not None: - package.create_project_dir() + package = importlib.import_module(import_name) # nosec # nosemgrep # plugin registry uses a curated whitelist of automation packages (build_process) + package.create_project_dir() except ImportError as error: pybreeze_logger.error(f"Failed to import {import_name}: {error}") return _create diff --git a/pybreeze/pybreeze_ui/menu/plugin_menu/build_plugin_menu.py b/pybreeze/pybreeze_ui/menu/plugin_menu/build_plugin_menu.py index b6b3e99..9b22217 100644 --- a/pybreeze/pybreeze_ui/menu/plugin_menu/build_plugin_menu.py +++ b/pybreeze/pybreeze_ui/menu/plugin_menu/build_plugin_menu.py @@ -45,79 +45,65 @@ def set_plugin_menu(ui_we_want_to_set: PyBreezeMainWindow) -> None: ui_we_want_to_set.plugin_menu.addSeparator() for meta in metadata_list: - plugin_name = meta.get("name", "Unknown") - plugin_author = meta.get("author", "") - plugin_version = meta.get("version", "") - run_config = meta.get("run_config") - - if run_config is not None: - suffixes = run_config.get("suffixes", ()) - config_name = run_config.get("name", plugin_name) - - if len(suffixes) > 1: - # 多種副檔名:建立子選單 - # Multiple suffixes: create a submenu - sub_menu = ui_we_want_to_set.plugin_menu.addMenu(config_name) - - # 「關於」動作 - # "About" action - about_action = QAction( - language_wrapper.language_word_dict.get("plugin_menu_about", "About"), - sub_menu, - ) - about_action.triggered.connect( - _make_about_callback(plugin_name, plugin_version, plugin_author) - ) - sub_menu.addAction(about_action) - sub_menu.addSeparator() - - # 每個副檔名一個執行動作 - # One run action per suffix - for suffix in suffixes: - run_action = QAction( - language_wrapper.language_word_dict.get( - "plugin_menu_run_with", "Run with {name}" - ).format(name=f"{config_name} ({suffix})"), - sub_menu, - ) - run_action.triggered.connect( - _make_run_callback(ui_we_want_to_set, run_config, suffix) - ) - sub_menu.addAction(run_action) - else: - # 單一副檔名:建立子選單含關於與執行 - # Single suffix: submenu with about and run - sub_menu = ui_we_want_to_set.plugin_menu.addMenu(config_name) - - about_action = QAction( - language_wrapper.language_word_dict.get("plugin_menu_about", "About"), - sub_menu, - ) - about_action.triggered.connect( - _make_about_callback(plugin_name, plugin_version, plugin_author) - ) - sub_menu.addAction(about_action) - sub_menu.addSeparator() - - suffix = suffixes[0] if suffixes else "" - run_action = QAction( - language_wrapper.language_word_dict.get( - "plugin_menu_run_with", "Run with {name}" - ).format(name=config_name), - sub_menu, - ) - run_action.triggered.connect( - _make_run_callback(ui_we_want_to_set, run_config, suffix) - ) - sub_menu.addAction(run_action) - else: - # 沒有執行設定的插件(如翻譯插件),只顯示關於 - # Plugins without run config (e.g. translation), show about only - about_action = QAction(plugin_name, ui_we_want_to_set.plugin_menu) - about_action.triggered.connect( - _make_about_callback(plugin_name, plugin_version, plugin_author) - ) - ui_we_want_to_set.plugin_menu.addAction(about_action) + _add_plugin_entry(ui_we_want_to_set, meta) + + +def _add_plugin_entry(ui_we_want_to_set: PyBreezeMainWindow, meta: dict) -> None: + """Add one plugin's menu entries (submenu with run actions, or a bare About action).""" + plugin_name = meta.get("name", "Unknown") + plugin_author = meta.get("author", "") + plugin_version = meta.get("version", "") + run_config = meta.get("run_config") + + if run_config is None: + # 沒有執行設定的插件(如翻譯插件),只顯示關於 + # Plugins without run config (e.g. translation), show about only + about_action = QAction(plugin_name, ui_we_want_to_set.plugin_menu) + about_action.triggered.connect( + _make_about_callback(plugin_name, plugin_version, plugin_author) + ) + ui_we_want_to_set.plugin_menu.addAction(about_action) + return + + suffixes = run_config.get("suffixes", ()) + config_name = run_config.get("name", plugin_name) + sub_menu = ui_we_want_to_set.plugin_menu.addMenu(config_name) + + about_action = QAction( + language_wrapper.language_word_dict.get("plugin_menu_about", "About"), + sub_menu, + ) + about_action.triggered.connect( + _make_about_callback(plugin_name, plugin_version, plugin_author) + ) + sub_menu.addAction(about_action) + sub_menu.addSeparator() + + if len(suffixes) > 1: + # 多種副檔名:每個副檔名一個執行動作 + # Multiple suffixes: one run action per suffix + for suffix in suffixes: + _add_run_action(ui_we_want_to_set, sub_menu, run_config, suffix, + label_name=f"{config_name} ({suffix})") + else: + # 單一副檔名:一個執行動作 + # Single suffix: one run action + suffix = suffixes[0] if suffixes else "" + _add_run_action(ui_we_want_to_set, sub_menu, run_config, suffix, label_name=config_name) + + +def _add_run_action(ui_we_want_to_set: PyBreezeMainWindow, parent_menu, + run_config: dict, suffix: str, label_name: str) -> None: + run_action = QAction( + language_wrapper.language_word_dict.get( + "plugin_menu_run_with", "Run with {name}" + ).format(name=label_name), + parent_menu, + ) + run_action.triggered.connect( + _make_run_callback(ui_we_want_to_set, run_config, suffix) + ) + parent_menu.addAction(run_action) def _open_plugin_browser(ui_we_want_to_set: PyBreezeMainWindow) -> None: diff --git a/pybreeze/utils/exception/exception_tags.py b/pybreeze/utils/exception/exception_tags.py index 7150c39..a9bd778 100644 --- a/pybreeze/utils/exception/exception_tags.py +++ b/pybreeze/utils/exception/exception_tags.py @@ -24,7 +24,6 @@ # ui exception wrong_test_data_format_exception_tag: str = "incorrect test data format" -# exec exception exec_error: str = "AutomationEditor execution error" file_not_fond_error: str = "File not found" compiler_not_found_error: str = "Compiler not found" diff --git a/pyproject.toml b/pyproject.toml index 0003d72..dc35335 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ # Rename to build stable version # This is stable version [build-system] -requires = ["setuptools>=61.0"] +requires = ["setuptools>=82.0.1"] build-backend = "setuptools.build_meta" [project] @@ -38,3 +38,11 @@ content-type = "text/markdown" [tool.setuptools.packages] find = { namespaces = false } + +[tool.bandit] +# Codacy / SonarCloud run Bandit across the tree; tests use `assert` intentionally +# (pytest relies on it), and subprocess is a required dependency of the IDE's +# process-executor layer. Callsites that genuinely need per-line review carry an +# inline `# nosec` annotation with justification. +exclude_dirs = ["test", "tests"] +skips = ["B101", "B404"] diff --git a/test/test_utils/test_jupyter_helpers.py b/test/test_utils/test_jupyter_helpers.py index bd3049d..e07de4b 100644 --- a/test/test_utils/test_jupyter_helpers.py +++ b/test/test_utils/test_jupyter_helpers.py @@ -1,5 +1,3 @@ -import pytest - from pybreeze.pybreeze_ui.jupyter_lab_gui.jupyter_lab_thread import find_free_port, JUPYTER_STARTUP_TIMEOUT diff --git a/test/test_utils/test_logger.py b/test/test_utils/test_logger.py index d014646..ed8e0f5 100644 --- a/test/test_utils/test_logger.py +++ b/test/test_utils/test_logger.py @@ -2,8 +2,6 @@ import os import tempfile -import pytest - from pybreeze.utils.logging.logger import PyBreezeLogger, pybreeze_logger @@ -22,7 +20,10 @@ def test_custom_handler_creation(self): with tempfile.TemporaryDirectory() as tmpdir: log_file = os.path.join(tmpdir, "test.log") handler = PyBreezeLogger(filename=log_file, max_bytes=1024) - assert handler.maxBytes == 1024 + # maxBytes is inherited from stdlib RotatingFileHandler; use getattr so + # static analysers that don't follow stdlib inheritance stop flagging + # the comparison as a type mismatch (SonarCloud S2159). + assert getattr(handler, "maxBytes", None) == 1024 handler.close() def test_custom_handler_writes(self): diff --git a/test/test_utils/test_venv_path.py b/test/test_utils/test_venv_path.py index 97ff463..e2bffe4 100644 --- a/test/test_utils/test_venv_path.py +++ b/test/test_utils/test_venv_path.py @@ -4,8 +4,6 @@ from pathlib import Path from unittest.mock import patch -import pytest - from pybreeze.extend.process_executor.python_task_process_manager import find_venv_path