diff --git a/installer/fastflowprompt.spec b/installer/fastflowprompt.spec index 03acfe8..7f233f7 100644 --- a/installer/fastflowprompt.spec +++ b/installer/fastflowprompt.spec @@ -51,6 +51,7 @@ HIDDEN_IMPORTS = [ "_version", "ffp_actions", "ffp_benchmark", + "ffp_chat", "ffp_config", "ffp_flm_server", "ffp_hardware", diff --git a/pyproject.toml b/pyproject.toml index bfe4039..83a8dd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ py-modules = [ "_version", "ffp_actions", "ffp_benchmark", + "ffp_chat", "ffp_config", "ffp_flm_server", "ffp_hardware", diff --git a/scripts/ffp_chat.py b/scripts/ffp_chat.py new file mode 100644 index 0000000..32bd1a4 --- /dev/null +++ b/scripts/ffp_chat.py @@ -0,0 +1,303 @@ +"""Daemon-side chat: thread storage + multi-turn LLM send with optional notes grounding. + +Replaces the retired tkinter chat popup. The web dashboard Chat tab talks to the +daemon ``chat_*`` actions, which delegate here. Pure stdlib. The LLM call reuses +``grammar_fix``'s provider-resolved endpoint (``FLM_BASE_URL`` / ``FLM_MODEL`` / +``LLM_AUTH_BEARER`` / ``FLM_TIMEOUT_SECONDS``), so chat follows the same +FastFlowLM/Ollama selection as grammar and notes. The thread store reuses the +existing ``data/chat_threads.jsonl`` format, so popup-era threads carry over. +""" + +from __future__ import annotations + +import json +import logging +import time +import urllib.error +import urllib.request +import uuid + +import paths as _paths + +log = logging.getLogger("ffp.chat") + +THREADS_PATH = _paths.CHAT_THREADS_FILE +STAGED_PATH = _paths.DATA_DIR / ".chat_staged.json" + +MAX_THREADS = 100 # cap stored threads (newest kept) +CONTEXT_WINDOW_TURNS = 12 # sliding window of prior turns sent to the model +MAX_TOKENS = 1024 +TEMPERATURE = 0.3 +TITLE_MAX_CHARS = 60 +SYSTEM_PROMPT = "You are a concise, helpful local assistant." + + +# ---------- notes grounding (ported from the retired chat popup) ---------------------- + +def build_notes_context_message(query: str, search_fn, max_notes: int = 4) -> tuple[str | None, list[str]]: + """Build the retrieval-injection system message for notes-grounded chat. + + Runs the ranked vault search and formats the top hits as grounding context + (the model is told to cite note titles in [brackets]). Returns + ``(message, titles)``; ``(None, [])`` when nothing matched. Pure given + ``search_fn`` — unit-testable without a vault. + """ + try: + found = search_fn(query, max_notes) + except Exception: + return None, [] + results = (found or {}).get("results") or [] + if not results: + return None, [] + titles: list[str] = [] + blocks: list[str] = [] + for r in results: + title = str(r.get("title") or "untitled") + titles.append(title) + category = str(r.get("category") or "") + snippet = str(r.get("snippet") or "").strip() + blocks.append(f"[{title}] ({category})\n{snippet}") + message = ( + "The user has a personal notes vault. The saved notes below are relevant " + "to their next message. Ground your answer in these notes and cite note " + "titles in [brackets] when you use them. If the notes do not answer the " + "question, say so briefly instead of guessing.\n\n" + "\n\n".join(blocks) + ) + return message, titles + + +def retrieve_notes_context(query: str, max_notes: int = 4) -> tuple[str | None, list[str]]: + """Vault-backed wrapper around :func:`build_notes_context_message`. ``notes`` + is imported lazily so chat doesn't pay for it when grounding is never used.""" + try: + import notes + except Exception: + return None, [] + return build_notes_context_message(query, notes.search_notes, max_notes) + + +# ---------- thread storage (data/chat_threads.jsonl) ---------------------------------- + +def _now_iso() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%S") + + +def load_threads() -> list[dict]: + """Read chat_threads.jsonl, return the latest snapshot per thread, newest first.""" + if not THREADS_PATH.exists(): + return [] + latest: dict[str, dict] = {} + try: + with THREADS_PATH.open("r", encoding="utf-8", errors="replace") as f: + for raw in f: + raw = raw.strip() + if not raw: + continue + try: + row = json.loads(raw) + except Exception: + continue + tid = row.get("thread_id") + if not tid: + continue + prev = latest.get(tid) + if (prev is None) or (str(row.get("updated_at") or "") >= str(prev.get("updated_at") or "")): + latest[tid] = row + except Exception: + return [] + ordered = sorted(latest.values(), key=lambda r: str(r.get("updated_at") or ""), reverse=True) + return ordered[:MAX_THREADS] + + +def save_threads(threads: list[dict]) -> None: + """Compact-rewrite: one line per thread, atomic via tmp+replace.""" + try: + THREADS_PATH.parent.mkdir(parents=True, exist_ok=True) + tmp = THREADS_PATH.with_suffix(".jsonl.tmp") + with tmp.open("w", encoding="utf-8") as f: + for t in threads: + f.write(json.dumps(t, ensure_ascii=False) + "\n") + tmp.replace(THREADS_PATH) + except Exception as exc: + log.warning("failed to save chat threads: %s", exc) + + +def _thread_index(threads: list[dict], thread_id: str) -> int: + for i, t in enumerate(threads): + if t.get("thread_id") == thread_id: + return i + return -1 + + +def list_threads() -> dict: + """Thread summaries (no bodies) for the dashboard sidebar, newest first.""" + return { + "threads": [ + { + "thread_id": t.get("thread_id"), + "title": t.get("title") or "New chat", + "updated_at": t.get("updated_at") or "", + } + for t in load_threads() + ] + } + + +def get_thread(thread_id: str) -> dict: + """Full history for one thread (empty shell if unknown).""" + threads = load_threads() + i = _thread_index(threads, str(thread_id or "")) + if i < 0: + return {"thread_id": str(thread_id or ""), "title": "New chat", "history": []} + t = threads[i] + return { + "thread_id": t.get("thread_id"), + "title": t.get("title") or "New chat", + "history": list(t.get("history") or []), + } + + +def delete_thread(thread_id: str) -> dict: + threads = load_threads() + kept = [t for t in threads if t.get("thread_id") != str(thread_id or "")] + if len(kept) != len(threads): + save_threads(kept) + return {"ok": True, "deleted": True} + return {"ok": True, "deleted": False} + + +# ---------- staged selection (Ctrl+Shift+A prefill) ----------------------------------- + +def stage_selection(text: str, source_app: str = "") -> dict: + """Stash a selection for the Chat tab to pick up (read-and-clear) on open.""" + payload = {"text": str(text or ""), "source_app": str(source_app or ""), "staged_at": _now_iso()} + try: + STAGED_PATH.parent.mkdir(parents=True, exist_ok=True) + STAGED_PATH.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8") + except OSError as exc: + log.warning("stage_selection write failed: %s", exc) + raise RuntimeError(f"could not stage selection: {exc}") from exc + return {"ok": True, "chars": len(payload["text"])} + + +def take_staged() -> dict: + """Return and clear the staged selection ({text:"", source_app:""} if none).""" + if not STAGED_PATH.exists(): + return {"text": "", "source_app": ""} + try: + data = json.loads(STAGED_PATH.read_text(encoding="utf-8")) + except Exception: + data = {} + try: + STAGED_PATH.unlink() + except OSError: + pass + return {"text": str(data.get("text") or ""), "source_app": str(data.get("source_app") or "")} + + +# ---------- LLM call ------------------------------------------------------------------- + +def _default_llm_call(messages: list[dict]) -> str: + """POST /v1/chat/completions to the active provider endpoint, return the reply. + + Reads the endpoint/model/auth/timeout from ``grammar_fix``'s provider-resolved + module globals so chat matches grammar/notes routing. Raises RuntimeError on + transport/timeout/parse/empty-choices errors. + """ + import grammar_fix + + base_url = str(getattr(grammar_fix, "FLM_BASE_URL", "http://127.0.0.1:52625") or "").rstrip("/") + model = str(getattr(grammar_fix, "FLM_MODEL", "qwen3.5:4b") or "qwen3.5:4b") + bearer = str(getattr(grammar_fix, "LLM_AUTH_BEARER", "flm") or "") + timeout = int(getattr(grammar_fix, "FLM_TIMEOUT_SECONDS", 240) or 240) + + body = json.dumps( + { + "model": model, + "messages": messages, + "temperature": TEMPERATURE, + "max_tokens": MAX_TOKENS, + "stream": False, + } + ).encode("utf-8") + headers = {"Content-Type": "application/json"} + if bearer: + headers["Authorization"] = f"Bearer {bearer}" + req = urllib.request.Request(base_url + "/v1/chat/completions", data=body, headers=headers, method="POST") + try: + with urllib.request.urlopen(req, timeout=max(2, timeout)) as resp: + payload = json.loads(resp.read().decode("utf-8", errors="replace")) + except urllib.error.URLError as e: + raise RuntimeError(f"LLM unreachable at {base_url}: {getattr(e, 'reason', e)}") from e + except TimeoutError: + raise RuntimeError(f"LLM timed out after {timeout}s") from None + except json.JSONDecodeError as e: + raise RuntimeError(f"Malformed LLM response: {e}") from e + + choices = payload.get("choices") or [] + if not choices: + raise RuntimeError("LLM returned no choices.") + return str((choices[0].get("message") or {}).get("content") or "").strip() + + +def _derive_title(text: str) -> str: + text = (text or "").strip() + if not text: + return "New chat" + first = text.splitlines()[0].strip() + if not first: + return "New chat" + return (first[:TITLE_MAX_CHARS] + "…") if len(first) > TITLE_MAX_CHARS else first + + +def send(thread_id: str = "", message: str = "", use_notes: bool = False, llm_call=None) -> dict: + """Append the user message, call the LLM (optionally grounded in the notes + vault), persist, and return ``{thread_id, title, reply, notes_used}``. + + ``llm_call(messages)->str`` is injectable for tests; defaults to the live + provider endpoint. + """ + message = str(message or "").strip() + if not message: + raise ValueError("empty chat message") + llm_call = llm_call or _default_llm_call + + threads = load_threads() + idx = _thread_index(threads, str(thread_id or "")) if thread_id else -1 + if idx < 0: + thread_id = str(thread_id or "") or uuid.uuid4().hex + thread = {"thread_id": thread_id, "title": "New chat", "updated_at": _now_iso(), "history": []} + threads.insert(0, thread) + idx = 0 + thread = threads[idx] + thread_id = thread.get("thread_id") + history: list[dict] = list(thread.get("history") or []) + + # Build the request: system + optional notes grounding + sliding window + new turn. + messages: list[dict] = [{"role": "system", "content": SYSTEM_PROMPT}] + notes_used: list[str] = [] + if use_notes: + ctx, titles = retrieve_notes_context(message) + if ctx: + messages.append({"role": "system", "content": ctx}) + notes_used = titles + window = history[-(CONTEXT_WINDOW_TURNS * 2):] + for m in window: + role = str(m.get("role") or "") + content = str(m.get("content") or "") + if role in ("user", "assistant") and content: + messages.append({"role": role, "content": content}) + messages.append({"role": "user", "content": message}) + + reply = str(llm_call(messages) or "").strip() + + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": reply}) + if not thread.get("title") or thread.get("title") == "New chat": + thread["title"] = _derive_title(message) + thread["history"] = history + thread["updated_at"] = _now_iso() + threads.pop(idx) + threads.insert(0, thread) + save_threads(threads[:MAX_THREADS]) + return {"thread_id": thread_id, "title": thread["title"], "reply": reply, "notes_used": notes_used} diff --git a/scripts/ffp_daemon.py b/scripts/ffp_daemon.py index 2829ba0..d8f6aa1 100644 --- a/scripts/ffp_daemon.py +++ b/scripts/ffp_daemon.py @@ -654,6 +654,41 @@ def _try_send() -> bytes | None: return {"ok": False, "error": "chat did not accept ingest after spawn"} +# ---- Web-dashboard chat (daemon-backed; replaces the retired popup) ---------- +def _act_chat_threads_list(_args: dict) -> dict: + import ffp_chat + return ffp_chat.list_threads() + + +def _act_chat_thread_get(args: dict) -> dict: + import ffp_chat + return ffp_chat.get_thread(str(args.get("thread_id") or "")) + + +def _act_chat_send(args: dict) -> dict: + import ffp_chat + return ffp_chat.send( + thread_id=str(args.get("thread_id") or ""), + message=str(args.get("message") or ""), + use_notes=bool(args.get("use_notes")), + ) + + +def _act_chat_thread_delete(args: dict) -> dict: + import ffp_chat + return ffp_chat.delete_thread(str(args.get("thread_id") or "")) + + +def _act_chat_stage_selection(args: dict) -> dict: + import ffp_chat + return ffp_chat.stage_selection(str(args.get("text") or ""), str(args.get("source_app") or "")) + + +def _act_chat_take_staged(_args: dict) -> dict: + import ffp_chat + return ffp_chat.take_staged() + + ACTIONS: dict[str, Callable[[dict], Any]] = { "status": _act_status, "start": _act_start, @@ -701,6 +736,12 @@ def _try_send() -> bytes | None: "pull_status": _act_pull_status, "notify": _act_notify, "save_note": _act_save_note, + "chat_threads_list": _act_chat_threads_list, + "chat_thread_get": _act_chat_thread_get, + "chat_send": _act_chat_send, + "chat_thread_delete": _act_chat_thread_delete, + "chat_stage_selection": _act_chat_stage_selection, + "chat_take_staged": _act_chat_take_staged, "chat_send_selection": _act_chat_send_selection, "chat_reload": _act_chat_reload, "chat_restart": _act_chat_restart, @@ -719,6 +760,7 @@ def _try_send() -> bytes | None: "cycle_tone_preset", "set_tone", "set_tone_formal", "set_tone_casual", "set_tone_friendly", "pull_model", "remove_model", "apply_config_patch", "update_apply", "set_autostart", "bench_start", "pull_start", + "chat_send", "chat_thread_delete", "chat_stage_selection", "chat_take_staged", } _shutdown_event = threading.Event() diff --git a/tests/test_ffp_chat.py b/tests/test_ffp_chat.py new file mode 100644 index 0000000..fc9cf2e --- /dev/null +++ b/tests/test_ffp_chat.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import json + +import ffp_chat +import pytest + + +@pytest.fixture(autouse=True) +def _tmp_store(tmp_path, monkeypatch): + """Point the thread + staged stores at a temp dir so tests never touch real data.""" + monkeypatch.setattr(ffp_chat, "THREADS_PATH", tmp_path / "chat_threads.jsonl") + monkeypatch.setattr(ffp_chat, "STAGED_PATH", tmp_path / ".chat_staged.json") + return tmp_path + + +# ---------- title derivation ----------------------------------------------------------- + +@pytest.mark.parametrize(("msg", "expected"), [ + ("", "New chat"), + (" ", "New chat"), + ("Summarize this", "Summarize this"), + ("first line\nsecond line", "first line"), # only first line +]) +def test_derive_title_basic(msg, expected): + assert ffp_chat._derive_title(msg) == expected + + +def test_derive_title_truncates_long(): + long = "x" * 200 + out = ffp_chat._derive_title(long) + assert out.endswith("…") and len(out) == ffp_chat.TITLE_MAX_CHARS + 1 + + +# ---------- thread store round-trip ---------------------------------------------------- + +def test_threads_roundtrip_newest_first_latest_snapshot(): + ffp_chat.save_threads([ + {"thread_id": "a", "title": "A", "updated_at": "2026-06-16T10:00:00", "history": []}, + {"thread_id": "b", "title": "B", "updated_at": "2026-06-16T11:00:00", "history": []}, + ]) + # A second snapshot of "a" with a newer timestamp should win and sort first. + with ffp_chat.THREADS_PATH.open("a", encoding="utf-8") as f: + f.write(json.dumps({"thread_id": "a", "title": "A2", "updated_at": "2026-06-16T12:00:00", "history": []}) + "\n") + loaded = ffp_chat.load_threads() + assert [t["thread_id"] for t in loaded] == ["a", "b"] # a now newest + assert loaded[0]["title"] == "A2" # latest snapshot wins + + +def test_list_and_get_thread(): + ffp_chat.save_threads([ + {"thread_id": "x", "title": "Hello", "updated_at": "2026-06-16T10:00:00", + "history": [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "yo"}]}, + ]) + summaries = ffp_chat.list_threads()["threads"] + assert summaries == [{"thread_id": "x", "title": "Hello", "updated_at": "2026-06-16T10:00:00"}] + full = ffp_chat.get_thread("x") + assert full["history"][0] == {"role": "user", "content": "hi"} + # Unknown thread → empty shell, not an error. + assert ffp_chat.get_thread("nope") == {"thread_id": "nope", "title": "New chat", "history": []} + + +def test_delete_thread(): + ffp_chat.save_threads([ + {"thread_id": "x", "title": "X", "updated_at": "2026-06-16T10:00:00", "history": []}, + {"thread_id": "y", "title": "Y", "updated_at": "2026-06-16T10:00:00", "history": []}, + ]) + assert ffp_chat.delete_thread("x") == {"ok": True, "deleted": True} + assert [t["thread_id"] for t in ffp_chat.load_threads()] == ["y"] + assert ffp_chat.delete_thread("missing") == {"ok": True, "deleted": False} + + +# ---------- send() --------------------------------------------------------------------- + +def test_send_creates_thread_and_persists(): + seen = {} + + def fake_llm(messages): + seen["messages"] = messages + return "the reply" + + out = ffp_chat.send(message="Hello there", llm_call=fake_llm) + assert out["reply"] == "the reply" + assert out["title"] == "Hello there" + assert out["thread_id"] + # System prompt + the user turn reached the model. + assert seen["messages"][0]["role"] == "system" + assert seen["messages"][-1] == {"role": "user", "content": "Hello there"} + # Persisted with both turns. + full = ffp_chat.get_thread(out["thread_id"]) + assert [m["role"] for m in full["history"]] == ["user", "assistant"] + assert full["history"][1]["content"] == "the reply" + + +def test_send_continues_existing_thread_with_window(): + out1 = ffp_chat.send(message="first", llm_call=lambda m: "r1") + tid = out1["thread_id"] + captured = {} + + def cap(messages): + captured["m"] = messages + return "r2" + + out2 = ffp_chat.send(thread_id=tid, message="second", llm_call=cap) + assert out2["thread_id"] == tid + # Prior turns are replayed before the new user message. + roles = [m["role"] for m in captured["m"]] + assert roles[-3:] == ["user", "assistant", "user"] # first, r1, second + full = ffp_chat.get_thread(tid) + assert [m["content"] for m in full["history"]] == ["first", "r1", "second", "r2"] + + +def test_send_empty_message_rejected(): + with pytest.raises(ValueError, match="empty chat message"): + ffp_chat.send(message=" ", llm_call=lambda m: "x") + + +def test_send_with_notes_grounding_injects_context(monkeypatch): + # Stub the vault search so a grounding system message is injected. + monkeypatch.setattr( + ffp_chat, "retrieve_notes_context", + lambda q, max_notes=4: ("NOTES CONTEXT BLOCK", ["Note One", "Note Two"]), + ) + captured = {} + + def cap(messages): + captured["m"] = messages + return "grounded reply" + + out = ffp_chat.send(message="what did I save", use_notes=True, llm_call=cap) + assert out["notes_used"] == ["Note One", "Note Two"] + system_msgs = [m["content"] for m in captured["m"] if m["role"] == "system"] + assert "NOTES CONTEXT BLOCK" in system_msgs + + +def test_build_notes_context_message_empty_when_no_hits(): + msg, titles = ffp_chat.build_notes_context_message("q", lambda q, n: {"results": []}) + assert msg is None and titles == [] + + +# ---------- staged selection ----------------------------------------------------------- + +def test_stage_and_take_selection_read_and_clear(): + assert ffp_chat.stage_selection("hello world", "Outlook.exe") == {"ok": True, "chars": 11} + assert ffp_chat.STAGED_PATH.exists() + got = ffp_chat.take_staged() + assert got == {"text": "hello world", "source_app": "Outlook.exe"} + # Cleared after taking. + assert not ffp_chat.STAGED_PATH.exists() + assert ffp_chat.take_staged() == {"text": "", "source_app": ""} diff --git a/tests/test_ffp_daemon.py b/tests/test_ffp_daemon.py index 84c0311..83670eb 100644 --- a/tests/test_ffp_daemon.py +++ b/tests/test_ffp_daemon.py @@ -65,8 +65,13 @@ def test_actions_count_and_expected_names(daemon_module): # Late v1.4.0 added flm_update_check, bench_start/status/history, # note_search, pull_start/status; v1.5.0 removed model_stats; # v1.6 web dashboard added recent_history + notes_list + mode_ids -> 51; - # provider work added provider_status -> 52; model_recommendations -> 53. - assert len(daemon_module.ACTIONS) == 53 + # provider work added provider_status -> 52; model_recommendations -> 53; + # web chat backend added chat_threads_list/chat_thread_get/chat_send/ + # chat_thread_delete/chat_stage_selection/chat_take_staged -> 59. + assert len(daemon_module.ACTIONS) == 59 + for a in ("chat_threads_list", "chat_thread_get", "chat_send", + "chat_thread_delete", "chat_stage_selection", "chat_take_staged"): + assert a in daemon_module.ACTIONS assert "model_recommendations" in daemon_module.ACTIONS assert "recent_history" in daemon_module.ACTIONS assert "notes_list" in daemon_module.ACTIONS