diff --git a/pyproject.toml b/pyproject.toml index f65672e..2082825 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "requests>=2.32.4", "textual>=3.3.0", "pydantic>=2.7.1", + "prompt-toolkit>=3.0.51", ] [project.urls] @@ -44,7 +45,7 @@ dependencies = [ "Documentation" = "https://github.com/simedw/spegel#readme" [project.scripts] -spegel = "spegel.main:main" +spegel = "spegel._internal.cli:main" [project.optional-dependencies] dev = [ diff --git a/src/spegel/__init__.py b/src/spegel/__init__.py index db9b931..b05dad5 100644 --- a/src/spegel/__init__.py +++ b/src/spegel/__init__.py @@ -1,12 +1,9 @@ """Spegel – Reflect the web through AI (package entry).""" -from importlib.metadata import version, PackageNotFoundError +from ._internal.debug import _get_version -try: - __version__: str = version("spegel") -except PackageNotFoundError: # pragma: no cover - __version__ = "0.0.0-dev" +__version__: str = _get_version() -from .main import Spegel as _SpegelApp, main # noqa: F401 +from .main import Spegel as _SpegelApp # noqa: F401 -__all__ = ["__version__", "_SpegelApp", "main"] +__all__ = ["__version__", "_SpegelApp"] diff --git a/src/spegel/__main__.py b/src/spegel/__main__.py index ee1a0d3..4ab2823 100644 --- a/src/spegel/__main__.py +++ b/src/spegel/__main__.py @@ -1,6 +1,10 @@ """Run Spegel as a module: `python -m spegel`""" -from .main import main +import sys + +from ._internal.cli import main as cli_main if __name__ == "__main__": - main() + sys.exit( + cli_main(sys.argv[1:]) + ) # Pass command-line arguments to the CLI main function diff --git a/src/spegel/_internal/__init__.py b/src/spegel/_internal/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spegel/_internal/cli.py b/src/spegel/_internal/cli.py new file mode 100644 index 0000000..410a89e --- /dev/null +++ b/src/spegel/_internal/cli.py @@ -0,0 +1,90 @@ +# Why does this file exist, and why not put this in `__main__`? 
+# +# You might be tempted to import things from `__main__` later, +# but that will cause problems: the code will get executed twice: +# +# - When you run `python -m spegel` python will execute +# `__main__.py` as a script. That means there won't be any +# `spegel.__main__` in `sys.modules`. +# - When you import `__main__` it will get executed again (as a module) because +# there's no `spegel.__main__` in `sys.modules`. +from __future__ import annotations + +from argparse import Action, ArgumentParser, Namespace +import sys +from typing import Any + +from spegel._internal import debug +from spegel.main import Spegel + + +class _DebugInfo(Action): + def __init__(self, nargs: int | str | None = 0, **kwargs: Any) -> None: + super().__init__(nargs=nargs, **kwargs) + + def __call__(self, *_: Any, **__: Any) -> None: + print(debug._get_debug_info()) + sys.exit(0) + + +class _About(Action): + def __init__(self, nargs: int | str | None = 0, **kwargs: Any) -> None: + super().__init__(nargs=nargs, **kwargs) + + def __call__(self, *_: Any, **__: Any) -> None: + print(debug._get_package_info()) + sys.exit(0) + + +def get_parser() -> ArgumentParser: + name: str = debug._get_name() + version: str = f"{name} v{debug._get_version()}" + parser = ArgumentParser( + description=name.capitalize(), prog=name, exit_on_error=False + ) + parser.add_argument("-V", "--version", action="version", version=version) + parser.add_argument( + "--about", action=_About, help="Print information about the package" + ) + parser.add_argument( + "--debug_info", action=_DebugInfo, help="Print debug information" + ) + parser.add_argument("url", nargs="?", help="URL to open immediately on launch") + return parser + + +def main(args: list[str] | None = None) -> int: + """Main entry point for the CLI. + + This function is called when the CLI is executed. It can be used to + initialize the CLI, parse arguments, and execute commands. + + Args: + args (list[str] | None): A list of command-line arguments. 
If None, uses sys.argv[1:]. + + Returns: + int: Exit code of the CLI execution. 0 for success, non-zero for failure. + """ + if args is None: + args = sys.argv[1:] + try: + parser: ArgumentParser = get_parser() + opts: Namespace = parser.parse_args(args) + initial_url: str | None = opts.url + + if initial_url is not None and not initial_url.startswith( + ("http://", "https://") + ): + # Auto-prepend https if scheme is missing + initial_url = f"https://{initial_url}" + + app = Spegel(initial_url=initial_url) + app.run() + except Exception as e: + print(f"Error initializing {debug._get_name()}: {e}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + main() diff --git a/src/spegel/_internal/debug.py b/src/spegel/_internal/debug.py new file mode 100644 index 0000000..6ba408d --- /dev/null +++ b/src/spegel/_internal/debug.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +from dataclasses import dataclass +import importlib.metadata +from importlib.metadata import PackageNotFoundError, metadata, version +import os +import platform +import sys + +__PACKAGE_NAME__ = "spegel" + + +@dataclass +class _Package: + """Dataclass to store package information.""" + + name: str = __PACKAGE_NAME__ + """Package name.""" + version: str = "0.0.0-dev" + """Package version.""" + description: str = "No description available." 
+ """Package description.""" + + def __str__(self) -> str: + """String representation of the package information.""" + return f"{self.name} v{self.version}: {self.description}" + + +@dataclass +class _Variable: + """Dataclass describing an environment variable.""" + + name: str + """Variable name.""" + value: str + """Variable value.""" + + +@dataclass +class _Environment: + """Dataclass to store environment information.""" + + interpreter_name: str + """Python interpreter name.""" + interpreter_version: str + """Python interpreter version.""" + interpreter_path: str + """Path to Python executable.""" + platform: str + """Operating System.""" + packages: list[_Package] + """Installed packages.""" + variables: list[_Variable] + """Environment variables.""" + + def __str__(self) -> str: + """String representation of the environment information.""" + return ( + f"Python {self.interpreter_name} {self.interpreter_version} " + f"({self.interpreter_path}) on {self.platform}\n" + f"Packages:\n{', '.join(str(pkg) for pkg in self.packages)}\n" + f"Variables:\n{', '.join(f'{var.name}={var.value}' for var in self.variables)}" + ) + + +def _interpreter_name_version() -> tuple[str, str]: + if hasattr(sys, "implementation"): + impl: sys._version_info = sys.implementation.version + version = f"{impl.major}.{impl.minor}.{impl.micro}" + kind = impl.releaselevel + if kind != "final": + version += kind[0] + str(impl.serial) + return sys.implementation.name, version + return "", "0.0.0" + + +def _get_package_info(dist: str = __PACKAGE_NAME__) -> _Package: + try: + return _Package( + name=dist, + version=version(dist), + description=metadata(dist)["Summary"], + ) + except PackageNotFoundError: + return _Package(name=dist) + + +def _get_name(dist: str = __PACKAGE_NAME__) -> str: + """Get name of the given distribution. + + Parameters: + dist: A distribution name. + + Returns: + A package name. 
+ """ + return _get_package_info(dist).name + + +def _get_version(dist: str = __PACKAGE_NAME__) -> str: + """Get version of the given distribution. + + Parameters: + dist: A distribution name. + + Returns: + A version number. + """ + return _get_package_info(dist).version + + +def _get_description(dist: str = __PACKAGE_NAME__) -> str: + """Get description of the given distribution. + + Parameters: + dist: A distribution name. + + Returns: + A description string. + """ + return _get_package_info(dist).description + + +def _get_debug_info() -> _Environment: + """Get debug/environment information. + + Returns: + Environment information. + """ + py_name, py_version = _interpreter_name_version() + packages: list[str] = [__PACKAGE_NAME__] + variables: list[str] = [ + "PYTHONPATH", + *[ + var + for var in os.environ + if var.startswith(__PACKAGE_NAME__.replace("-", "_")) + ], + ] + return _Environment( + interpreter_name=py_name, + interpreter_version=py_version, + interpreter_path=sys.executable, + platform=platform.platform(), + variables=[_Variable(var, val) for var in variables if (val := os.getenv(var))], + packages=[_Package(pkg, _get_version(pkg)) for pkg in packages], + ) + + +def _get_installed_packages() -> list[_Package]: + """Get all installed packages in current environment""" + packages = [] + for dist in importlib.metadata.distributions(): + packages.append({"name": dist.metadata["Name"], "version": dist.version}) + return packages + + +if __name__ == "__main__": + print(_get_debug_info()) diff --git a/src/spegel/config.py b/src/spegel/config.py index 1ed9785..9a33e3c 100644 --- a/src/spegel/config.py +++ b/src/spegel/config.py @@ -1,12 +1,3 @@ -from __future__ import annotations - -from pathlib import Path -from typing import List, Dict, Any - -import tomllib -from pydantic import BaseModel, Field, model_validator - - """Configuration handling for Spegel. 
This module is responsible for: @@ -15,6 +6,14 @@ • Providing fallback defaults so the app can run with zero user config. """ +from __future__ import annotations + +import tomllib +from pathlib import Path +from typing import Any, Self + +from pydantic import BaseModel, Field, model_validator + __all__ = [ "View", "AI", @@ -39,10 +38,10 @@ class View(BaseModel): model: str = "" # Optional model override for this view @model_validator(mode="after") - def validate_hotkey(cls, values): # type: ignore[override] - if len(values.hotkey) != 1: + def validate_hotkey(self) -> Self: + if len(self.hotkey) != 1: raise ValueError("Hotkey must be a single character") - return values + return self class AI(BaseModel): @@ -65,9 +64,9 @@ class FullConfig(BaseModel): settings: Settings = Settings() ai: AI = AI() ui: UI = UI() - views: List[View] = Field(default_factory=list) + views: list[View] = Field(default_factory=list) - def view_map(self) -> Dict[str, View]: + def view_map(self) -> dict[str, View]: """Return a mapping of view_id → View for quick lookup.""" return {v.id: v for v in self.views if v.enabled} @@ -76,7 +75,7 @@ def view_map(self) -> Dict[str, View]: # Defaults # -------------------------------------------------------------------------------------- -DEFAULT_CONFIG_DICT: Dict[str, Any] = { +DEFAULT_CONFIG_DICT: dict[str, Any] = { "settings": { "default_view": "terminal", "max_history": 50, @@ -114,7 +113,7 @@ def view_map(self) -> Dict[str, View]: } -def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: +def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: """Recursively merge two dicts (override wins).""" result = base.copy() for key, value in override.items(): @@ -143,7 +142,7 @@ def load_config() -> FullConfig: Path.home() / ".config" / "spegel" / "config.toml", ] - merged: Dict[str, Any] = DEFAULT_CONFIG_DICT + merged: dict[str, Any] = DEFAULT_CONFIG_DICT # Only load the first config file found, not all 
of them for path in config_paths: diff --git a/src/spegel/llm.py b/src/spegel/llm.py index e12fcf8..67f8f76 100644 --- a/src/spegel/llm.py +++ b/src/spegel/llm.py @@ -1,8 +1,12 @@ from __future__ import annotations -import os import logging -from typing import AsyncIterator, Dict, Any +import os +from collections.abc import AsyncIterator +from typing import cast + +from litellm import CustomStreamWrapper +from litellm.types.utils import ModelResponse """Light abstraction layer over one or more LLM back-ends. @@ -33,7 +37,7 @@ def enable_llm_logging(level: int = logging.INFO) -> None: try: import litellm except ImportError: # pragma: no cover – dependency is optional until used - litellm = None # type: ignore + litellm = None # type: ignore[assignment] __all__ = ["LLMClient", "LiteLLMClient", "create_client", "LLMAuthenticationError"] @@ -123,7 +127,9 @@ async def stream( try: response = await litellm.acompletion(**completion_params) - + if isinstance(response, ModelResponse): + # only the ModelResponse class has the context manager, so this gets rid of type issues + response = cast(CustomStreamWrapper, response) async for chunk in response: try: # Extract content from the chunk @@ -137,7 +143,7 @@ async def stream( logger.warning("Error processing chunk: %s", e) continue - except litellm.AuthenticationError as e: + except litellm.AuthenticationError as e: # type: ignore[no-redef] logger.error("Authentication error in LLM completion: %s", e) # Extract the model provider from the model name for better error message model_provider = ( @@ -221,6 +227,13 @@ def create_client(model: str) -> LLMClient | None: async def _main() -> None: try: + if client is None: + print( + f"Error: No LLM client available for model '{model}'. 
" + "Check your API keys and model configuration.", + file=sys.stderr, + ) + sys.exit(1) async for chunk in client.stream(args.prompt, ""): print(chunk, end="", flush=True) except KeyboardInterrupt: diff --git a/src/spegel/main.py b/src/spegel/main.py index 8f87c3b..761baa7 100644 --- a/src/spegel/main.py +++ b/src/spegel/main.py @@ -1,19 +1,14 @@ #!/usr/bin/env python3 -""" -Spegel - Reflect the web through AI -""" +"""Spegel - Reflect the web through AI""" import asyncio -from typing import Optional, Dict, List +import re +from contextlib import suppress +from typing import cast +from urllib.parse import urljoin from dotenv import load_dotenv -import re from textual import on - -# External modules -from .config import load_config, View -from .llm import create_client - from textual.app import App, ComposeResult from textual.containers import Container from textual.widgets import ( @@ -22,13 +17,21 @@ Input, Markdown, Static, + Tab, TabbedContent, TabPane, TextArea, ) -from .web import fetch_url as fetch_url_blocking, html_to_markdown +from spegel.config import FullConfig +from spegel.llm import LLMClient + +# External modules +from .config import View, load_config +from .llm import create_client from .views import stream_view +from .web import fetch_url as fetch_url_blocking +from .web import html_to_markdown # Load environment variables load_dotenv() @@ -44,7 +47,7 @@ def __init__(self, content: str = "", **kwargs): def on_markdown_link_clicked(self, event: Markdown.LinkClicked) -> None: """Handle link clicks to navigate within the browser instead of opening externally.""" # Get the main app instance - app = self.app + app: Spegel = cast("Spegel", self.app) if hasattr(app, "handle_internal_link_click"): # Prevent default behavior (opening in external browser) event.prevent_default() @@ -74,12 +77,12 @@ class LinkManager: def __init__(self, app): self.app = app - self.current_links: List[ + self.current_links: list[ tuple ] = [] # List of (link_text, link_url, 
start_pos, end_pos) tuples self.current_link_index: int = -1 # Currently selected link index - def extract_links_from_markdown(self, content: str) -> List[tuple]: + def extract_links_from_markdown(self, content: str) -> list[tuple]: """Extract all links from markdown content with position tracking.""" # Regex to match markdown links: [text](url) - including angle brackets from html2text link_pattern = r"\[([^\]]+)\]\(([^)]+)\)" @@ -232,7 +235,7 @@ def _restore_scroll_if_needed(self, content_widget, scroll_state: dict) -> None: ) ) - def _restore_scroll_position(self, content_widget, target_scroll_y: int) -> None: + def _restore_scroll_position(self, content_widget, target_scroll_y: float) -> None: """Restore scroll position after content update.""" try: # Ensure we don't scroll beyond the new content bounds @@ -298,10 +301,10 @@ class Spegel(App): def __init__(self, initial_url: str | None = None, **kwargs): # Load configuration first (before super().__init__) - self.config = load_config() + self.config: FullConfig = load_config() # Load views from config (mapping view_id -> View) - self.views: Dict[str, View] = {v.id: v for v in self.config.views if v.enabled} + self.views: dict[str, View] = {v.id: v for v in self.config.views if v.enabled} self.current_view = self.config.settings.default_view super().__init__(**kwargs) @@ -309,25 +312,25 @@ def __init__(self, initial_url: str | None = None, **kwargs): # Set app title from config self.title = self.config.settings.app_title - self.current_url: Optional[str] = None + self.current_url: str | None = None # URL provided via CLI to open on startup - self._startup_url: Optional[str] = initial_url + self._startup_url: str | None = initial_url self.raw_html: str = "" self.url_input_visible = False self.prompt_editor_visible = False self.views_loaded: set = set() # Track which views have been processed self.views_loading: set = set() # Track which views are currently loading - self.original_content: Dict[ + 
self.original_content: dict[ str, str ] = {} # Store original content for each view - self.url_history: List[str] = [] # History of visited URLs for back navigation + self.url_history: list[str] = [] # History of visited URLs for back navigation # Initialize LLM client with default model from config - self.llm_client = create_client(self.config.ai.default_model) + self.llm_client: LLMClient | None = create_client(self.config.ai.default_model) # Initialize managers - self.scroll_manager = ScrollManager(self) - self.link_manager = LinkManager(self) + self.scroll_manager: ScrollManager = ScrollManager(self) + self.link_manager: LinkManager = LinkManager(self) def compose(self) -> ComposeResult: """Create child widgets for the app.""" @@ -380,7 +383,7 @@ def action_show_url_input(self) -> None: if not self.url_input_visible and not self.prompt_editor_visible: self.url_input_visible = True self.add_class("url-input-visible") - url_input = self.query_one("#url-input", URLInput) + url_input: URLInput = self.query_one("#url-input", URLInput) url_input.focus() if self.current_url: url_input.value = self.current_url @@ -397,7 +400,7 @@ def action_hide_overlays(self) -> None: # Focus the current tab content try: - content_widget = self.query_one( + content_widget: HTMLContent = self.query_one( f"#content-{self.current_view}", HTMLContent ) content_widget.focus() @@ -419,16 +422,16 @@ def action_edit_prompt(self) -> None: self.add_class("prompt-editor-visible") # Load current prompt - current_prompt = self.views[self.current_view].prompt - prompt_editor = self.query_one("#prompt-editor", PromptEditor) + current_prompt: str = self.views[self.current_view].prompt + prompt_editor: PromptEditor = self.query_one("#prompt-editor", PromptEditor) prompt_editor.text = current_prompt prompt_editor.focus() def action_switch_tab(self, tab_id: str) -> None: """Switch to a specific tab.""" if tab_id in self.views: - self.current_view = tab_id - tabbed_content = self.query_one(TabbedContent) 
+ self.current_view: str = tab_id + tabbed_content: TabbedContent = self.query_one(TabbedContent) tabbed_content.active = tab_id def action_go_back(self) -> None: @@ -452,7 +455,7 @@ def action_go_back(self) -> None: def action_scroll_up(self) -> None: """Scroll the current content up.""" try: - content_widget = self.query_one( + content_widget: HTMLContent = self.query_one( f"#content-{self.current_view}", HTMLContent ) content_widget.action_scroll_up() @@ -462,7 +465,7 @@ def action_scroll_up(self) -> None: def action_scroll_down(self) -> None: """Scroll the current content down.""" try: - content_widget = self.query_one( + content_widget: HTMLContent = self.query_one( f"#content-{self.current_view}", HTMLContent ) content_widget.action_scroll_down() @@ -534,8 +537,8 @@ async def on_key(self, event) -> None: # Handle Ctrl+S for prompt editor if self.prompt_editor_visible and event.key == "ctrl+s": # Save the prompt - prompt_editor = self.query_one("#prompt-editor", PromptEditor) - new_prompt = prompt_editor.text + prompt_editor: PromptEditor = self.query_one("#prompt-editor", PromptEditor) + new_prompt: str = prompt_editor.text self.views[self.current_view].prompt = new_prompt self.notify(f"Prompt saved for {self.views[self.current_view].name}") @@ -565,18 +568,19 @@ async def handle_tab_change(self, event: TabbedContent.TabActivated) -> None: # Handle the case where Textual adds prefixes to tab IDs if raw_tab_id.startswith("--content-tab-"): - tab_name = raw_tab_id.replace("--content-tab-", "") + tab_name: str = raw_tab_id.replace("--content-tab-", "") else: - tab_name = raw_tab_id + tab_name: str = raw_tab_id if tab_name in self.views: self.current_view = tab_name # Check if this view needs to be loaded on-demand - view_config = self.views[tab_name] + view_config: View = self.views[tab_name] - needs_loading = ( - self.raw_html # We have content to process + has_content = bool(self.raw_html) + needs_loading: bool = ( + has_content # We have content to process 
and tab_name not in self.views_loaded # Not already loaded and tab_name not in self.views_loading # Not currently loading and not view_config.auto_load # Not an auto-load view @@ -620,7 +624,7 @@ async def fetch_and_display_url(self, url: str) -> None: self.url_history = self.url_history[-50:] # Show loading in current view only - current_content_widget = self.query_one( + current_content_widget: HTMLContent = self.query_one( f"#content-{self.current_view}", HTMLContent ) current_content_widget.update(f"Loading {url}...") @@ -634,11 +638,11 @@ async def fetch_and_display_url(self, url: str) -> None: try: # Fetch content - html_text = await asyncio.get_event_loop().run_in_executor( + html_text: str | None = await asyncio.get_event_loop().run_in_executor( None, fetch_url_blocking, url ) - if html_text: + if html_text is not None: self.current_url = url self.title = f"LLM Browser - {url}" self.raw_html = html_text @@ -654,7 +658,7 @@ async def fetch_and_display_url(self, url: str) -> None: async def _process_all_views_parallel(self) -> None: """Process views in parallel, respecting auto_load settings.""" # Only process auto-load views initially - auto_load_views = [ + auto_load_views: list[str] = [ view_id for view_id, view in self.views.items() if view.auto_load ] @@ -674,7 +678,9 @@ async def _process_single_view(self, view_id: str) -> None: """Process a single view and update its tab name.""" try: # Set immediate loading message in content - content_widget = self.query_one(f"#content-{view_id}", HTMLContent) + content_widget: HTMLContent = self.query_one( + f"#content-{view_id}", HTMLContent + ) if view_id == "raw": content_widget.update( "## Loading content...\n\n*Please wait while the page is fetched and parsed.*" @@ -713,52 +719,49 @@ async def _process_single_view(self, view_id: str) -> None: def _update_tab_name(self, view_id: str) -> None: """Update tab name with loading/loaded indicators.""" - base_name = self.views[view_id].name + base_name: str = 
self.views[view_id].name if view_id in self.views_loading: - display_name = f"⏳ {base_name}" + display_name: str = f"⏳ {base_name}" elif view_id in self.views_loaded: - display_name = f"✓ {base_name}" + display_name: str = f"✓ {base_name}" else: - display_name = base_name + display_name: str = base_name def update_label() -> None: - try: - tabbed = self.query_one(TabbedContent) - tab = tabbed.get_tab(view_id) + """Update the tab label on the main thread.""" + + with suppress(Exception): + # If we can't update the tab label, just ignore it + tabbed: TabbedContent = self.query_one(TabbedContent) + tab: Tab = tabbed.get_tab(view_id) if tab: tab.label = display_name tab.refresh() - except Exception: - # Fallback: try TabPane - try: - pane = self.query_one(f"#{view_id}", TabPane) - pane.label = display_name - pane.refresh() - except Exception: - pass # schedule the label update on main thread self.call_later(update_label) def _reset_tab_names(self) -> None: """Reset all tab names to their base names.""" - for view_id in self.views.keys(): - try: - tab_pane = self.query_one(f"#{view_id}", TabPane) - tab_pane.label = self.views[view_id].name - except Exception: - pass # Ignore if tab not found + with suppress(Exception): + # Ignore if tab not found + tabbed: TabbedContent = self.query_one(TabbedContent) + for view_id, view in self.views.items(): + tab: Tab = tabbed.get_tab(view_id) + if tab: + tab.label = view.name + tab.refresh() async def update_view_content(self, view_id: str) -> None: """Update content for a specific view.""" if not self.raw_html: return - content_widget = self.query_one(f"#content-{view_id}", HTMLContent) + content_widget: HTMLContent = self.query_one(f"#content-{view_id}", HTMLContent) if view_id == "raw": # Raw view - just parse HTML normally - formatted_content = html_to_markdown(self.raw_html, self.current_url) + formatted_content: str = html_to_markdown(self.raw_html, self.current_url) # Store original content and extract links 
def run_async_task(self, task) -> None:
    """Schedule a coroutine on the running event loop from sync code.

    Args:
        task: The coroutine to schedule. ``asyncio.create_task`` requires a
            running event loop and raises ``RuntimeError`` otherwise.
    """
    # Hold a strong reference until the task finishes: the asyncio docs note
    # that the event loop only keeps weak references to tasks, so a task
    # nobody else references may be garbage-collected before it completes.
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        pending = set()
        self._background_tasks = pending

    scheduled = asyncio.create_task(task)
    pending.add(scheduled)
    # Drop the reference again once the task is done.
    scheduled.add_done_callback(pending.discard)
- - Usage:: - - spegel # opens browser with welcome screen - spegel https://news.ycombinator.com # auto-loads URL on launch - """ - - import argparse - import sys - - parser = argparse.ArgumentParser( - prog="spegel", - description="Spegel – Reflect the web through AI (terminal browser)", - ) - parser.add_argument("url", nargs="?", help="URL to open immediately on launch") - - args = parser.parse_args(sys.argv[1:]) - - initial_url = args.url - if initial_url and not initial_url.startswith(("http://", "https://")): - # Auto-prepend https if scheme is missing - initial_url = f"https://{initial_url}" - - app = Spegel(initial_url=initial_url) - app.run() - - -if __name__ == "__main__": - main() diff --git a/src/spegel/views.py b/src/spegel/views.py index 8a3053d..f40e25b 100644 --- a/src/spegel/views.py +++ b/src/spegel/views.py @@ -1,12 +1,9 @@ from __future__ import annotations -from typing import List - from .config import View -from .llm import LLMClient, create_client, LLMAuthenticationError +from .llm import LLMAuthenticationError, LLMClient, create_client from .web import extract_clean_text, html_to_markdown - """View processing logic for Spegel. For now this module contains thin wrappers that will eventually host the full diff --git a/src/spegel/web.py b/src/spegel/web.py index e2db020..bddb927 100644 --- a/src/spegel/web.py +++ b/src/spegel/web.py @@ -1,34 +1,36 @@ -from __future__ import annotations - -import re -from typing import Optional, List -import requests -from bs4 import BeautifulSoup - """Web fetching and HTML cleaning utilities for Spegel. This module centralises network I/O so the UI layer can remain async and testable. 
def fetch_url(url: str, timeout: int = 10) -> str | None:
    """Blocking HTTP GET returning the raw HTML text or None on error.

    Args:
        url: Absolute URL to fetch.
        timeout: Timeout in seconds, passed to ``httpx.get``.

    Returns:
        The decoded response body, or ``None`` when the request fails, the
        URL is invalid, or the final response is not a success status.
    """
    # NOTE(review): the visible pyproject hunk adds only prompt-toolkit —
    # confirm httpx and charset-normalizer are declared as dependencies.
    try:
        resp: httpx.Response = httpx.get(
            url,
            headers=HEADERS,
            timeout=timeout,
            # httpx does not follow redirects unless asked to; requests (which
            # this call replaces) did, so keep http->https and moved pages working.
            follow_redirects=True,
        )
        resp.raise_for_status()

        # Use the charset the server actually declared: ``resp.encoding`` falls
        # back to utf-8 when none is declared, which would hide the missing
        # declaration from this check. Detect from the bytes when the charset
        # is absent or one of the commonly mis-declared legacy values.
        declared: str | None = resp.charset_encoding
        if declared is None or declared.lower() in ("iso-8859-1", "ascii"):
            detected: CharsetMatch | None = from_bytes(resp.content).best()
            if detected is not None:
                # Must be set before ``resp.text`` is first accessed.
                resp.encoding = detected.encoding
        return resp.text
    except (httpx.HTTPError, httpx.InvalidURL):
        # InvalidURL is not an HTTPError subclass, so it is caught explicitly.
        # TODO: Log the error or handle it as needed
        return None
soup.select(selector) if elements: # Use the element with the most text content best_element = max(elements, key=lambda e: len(e.get_text(strip=True))) @@ -134,7 +136,7 @@ def extract_clean_text( node.decompose() else: # Fallback: clean the whole page but be more conservative - working_soup = soup + working_soup: BeautifulSoup = soup # More targeted noise removal (avoid overly broad selectors) conservative_noise_selectors = [ @@ -177,21 +179,21 @@ def extract_clean_text( h = html2text.HTML2Text() h.ignore_links = False h.ignore_images = True - h.body_width = None + h.body_width = 0 # disable line wrapping to avoid broken URLs h.wrap_links = False h.protect_links = True - cleaned_markdown = h.handle(str(working_soup)) + cleaned_markdown: str = h.handle(str(working_soup)) # html2text wraps URLs in <...>. Remove the angle brackets for cleaner markdown. - cleaned_markdown = re.sub(r"\]\(<([^>]+)>\)", r"](\1)", cleaned_markdown) + cleaned_markdown: str = re.sub(r"\]\(<([^>]+)>\)", r"](\1)", cleaned_markdown) # Optional truncation for token safety when used inside the browser if max_chars is not None and len(cleaned_markdown) > max_chars: cleaned_markdown = cleaned_markdown[:max_chars] + "\n...(truncated)" - title_tag = soup.find("title") - title_text = title_tag.get_text().strip() if title_tag else "No Title" + title_tag: PageElement | Tag | NavigableString | None = soup.find("title") + title_text: str = title_tag.get_text().strip() if title_tag else "No Title" - header = f"Title: {title_text}\nURL: {url or ''}\n\n" + header: str = f"Title: {title_text}\nURL: {url or ''}\n\n" return header + cleaned_markdown @@ -209,7 +211,7 @@ def html_to_markdown(html: str, base_url: str | None = None) -> str: h.ignore_links = False h.ignore_images = False h.ignore_emphasis = False - h.body_width = None # disable line wrapping to avoid broken URLs + h.body_width = 0 # disable line wrapping to avoid broken URLs h.wrap_links = False h.unicode_snob = True h.skip_internal_links = True @@ 
-230,10 +232,17 @@ def _fix(m): markdown_content = re.sub(r"\]\(([^)]+)\)", _fix, markdown_content) soup = BeautifulSoup(html, "lxml") - title = soup.find("title") - title_text = title.get_text().strip() if title else "No Title" - - header = [f"# {title_text}", "", f"**URL:** `{base_url or ''}`", "", "---", ""] + title: PageElement | Tag | NavigableString | None = soup.find("title") + title_text: str = title.get_text().strip() if title else "No Title" + + header: list[str] = [ + f"# {title_text}", + "", + f"**URL:** `{base_url or ''}`", + "", + "---", + "", + ] return "\n".join(header) + markdown_content except Exception as exc: return f"## ❌ Error parsing HTML\n\n```\n{exc}\n```" @@ -247,9 +256,9 @@ def _fix(m): description="Fetch a URL and print Spegel's cleaned text representation." ) parser.add_argument("url", help="URL to fetch") - args = parser.parse_args() + args: Namespace = parser.parse_args() - html = fetch_url(args.url) + html: str | None = fetch_url(args.url) if html is None: print("Error: Failed to fetch URL", file=sys.stderr) sys.exit(1) diff --git a/tests/test_content_processing.py b/tests/test_content_processing.py index a0204ab..afb58f9 100644 --- a/tests/test_content_processing.py +++ b/tests/test_content_processing.py @@ -507,15 +507,25 @@ def test_reset_tab_names(self): "summary": mock_view2, } - # Mock tab panes - mock_tab_panes = [Mock(), Mock()] - self.app.query_one = Mock(side_effect=mock_tab_panes) + mock_tab1 = Mock() + mock_tab2 = Mock() + mock_tabbed_content = Mock() + mock_tabbed_content.get_tab.side_effect = lambda view_id: { + "raw": mock_tab1, + "summary": mock_tab2, + }.get(view_id) + + # query_one should always return the same TabbedContent mock + self.app.query_one = Mock(return_value=mock_tabbed_content) self.app._reset_tab_names() - # Should update labels for both tabs - assert mock_tab_panes[0].label == "Raw" - assert mock_tab_panes[1].label == "Summary" + assert mock_tab1.label == "Raw" + assert mock_tab2.label == "Summary" + 
+ # Verify refresh was called on both tabs + mock_tab1.refresh.assert_called_once() + mock_tab2.refresh.assert_called_once() class TestContentState: diff --git a/tests/test_llm.py b/tests/test_llm.py index 5a5b9d5..24f2552 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -3,7 +3,7 @@ import pytest -from spegel.llm import LiteLLMClient, LLMClient, create_client, LLMAuthenticationError +from spegel.llm import LiteLLMClient, LLMAuthenticationError, LLMClient, create_client @pytest.fixture diff --git a/tests/test_main.py b/tests/test_main.py index ea57532..6efdc0e 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -2,7 +2,8 @@ import pytest -from spegel.main import Spegel, main +from spegel._internal.cli import main +from spegel.main import Spegel class TestMainCLI: @@ -10,7 +11,7 @@ class TestMainCLI: def test_main_no_args(self): """Main with no arguments should start app with no initial URL.""" - with patch("spegel.main.Spegel") as mock_spegel_class: + with patch("spegel._internal.cli.Spegel") as mock_spegel_class: mock_app = Mock() mock_spegel_class.return_value = mock_app @@ -23,7 +24,7 @@ def test_main_no_args(self): def test_main_with_url(self): """Main with URL argument should start app with that URL.""" - with patch("spegel.main.Spegel") as mock_spegel_class: + with patch("spegel._internal.cli.Spegel") as mock_spegel_class: mock_app = Mock() mock_spegel_class.return_value = mock_app @@ -36,7 +37,7 @@ def test_main_with_url(self): def test_main_with_full_url(self): """Main with full URL should not modify it.""" - with patch("spegel.main.Spegel") as mock_spegel_class: + with patch("spegel._internal.cli.Spegel") as mock_spegel_class: mock_app = Mock() mock_spegel_class.return_value = mock_app @@ -48,7 +49,7 @@ def test_main_with_full_url(self): def test_main_with_http_url(self): """Main with http:// URL should not modify it.""" - with patch("spegel.main.Spegel") as mock_spegel_class: + with patch("spegel._internal.cli.Spegel") as 
mock_spegel_class: mock_app = Mock() mock_spegel_class.return_value = mock_app @@ -170,7 +171,7 @@ class TestURLHandling: ) def test_url_preprocessing(self, input_url, expected): """Test that URLs are correctly preprocessed in main().""" - with patch("spegel.main.Spegel") as mock_spegel_class: + with patch("spegel._internal.cli.Spegel") as mock_spegel_class: mock_app = Mock() mock_spegel_class.return_value = mock_app