Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 131 additions & 33 deletions src/graphstore/bonsai_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,23 @@ def _scrape_belief_updates(

@dataclass
class ParsedTurn:
"""Parsed structured output of one @-verb LLM call."""
"""Parsed structured output of one @-verb LLM call.

entities holds (slug, surface_name) tuples - the slug is the bare
identifier the model emitted (without the legacy ``ent:`` prefix),
surface_name is the human-readable form ("Alice", "OpenAI"). The
synthesizer turns each into a mention node + a refers_to edge to a
resolved entity (see ``graphstore.entity_resolver``).

entity_edges holds (from_slug, to_slug, kind) tuples for @EDGE
output between two entity slugs. Synthesizer maps each slug to the
resolved entity_id and emits the edge between entity nodes.
Edge handlers append here instead of `statements` so the slug ->
entity_id rewrite happens after resolver runs.
"""

entities: list[tuple[str, str]] = field(default_factory=list)
entity_edges: list[tuple[str, str, str]] = field(default_factory=list)
beliefs: list[tuple[str, str]] = field(default_factory=list)
retracts: list[str] = field(default_factory=list)
statements: list[str] = field(default_factory=list)
Expand All @@ -265,14 +279,20 @@ def _dsl_escape(s: str) -> str:


def _h_upsert(turn: ParsedTurn, ln: str) -> None:
"""U <slug> <Name ...>"""
"""U <slug> <Name ...>

Stores (slug, surface_name). The synthesizer turns this into a
mention node + entity node (via the resolver) + refers_to edge.
Slug is stored bare; legacy ``ent:`` prefix on input is stripped
so the model can emit either form.
"""
parts = ln.split(None, 2)
if len(parts) < 3:
return
slug = parts[1].removeprefix("ent:").strip('"')
name = parts[2].strip().strip('"')
if slug and name:
turn.entities.append((f"ent:{slug}", name))
turn.entities.append((slug, name))


def _h_fact(turn: ParsedTurn, ln: str) -> None:
Expand All @@ -297,18 +317,27 @@ def _h_drop(turn: ParsedTurn, ln: str) -> None:


def _h_edge(turn: ParsedTurn, ln: str) -> None:
"""E <from_id> <to_id> <kind> (ids include ent:/fact: prefix)"""
"""E <from_id> <to_id> <kind> (ids may include legacy ent:/fact: prefix)

Pushes (from_slug, to_slug, kind) to ``turn.entity_edges`` so the
synthesizer can map each slug to the resolved entity_id before
materializing the edge. The legacy ``ent:`` prefix is stripped so
the slug can be looked up in the per-turn slug -> entity map. Bare
fact:/msg: ids are passed through unchanged - they reference nodes
that already exist by literal id.
"""
parts = ln.split()
if len(parts) < 4:
return
from_id = parts[1].strip('"')
to_id = parts[2].strip('"')
kind = parts[3].strip('"')
if from_id and to_id and kind:
turn.statements.append(
f'CREATE EDGE "{_dsl_escape(from_id)}" -> "{_dsl_escape(to_id)}" '
f'kind = "{_dsl_escape(kind)}"'
)
if not (from_id and to_id and kind):
return
# Strip legacy "ent:" prefix; remap to entity ids during synthesis.
from_slug = from_id.removeprefix("ent:")
to_slug = to_id.removeprefix("ent:")
turn.entity_edges.append((from_slug, to_slug, kind))


def _h_query(template: str):
Expand Down Expand Up @@ -903,47 +932,87 @@ def _synthesize_dsl(
session_id: str,
role: str,
text: str,
gs: Any | None = None,
) -> list[str]:
"""Build the full DSL statement list from the parsed compact output.

Deterministic. Emits in order:
1. CREATE NODE for the message (DOCUMENT = user text).
2. UPSERT NODE per entity + matching CREATE EDGE kind = "mentions".
Entities are deduped by id (first wins).
3. RETRACT per retract (before any ASSERT).
4. ASSERT per belief.
5. Pre-rendered statements (edges, queries, walks, sys ops) verbatim.
"""
"""Render parsed @-verbs to DSL. ``gs`` enables resolver lookup;
when None (dry-run, tests) every mention mints a fresh entity."""
from graphstore.entity_resolver import (
EDGE_REFERS_TO, KIND_ENTITY, KIND_MENTION,
make_entity_id, make_mention_id, resolve_mention,
)

out: list[str] = []
text_esc = _dsl_escape(text)
session_esc = _dsl_escape(session_id)
role_esc = _dsl_escape(role)
msg_esc = _dsl_escape(msg_id)

# Store the text in both `content` (for baseline retrieval adapters that
# scan the node column) and `DOCUMENT` (for the vector + BM25 pipeline).
# The two-liner duplication costs ~1 string interning but makes Bonsai-
# ingested messages directly comparable to the deterministic NER adapter.
# `content` mirrors `DOCUMENT` so adapters that scan typed columns
# (deterministic NER baseline) compare apples-to-apples with the
# vector + BM25 pipeline that reads `DOCUMENT`.
out.append(
f'CREATE NODE "{msg_esc}" kind = "message" '
f'session = "{session_esc}" role = "{role_esc}" '
f'content = "{text_esc}" '
f'DOCUMENT "{text_esc}"'
)

ordered_ents: list[str] = []
seen_ents: set[str] = set()
for ent_id, name in turn.entities:
if ent_id in seen_ents:
slug_to_entity: dict[str, str] = {}

seen_slugs: set[str] = set()
for occurrence, (slug, name) in enumerate(turn.entities):
if slug in seen_slugs:
continue
seen_ents.add(ent_id)
ordered_ents.append(ent_id)
seen_slugs.add(slug)

mention_id = make_mention_id(msg_id, slug, occurrence)
mention_esc = _dsl_escape(mention_id)
name_esc = _dsl_escape(name)

if gs is not None:
try:
resolved = resolve_mention(gs, surface_name=name, context=text)
entity_id = resolved.entity_id
is_new = resolved.is_new_entity
confidence = resolved.confidence
except Exception as e: # pragma: no cover
_log.warning("entity_resolver failed for %r: %s", name, e)
entity_id = make_entity_id()
is_new = True
confidence = 1.0
else:
entity_id = make_entity_id()
is_new = True
confidence = 1.0

slug_to_entity[slug] = entity_id
entity_esc = _dsl_escape(entity_id)

out.append(
f'CREATE NODE "{mention_esc}" kind = "{KIND_MENTION}" '
f'surface_name = "{name_esc}" source_msg = "{msg_esc}" '
f'session = "{session_esc}" '
f'DOCUMENT "{name_esc} | {text_esc}"'
)

if is_new:
out.append(
f'CREATE NODE "{entity_esc}" kind = "{KIND_ENTITY}" '
f'canonical_name = "{name_esc}" mention_count = 1 '
f'context = "{text_esc}" '
f'DOCUMENT "{name_esc} | {text_esc}"'
)
else:
out.append(
f'INCREMENT NODE "{entity_esc}" mention_count BY 1'
)

out.append(
f'UPSERT NODE "{_dsl_escape(ent_id)}" kind = "entity" name = "{_dsl_escape(name)}"'
f'CREATE EDGE "{mention_esc}" -> "{entity_esc}" '
f'kind = "{EDGE_REFERS_TO}" confidence = {confidence:.3f}'
)
for ent_id in ordered_ents:
out.append(
f'CREATE EDGE "{msg_esc}" -> "{_dsl_escape(ent_id)}" kind = "mentions"'
f'CREATE EDGE "{msg_esc}" -> "{mention_esc}" kind = "mentions"'
)

for fact_id in turn.retracts:
Expand All @@ -957,11 +1026,39 @@ def _synthesize_dsl(
f'value = "{_dsl_escape(value)}" CONFIDENCE 0.9 SOURCE "{msg_esc}"'
)

out.extend(turn.statements)
for from_slug, to_slug, edge_kind in turn.entity_edges:
from_eid = slug_to_entity.get(from_slug)
to_eid = slug_to_entity.get(to_slug)
if not (from_eid and to_eid):
_log.warning(
"bonsai: dropping @EDGE %r -> %r (slug not in turn map)",
from_slug, to_slug,
)
continue
out.append(
f'CREATE EDGE "{_dsl_escape(from_eid)}" -> "{_dsl_escape(to_eid)}" '
f'kind = "{_dsl_escape(edge_kind)}"'
)

# Retrieval verbs (@RECALL/@PATH/@ANCESTORS/...) emit literal
# `ent:slug` ids. After the refactor those nodes do not exist;
# rewrite each reference to the matching entity_id from the turn
# map, falling back to the literal slug when unknown so debug
# output remains traceable.
for stmt in turn.statements:
out.append(_rewrite_ent_refs(stmt, slug_to_entity))

return out


def _rewrite_ent_refs(stmt: str, slug_to_entity: dict[str, str]) -> str:
def _sub(match: re.Match) -> str:
slug = match.group(1)
eid = slug_to_entity.get(slug)
return f'"{eid}"' if eid else f'"ent:{slug}"'
return _ENT_FROM_ID_RE.sub(_sub, stmt)


def _render_known_facts_block(facts: dict[str, FactState], max_facts: int = 40) -> str:
"""Format non-retracted facts into a block the LLM reads before the input.

Expand Down Expand Up @@ -1397,6 +1494,7 @@ def _ingest_locked(
turn = _parse_verb_output(cleaned)
deduped = _synthesize_dsl(
turn, msg_id=msg_id, session_id=session_id, role=role, text=text,
gs=None if dry_run else self._gs,
)
dup_dropped: list[tuple[str, str]] = []

Expand Down
Loading
Loading