diff --git a/CHANGELOG.md b/CHANGELOG.md index c17766f..3d773ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,35 @@ Format follows [Keep a Changelog](https://keepachangelog.com/). ## [Unreleased] +### Added — Workspace Bandwidth Limit Phase 1 (top-K-per-epoch) + +Closes the second half of the May 15 audit's workspace partial entry. +Thalamus mode-broadcast closed the first half (org_state coupling); +this closes the bandwidth-limit half. Biology: Dehaene-Changeux global +neuronal workspace has a hard ~4-chunk ignition bandwidth. + +- **Migration 072** — 2 tables (`workspace_bandwidth_state` single + row + `workspace_bandwidth_epochs` history). Defaults: limit=4, + epoch_duration=60s, enforcement_mode='shadow'. +- **`agentmemory.mcp_tools_workspace_bandwidth`** — 4 MCP tools: + `workspace_bandwidth_status`, `workspace_bandwidth_set`, + `workspace_bandwidth_admit`, `workspace_bandwidth_epochs_history`. +- **Shadow / enforce / disabled modes.** In shadow, all admits go + through and `saturated=True` is informational. In enforce, admits + past the limit are rejected with `rejected=True`. In disabled, + behaves like shadow. +- **Auto-rotation.** `workspace_bandwidth_status` and `_admit` both + check epoch expiry and roll the live counters into the historical + table. +- **9 tests** covering defaults, status, admit-increment, saturation + detection, enforce-mode rejection, set validation, rotation, + history filters. + +Phase 2 wires `workspace_bandwidth_admit` into `workspace_ingest` so +every real broadcast goes through the limit. Phase 3 lets ARAS +mode-modulate the bandwidth (high arousal → wider; consolidation → +narrower). Phase 4 enforces. + ### Added — issue #116 Phase 1-A: retrieval pathway log External architecture memo (issue #116, "Thalamus, Basal Ganglia, and diff --git a/db/migrations/072_workspace_bandwidth.sql b/db/migrations/072_workspace_bandwidth.sql new file mode 100644 index 0000000..8c2d264 --- /dev/null +++ b/db/migrations/072_workspace_bandwidth.sql @@ -0,0 +1,63 @@ +-- Migration 072: workspace bandwidth limit — Phase 1 schema +-- +-- The May 15 brain_region_coverage.md audit flagged the workspace +-- (global neuronal workspace) as partial: "Fixed salience threshold, +-- no org_state coupling, no enforced bandwidth limit (any module can +-- write)." +-- +-- The thalamus mode-broadcast layer (shipped via thalamus Phase 2) +-- closed the org_state coupling half. This migration closes the +-- remaining half: a top-K-per-epoch bandwidth limit on workspace +-- broadcasts. +-- +-- Biology: the global neuronal workspace (Dehaene-Changeux model) has +-- a hard bandwidth — only ~4 chunks can be "ignited" at a time. Without +-- that constraint, the workspace degenerates into a firehose. brainctl's +-- current workspace_broadcasts table has no such limit; any module can +-- write any time. Phase 1 adds the *bookkeeping*; Phase 2 enforces. +-- +-- Schema: +-- workspace_bandwidth_state — single row, current epoch + count +-- workspace_bandwidth_epochs — historical log of completed epochs +-- +-- Phase 1 is inspection-only / additive. Phase 2 wires the limit +-- into workspace_ingest. Phase 3 lets the limit be context-modulated +-- (high arousal = wider bandwidth; consolidation = narrower). +-- +-- Rollback: +-- DROP TABLE IF EXISTS workspace_bandwidth_epochs; +-- DROP TABLE IF EXISTS workspace_bandwidth_state; +-- DELETE FROM schema_version WHERE version = 72; +-- +-- IDEMPOTENT. + +CREATE TABLE IF NOT EXISTS workspace_bandwidth_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + epoch_started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%S', 'now')), + epoch_duration_seconds INTEGER NOT NULL DEFAULT 60 CHECK(epoch_duration_seconds > 0), + epoch_count INTEGER NOT NULL DEFAULT 0, -- broadcasts in the current epoch + bandwidth_limit INTEGER NOT NULL DEFAULT 4 CHECK(bandwidth_limit > 0), + total_admits INTEGER NOT NULL DEFAULT 0, + total_rejects INTEGER NOT NULL DEFAULT 0, + enforcement_mode TEXT NOT NULL DEFAULT 'shadow' CHECK(enforcement_mode IN ('shadow', 'enforce', 'disabled')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%S', 'now')) +); +INSERT OR IGNORE INTO workspace_bandwidth_state (id) VALUES (1); + +CREATE TABLE IF NOT EXISTS workspace_bandwidth_epochs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + epoch_started_at TEXT NOT NULL, + epoch_ended_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%S', 'now')), + duration_seconds INTEGER NOT NULL, + admitted_count INTEGER NOT NULL DEFAULT 0, + rejected_count INTEGER NOT NULL DEFAULT 0, + bandwidth_limit INTEGER NOT NULL, + enforcement_mode TEXT NOT NULL, + saturation REAL NOT NULL DEFAULT 0.0 -- admitted_count / bandwidth_limit +); +CREATE INDEX IF NOT EXISTS idx_wbe_recent ON workspace_bandwidth_epochs(epoch_ended_at); +CREATE INDEX IF NOT EXISTS idx_wbe_saturated ON workspace_bandwidth_epochs(saturation); + +INSERT OR IGNORE INTO schema_version (version, description, applied_at) +VALUES (72, 'workspace bandwidth Phase 1: 2 tables (state + epochs) for top-K-per-epoch limit', + strftime('%Y-%m-%dT%H:%M:%S', 'now')); diff --git a/src/agentmemory/mcp_server.py b/src/agentmemory/mcp_server.py index 95e5f45..2302aee 100755 --- a/src/agentmemory/mcp_server.py +++ b/src/agentmemory/mcp_server.py @@ -78,6 +78,7 @@ mcp_tools_trust, mcp_tools_usage, mcp_tools_workspace, + mcp_tools_workspace_bandwidth, mcp_tools_world, ) _EXT_MODULES = [ @@ -121,6 +122,7 @@ mcp_tools_trust, mcp_tools_usage, mcp_tools_workspace, + mcp_tools_workspace_bandwidth, mcp_tools_world, ] except ImportError as _e: diff --git a/src/agentmemory/mcp_tools_workspace_bandwidth.py b/src/agentmemory/mcp_tools_workspace_bandwidth.py new file mode 100644 index 0000000..2698eed --- /dev/null +++ b/src/agentmemory/mcp_tools_workspace_bandwidth.py @@ -0,0 +1,311 @@ +"""brainctl MCP tools — workspace bandwidth limit. + +Phase 1: bookkeeping for top-K-per-epoch limit on workspace_broadcasts. +Closes the second half of the May 15 audit's workspace partial entry +(thalamus mode-broadcast closed the first half — org_state coupling). + +Phase 1 is inspection + state mgmt. Phase 2 wires the limit into +workspace_ingest. Phase 3 lets the limit be context-modulated. +""" +from __future__ import annotations + +import sqlite3 +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +from mcp.types import Tool + +from agentmemory.lib.mcp_helpers import open_db +from agentmemory.paths import get_db_path + +DB_PATH: Path = get_db_path() + +VALID_ENFORCEMENT_MODES = {"shadow", "enforce", "disabled"} + + +def _db() -> sqlite3.Connection: + return open_db(str(DB_PATH)) + + +def _rows(rows: Iterable[sqlite3.Row]) -> list[dict[str, Any]]: + return [dict(r) for r in rows] + + +def _table_exists(conn: sqlite3.Connection, name: str) -> bool: + return bool( + conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,) + ).fetchone() + ) + + +def _require_schema(conn: sqlite3.Connection) -> str | None: + missing = [ + t for t in ("workspace_bandwidth_state", "workspace_bandwidth_epochs") + if not _table_exists(conn, t) + ] + if missing: + return ("workspace bandwidth schema missing: " + ", ".join(missing) + + ". Run `brainctl migrate` (migration 072).") + return None + + +def _rotate_if_due(conn: sqlite3.Connection) -> dict[str, Any]: + """Close out the current epoch if `epoch_duration_seconds` has elapsed. + + Inserts a row into workspace_bandwidth_epochs, resets the live + counters, returns the rotated-epoch metadata (or empty dict if no + rotation happened). + """ + state = conn.execute("SELECT * FROM workspace_bandwidth_state WHERE id = 1").fetchone() + if not state: + return {} + rot = conn.execute( + """ + SELECT (julianday('now') * 86400 - julianday(?) * 86400) AS elapsed_s + """, + (state["epoch_started_at"],), + ).fetchone() + elapsed = float(rot["elapsed_s"]) if rot and rot["elapsed_s"] is not None else 0.0 + duration = int(state["epoch_duration_seconds"]) + if elapsed < duration: + return {} + admitted = int(state["epoch_count"]) + bandwidth = int(state["bandwidth_limit"]) + saturation = admitted / bandwidth if bandwidth else 0.0 + conn.execute( + """ + INSERT INTO workspace_bandwidth_epochs + (epoch_started_at, duration_seconds, admitted_count, rejected_count, + bandwidth_limit, enforcement_mode, saturation) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (state["epoch_started_at"], duration, admitted, 0, + bandwidth, state["enforcement_mode"], saturation), + ) + conn.execute( + """ + UPDATE workspace_bandwidth_state SET + epoch_started_at = strftime('%Y-%m-%dT%H:%M:%S', 'now'), + epoch_count = 0, + updated_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') + WHERE id = 1 + """ + ) + conn.commit() + return {"rotated": True, "admitted": admitted, "limit": bandwidth, "saturation": saturation} + + +def tool_workspace_bandwidth_status(**_kw: Any) -> dict[str, Any]: + with _db() as conn: + conn.row_factory = sqlite3.Row + err = _require_schema(conn) + if err: + return {"error": err} + rotated = _rotate_if_due(conn) + state = conn.execute("SELECT * FROM workspace_bandwidth_state WHERE id = 1").fetchone() + last_epochs = _rows(conn.execute( + "SELECT * FROM workspace_bandwidth_epochs ORDER BY id DESC LIMIT 5" + ).fetchall()) + agg = conn.execute( + """ + SELECT COUNT(*) AS n, COALESCE(AVG(saturation), 0.0) AS mean_sat, + COALESCE(MAX(saturation), 0.0) AS peak_sat, + SUM(admitted_count) AS sum_admitted, + SUM(rejected_count) AS sum_rejected + FROM workspace_bandwidth_epochs + WHERE epoch_ended_at >= datetime('now', '-1 hour') + """ + ).fetchone() + return { + "ok": True, + "state": dict(state) if state else None, + "last_5_epochs": last_epochs, + "last_hour_aggregate": dict(agg) if agg else {}, + "rotation": rotated, + } + + +def tool_workspace_bandwidth_set( + bandwidth_limit: int | None = None, + epoch_duration_seconds: int | None = None, + enforcement_mode: str | None = None, + **_kw: Any, +) -> dict[str, Any]: + """Update workspace bandwidth state. Each arg is optional — pass + only what you want to change. Returns the new state.""" + if bandwidth_limit is not None and bandwidth_limit <= 0: + return {"error": "bandwidth_limit must be > 0"} + if epoch_duration_seconds is not None and epoch_duration_seconds <= 0: + return {"error": "epoch_duration_seconds must be > 0"} + if enforcement_mode is not None and enforcement_mode not in VALID_ENFORCEMENT_MODES: + return {"error": f"invalid enforcement_mode {enforcement_mode!r}; " + f"expected one of {sorted(VALID_ENFORCEMENT_MODES)}"} + with _db() as conn: + conn.row_factory = sqlite3.Row + err = _require_schema(conn) + if err: + return {"error": err} + updates = [] + params: list[Any] = [] + if bandwidth_limit is not None: + updates.append("bandwidth_limit = ?"); params.append(int(bandwidth_limit)) + if epoch_duration_seconds is not None: + updates.append("epoch_duration_seconds = ?"); params.append(int(epoch_duration_seconds)) + if enforcement_mode is not None: + updates.append("enforcement_mode = ?"); params.append(enforcement_mode) + if not updates: + return {"error": "no arguments passed; nothing to update"} + updates.append("updated_at = strftime('%Y-%m-%dT%H:%M:%S', 'now')") + conn.execute( + f"UPDATE workspace_bandwidth_state SET {', '.join(updates)} WHERE id = 1", + tuple(params), + ) + conn.commit() + state = conn.execute("SELECT * FROM workspace_bandwidth_state WHERE id = 1").fetchone() + return {"ok": True, "state": dict(state) if state else None} + + +def tool_workspace_bandwidth_admit(**_kw: Any) -> dict[str, Any]: + """Record one workspace broadcast admit. Returns the post-increment + epoch_count and the would-have-been-rejected flag (`saturated=True` + when epoch_count exceeds bandwidth_limit; in `shadow` mode the admit + still goes through; in `enforce` mode the caller should refuse). + """ + with _db() as conn: + conn.row_factory = sqlite3.Row + err = _require_schema(conn) + if err: + return {"error": err} + _rotate_if_due(conn) + state = conn.execute("SELECT * FROM workspace_bandwidth_state WHERE id = 1").fetchone() + if not state: + return {"error": "workspace_bandwidth_state seed row missing"} + new_count = int(state["epoch_count"]) + 1 + saturated = new_count > int(state["bandwidth_limit"]) + if saturated and state["enforcement_mode"] == "enforce": + conn.execute( + """ + UPDATE workspace_bandwidth_state SET + total_rejects = total_rejects + 1, + updated_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') + WHERE id = 1 + """ + ) + conn.commit() + return { + "ok": True, "admitted": False, "rejected": True, + "saturated": True, "epoch_count": int(state["epoch_count"]), + "bandwidth_limit": int(state["bandwidth_limit"]), + "enforcement_mode": state["enforcement_mode"], + } + conn.execute( + """ + UPDATE workspace_bandwidth_state SET + epoch_count = ?, + total_admits = total_admits + 1, + updated_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') + WHERE id = 1 + """, + (new_count,), + ) + conn.commit() + return { + "ok": True, "admitted": True, "rejected": False, + "saturated": saturated, + "epoch_count": new_count, + "bandwidth_limit": int(state["bandwidth_limit"]), + "enforcement_mode": state["enforcement_mode"], + } + + +def tool_workspace_bandwidth_epochs_history( + limit: int = 50, since: str | None = None, + min_saturation: float | None = None, + **_kw: Any, +) -> dict[str, Any]: + limit = max(1, min(int(limit), 500)) + with _db() as conn: + conn.row_factory = sqlite3.Row + err = _require_schema(conn) + if err: + return {"error": err} + clauses, params = [], [] + if since: + clauses.append("epoch_ended_at >= ?"); params.append(since) + if min_saturation is not None: + clauses.append("saturation >= ?"); params.append(float(min_saturation)) + where = "WHERE " + " AND ".join(clauses) if clauses else "" + rows = conn.execute( + f"SELECT * FROM workspace_bandwidth_epochs {where} ORDER BY id DESC LIMIT ?", + (*params, limit), + ).fetchall() + return {"ok": True, "epochs": _rows(rows)} + + +TOOLS: list[Tool] = [ + Tool( + name="workspace_bandwidth_status", + description=( + "Workspace bandwidth Phase 1 inspection. Current state (epoch + counts + " + "enforcement_mode) + last 5 completed epochs + 1h aggregate. Side-effect: " + "rotates the epoch if the current one has expired." + ), + inputSchema={"type": "object", "properties": {}}, + ), + Tool( + name="workspace_bandwidth_set", + description=( + "Update workspace bandwidth state. enforcement_mode ∈ {shadow, enforce, " + "disabled}. Each arg is optional; pass only what you want to change." + ), + inputSchema={ + "type": "object", + "properties": { + "bandwidth_limit": {"type": "integer"}, + "epoch_duration_seconds": {"type": "integer"}, + "enforcement_mode": {"type": "string", "enum": sorted(VALID_ENFORCEMENT_MODES)}, + }, + }, + ), + Tool( + name="workspace_bandwidth_admit", + description=( + "Record one workspace broadcast admit. Returns admitted/rejected status. " + "In `shadow` mode: always admits, returns saturated=True if over limit. " + "In `enforce` mode: admits up to bandwidth_limit per epoch, then rejects. " + "In `disabled` mode: behaves like shadow (logs only)." + ), + inputSchema={"type": "object", "properties": {}}, + ), + Tool( + name="workspace_bandwidth_epochs_history", + description="Paginated completed-epoch history. Filters: since, min_saturation. limit clamped to [1, 500].", + inputSchema={ + "type": "object", + "properties": { + "limit": {"type": "integer", "default": 50}, + "since": {"type": "string"}, + "min_saturation": {"type": "number"}, + }, + }, + ), +] + + +_WB_TOOLS = { + "workspace_bandwidth_status": tool_workspace_bandwidth_status, + "workspace_bandwidth_set": tool_workspace_bandwidth_set, + "workspace_bandwidth_admit": tool_workspace_bandwidth_admit, + "workspace_bandwidth_epochs_history": tool_workspace_bandwidth_epochs_history, +} + +DISPATCH: dict[str, Any] = { + name: (lambda _func=func, **kw: _func(**kw)) + for name, func in _WB_TOOLS.items() +} + + +def register_tools() -> tuple[list[Tool], dict[str, Any]]: + return TOOLS, DISPATCH diff --git a/tests/test_mcp_tools_workspace_bandwidth.py b/tests/test_mcp_tools_workspace_bandwidth.py new file mode 100644 index 0000000..f87472f --- /dev/null +++ b/tests/test_mcp_tools_workspace_bandwidth.py @@ -0,0 +1,140 @@ +"""Tests for mcp_tools_workspace_bandwidth — Phase 1.""" +from __future__ import annotations + +import sqlite3 +import time +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +MIGRATION_072 = REPO_ROOT / "db" / "migrations" / "072_workspace_bandwidth.sql" + + +def _bootstrap(conn): + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + description TEXT, + applied_at TEXT + ); + """ + ) + + +def _apply(db_path): + conn = sqlite3.connect(str(db_path)) + try: + _bootstrap(conn) + conn.executescript(MIGRATION_072.read_text()) + conn.commit() + finally: + conn.close() + + +def _make_db(tmp_path, monkeypatch): + db = tmp_path / "brain.db" + _apply(db) + from agentmemory import mcp_tools_workspace_bandwidth as mod + monkeypatch.setattr(mod, "DB_PATH", db) + return mod + + +def test_migration_applies_with_defaults(tmp_path): + db = tmp_path / "brain.db" + _apply(db) + conn = sqlite3.connect(str(db)) + try: + state = conn.execute( + "SELECT bandwidth_limit, enforcement_mode, epoch_count FROM workspace_bandwidth_state" + ).fetchone() + assert state == (4, "shadow", 0) + sv = conn.execute("SELECT version FROM schema_version WHERE version=72").fetchone() + assert sv == (72,) + finally: + conn.close() + + +def test_status_returns_state(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + out = mod.tool_workspace_bandwidth_status() + assert out["ok"] is True + assert out["state"]["bandwidth_limit"] == 4 + assert out["state"]["enforcement_mode"] == "shadow" + assert out["last_5_epochs"] == [] + + +def test_admit_increments_counter(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + for i in range(3): + out = mod.tool_workspace_bandwidth_admit() + assert out["ok"] is True + assert out["admitted"] is True + assert out["epoch_count"] == i + 1 + assert out["saturated"] is False + + +def test_admit_saturates_above_limit_shadow_mode(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + for _ in range(4): + mod.tool_workspace_bandwidth_admit() + # 5th admit: still admitted in shadow mode but `saturated=True` + out = mod.tool_workspace_bandwidth_admit() + assert out["admitted"] is True + assert out["saturated"] is True + + +def test_admit_rejects_in_enforce_mode(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + mod.tool_workspace_bandwidth_set(enforcement_mode="enforce") + for _ in range(4): + admitted = mod.tool_workspace_bandwidth_admit() + assert admitted["admitted"] is True + rejected = mod.tool_workspace_bandwidth_admit() + assert rejected["admitted"] is False + assert rejected["rejected"] is True + + +def test_set_updates_state(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + out = mod.tool_workspace_bandwidth_set( + bandwidth_limit=10, epoch_duration_seconds=30, enforcement_mode="enforce" + ) + assert out["ok"] is True + assert out["state"]["bandwidth_limit"] == 10 + assert out["state"]["epoch_duration_seconds"] == 30 + assert out["state"]["enforcement_mode"] == "enforce" + + +def test_set_validates(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + assert "error" in mod.tool_workspace_bandwidth_set(bandwidth_limit=0) + assert "error" in mod.tool_workspace_bandwidth_set(epoch_duration_seconds=-1) + assert "error" in mod.tool_workspace_bandwidth_set(enforcement_mode="bogus") + assert "error" in mod.tool_workspace_bandwidth_set() # nothing passed + + +def test_rotation_creates_epoch_row(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + # Set duration to 1s, admit twice, sleep 2s, status should rotate. + mod.tool_workspace_bandwidth_set(epoch_duration_seconds=1) + mod.tool_workspace_bandwidth_admit() + mod.tool_workspace_bandwidth_admit() + time.sleep(1.2) + out = mod.tool_workspace_bandwidth_status() + assert out["rotation"].get("rotated") is True + assert out["rotation"]["admitted"] == 2 + history = mod.tool_workspace_bandwidth_epochs_history(limit=10) + assert len(history["epochs"]) == 1 + assert history["epochs"][0]["admitted_count"] == 2 + + +def test_history_filters(tmp_path, monkeypatch): + mod = _make_db(tmp_path, monkeypatch) + mod.tool_workspace_bandwidth_set(epoch_duration_seconds=1, bandwidth_limit=2) + for _ in range(3): + mod.tool_workspace_bandwidth_admit() + time.sleep(1.2) + mod.tool_workspace_bandwidth_status() # rotate + high_sat = mod.tool_workspace_bandwidth_epochs_history(limit=10, min_saturation=1.0) + assert len(high_sat["epochs"]) == 1 + assert high_sat["epochs"][0]["saturation"] >= 1.0