Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/batch_scraping_protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ the query pool instead of re-issuing the same head every run — maximising fres
coverage per dollar. Implementation: `select_run_queries` (queries.py) +
`query_last_run_at` (engine_checkpoint.py).

### Yield-weighted prioritization

Within a budget cap the **highest-yield query texts go first** — keywords/terms
that have actually produced index-relevant records win the budget over noisy
ones. Yield is measured along the record -> candidate -> query chain (a relevant
record's `event_candidate_ids` -> each candidate's `discovery_queries`) and
cached:

```bash
denbust query-yield --config <cfg> # compute + cache query_yield.json, print top keywords
```

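The budget cap then orders queries by **(yield, kind priority, least-recently-run)**
— highest yield first, then the highest-priority query kind, then never-run or
longest-idle queries — so proven enforcement-law terms are refreshed before
low-yield generic keywords. Until `query_yield.json` has been generated, every
query counts as yield 0 and ordering falls back to kind priority and rotation.
Implementation: `src/denbust/discovery/query_yield.py` (yield computation and
cache) + `select_run_queries` (queries.py, ordering).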

## Outputs of each batch

Every batch run should report:
Expand Down
46 changes: 46 additions & 0 deletions src/denbust/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,52 @@ def search_budget(
typer.echo(f" {engine:<11} {queries:>6} queries ${usd:>7.3f}{cap}")


@app.command("query-yield")
def query_yield(
    config: Annotated[
        Path | None,
        typer.Option("--config", "-c", help="Path to YAML config file"),
    ] = None,
    top: Annotated[
        int,
        # min=0 rejects negative values, which would otherwise slice from the
        # end of the ranking (``[:-n]``) instead of limiting it.
        typer.Option("--top", min=0, help="How many top-yield query texts to print."),
    ] = 20,
) -> None:
    """Compute and cache per-keyword yield (index-relevant records discovered).

    Walks the record → candidate → query chain, writes ``query_yield.json``, and
    prints the highest-yield query texts. Discovery then spends its query budget
    on proven keywords first. See ``docs/batch_scraping_protocol.md``.
    """
    # Imported lazily so the other CLI commands do not pay for these modules.
    from denbust.config import load_config
    from denbust.discovery.query_yield import QueryYieldStore, compute_query_yield
    from denbust.discovery.storage import create_discovery_persistence
    from denbust.ops.factory import create_operational_store

    cfg = load_config(config or Path("agents/news/local.yaml"))

    # Each resource gets its own try/finally so the store is closed even when
    # opening (or closing) the discovery persistence fails.
    store = create_operational_store(cfg)
    try:
        persistence = create_discovery_persistence(cfg)
        try:
            # Materialise the rows: they are iterated twice below (yield
            # computation and the relevant-record count), which would silently
            # undercount if fetch_records handed back a one-shot iterator.
            rows = list(store.fetch_records(cfg.dataset_name))
            candidate_queries = {
                candidate.candidate_id: candidate.discovery_queries
                for candidate in persistence.list_candidates()
            }
        finally:
            persistence.close()
    finally:
        store.close()

    yield_map = compute_query_yield(rows, candidate_queries)
    QueryYieldStore(cfg.discovery_state_paths.query_yield_path).save(yield_map)

    relevant = sum(1 for row in rows if row.get("index_relevant"))
    typer.echo(f"Query yield from {relevant} index-relevant record(s):")
    # Highest yield first; ties broken alphabetically for stable output.
    ranked = sorted(yield_map.items(), key=lambda kv: (-kv[1], kv[0]))
    for text, score in ranked[:top]:
        typer.echo(f" {score:>3} {text}")
    if not yield_map:
        typer.echo(" (no yielding queries yet)")


@app.command()
def version() -> None:
"""Show version information."""
Expand Down
24 changes: 14 additions & 10 deletions src/denbust/discovery/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,28 @@ def select_run_queries(
max_queries: int | None,
*,
last_run_at: Callable[[DiscoveryQuery], datetime | None] | None = None,
yield_of: Callable[[DiscoveryQuery], int] | None = None,
) -> list[DiscoveryQuery]:
"""Cap *queries* to *max_queries*, highest-priority kinds first.
"""Cap *queries* to *max_queries*, best queries first.

When *last_run_at* is supplied, ties within a priority tier are broken by
**least-recently-run first** (never-run queries first, then oldest), so a
budget-capped run refreshes a different slice of the pool each time
(cross-run rotation) instead of re-issuing the same head every run.
Ordering, in priority: highest historical **yield** first (when *yield_of*
is supplied — query texts that have produced index-relevant records), then
highest-priority query **kind** (open-web broad/taxonomy), then
**least-recently-run** (when *last_run_at* is supplied — cross-run rotation),
then original order.
"""
if max_queries is None or len(queries) <= max_queries:
return queries

def sort_key(pair: tuple[int, DiscoveryQuery]) -> tuple[object, ...]:
index, query = pair
yield_rank = -(yield_of(query) if yield_of is not None else 0)
priority = _QUERY_KIND_PRIORITY.get(query.query_kind, 99)
if last_run_at is None:
return (priority, index)
return (yield_rank, priority, index)
ran_at = last_run_at(query)
recency = (0, 0.0) if ran_at is None else (1, ran_at.timestamp())
return (priority, recency, index)
return (yield_rank, priority, recency, index)

ordered = sorted(enumerate(queries), key=sort_key)
return [query for _, query in ordered[:max_queries]]
Expand Down Expand Up @@ -165,14 +168,15 @@ def build_discovery_queries(
now: datetime | None = None,
max_queries: int | None = None,
last_run_at: Callable[[DiscoveryQuery], datetime | None] | None = None,
yield_of: Callable[[DiscoveryQuery], int] | None = None,
) -> list[DiscoveryQuery]:
"""Build normalized discovery queries for enabled discovery engines.

Source-targeted queries cover only ``source_targeted_search_domains`` —
natively-crawled and blocklisted domains are dropped to save search budget.
When *max_queries* (or ``config.discovery.max_queries_per_run``) is set, the
result is capped to that many queries, keeping the highest-priority kinds;
*last_run_at* additionally rotates within a tier (least-recently-run first).
result is capped — highest historical *yield_of* first, then kind priority,
then *last_run_at* rotation.
"""
keywords = _normalize_keywords(config.keywords)
taxonomy_enabled = DiscoveryQueryKind.TAXONOMY_TARGETED in config.discovery.default_query_kinds
Expand Down Expand Up @@ -285,4 +289,4 @@ def build_discovery_queries(
seen_keys.add(taxonomy_source_key)

budget = max_queries if max_queries is not None else config.discovery.max_queries_per_run
return select_run_queries(queries, budget, last_run_at=last_run_at)
return select_run_queries(queries, budget, last_run_at=last_run_at, yield_of=yield_of)
72 changes: 72 additions & 0 deletions src/denbust/discovery/query_yield.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Yield-weighted query prioritization.

Not all search keywords are equal: ``סחר בבני אדם`` (human trafficking) and
``בית בושת`` (brothel) have produced verified enforcement records, while others
mostly surface noise. This module measures, per query text, how many
index-relevant records that query actually contributed to discovering, and
caches the map so the budget cap can spend on proven keywords first.

The signal chain is record → candidate → query:

* an index-relevant operational record lists the ``event_candidate_ids`` it was
built from;
* each candidate records the ``discovery_queries`` (keyword/term texts) that
found it;
* so each query text that fed an index-relevant record earns one yield point.

``compute_query_yield`` is pure (takes plain inputs) for testability;
``QueryYieldStore`` caches the result as JSON under the discovery state dir.
"""

from __future__ import annotations

import json
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from pathlib import Path
from typing import Any


def compute_query_yield(
    records: Iterable[Mapping[str, Any]],
    candidate_queries: Mapping[str, Sequence[str]],
) -> dict[str, int]:
    """Count, per query text, the index-relevant records it helped discover.

    Args:
        records: Operational rows (mappings). Only rows with a truthy
            ``index_relevant`` value are counted; their ``event_candidate_ids``
            (missing or ``None`` is treated as empty) name the contributing
            candidates.
        candidate_queries: Candidate id → query texts that discovered that
            candidate. Unknown candidate ids contribute nothing.

    Returns:
        ``{query_text: count}``. A query text is counted at most once per
        record, however many of that record's candidates it discovered; empty
        query texts are ignored.
    """
    counts: dict[str, int] = {}
    for row in records:
        if not row.get("index_relevant"):
            continue
        # A set, so a text reaching this record through several candidates
        # still earns a single point for it.
        contributing = {
            text
            for candidate_id in (row.get("event_candidate_ids") or [])
            for text in candidate_queries.get(candidate_id, ())
            if text
        }
        for text in contributing:
            counts[text] = counts.get(text, 0) + 1
    return counts


class QueryYieldStore:
    """JSON cache of ``{query_text: yield}`` under the discovery state dir.

    The cache is treated as best-effort: anything unusable on disk is read back
    as "no yield data" instead of raising.
    """

    def __init__(self, path: Path) -> None:
        # Location of the JSON cache file; it does not have to exist yet.
        self.path = path

    def load(self) -> dict[str, int]:
        """Return the cached yield map, or ``{}`` when nothing usable is stored.

        A missing file, content that is not valid UTF-8 JSON (e.g. a truncated
        write), or a top-level value that is not a JSON object all produce
        ``{}``. Individual entries whose value cannot be converted with
        ``int()`` are skipped rather than discarding the whole map. Other I/O
        errors (permissions, path is a directory) still propagate.
        """
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return {}
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError: a corrupt cache means "no data", not a crash.
            return {}
        if not isinstance(data, dict):
            return {}

        yield_map: dict[str, int] = {}
        for key, value in data.items():
            try:
                yield_map[str(key)] = int(value)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric value (null, list, "abc", NaN/Infinity): drop
                # just this entry.
                continue
        return yield_map

    def save(self, yield_map: Mapping[str, int]) -> None:
        """Write *yield_map* as pretty-printed UTF-8 JSON, creating parent dirs.

        Entries are written highest yield first, ties in query-text order, so
        the file is stable and readable when inspected by hand.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        ordered = dict(sorted(yield_map.items(), key=lambda kv: (-kv[1], kv[0])))
        self.path.write_text(json.dumps(ordered, ensure_ascii=False, indent=2), encoding="utf-8")
2 changes: 2 additions & 0 deletions src/denbust/discovery/state_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DiscoveryStatePaths(BaseModel):
engine_query_cache_dir: Path
domain_verdicts_path: Path
search_budget_path: Path
query_yield_path: Path


def resolve_discovery_state_paths(
Expand All @@ -65,6 +66,7 @@ def resolve_discovery_state_paths(
engine_query_cache_dir=candidates_dir / "engine_query_cache",
domain_verdicts_path=candidates_dir / "domain_verdicts.jsonl",
search_budget_path=candidates_dir / "search_budget.jsonl",
query_yield_path=candidates_dir / "query_yield.json",
latest_candidates_path=candidates_dir / "latest_candidates.jsonl",
latest_backfill_batches_path=backfill_batches_dir / "latest_backfill_batches.jsonl",
retry_queue_path=candidates_dir / "retry_queue.jsonl",
Expand Down
46 changes: 38 additions & 8 deletions src/denbust/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import sys
from collections import defaultdict
from collections.abc import Mapping
from collections.abc import Callable, Mapping
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, TypedDict
Expand Down Expand Up @@ -72,6 +72,7 @@
PersistentCandidate,
)
from denbust.discovery.queries import build_discovery_queries, select_run_queries
from denbust.discovery.query_yield import QueryYieldStore
from denbust.discovery.scrape_queue import (
SCRAPEABLE_CANDIDATE_STATUSES,
CandidateScrapeBatch,
Expand Down Expand Up @@ -972,15 +973,28 @@ async def _run_source_native_discovery(
_DISCOVERY_PERSIST_BATCH: int = 50


def _query_yield_callback(config: Config) -> Callable[[DiscoveryQuery], int] | None:
    """Build a yield lookup backed by the cached query-yield map.

    Loads the map stored at ``config.discovery_state_paths.query_yield_path``.
    Returns ``None`` when that map is empty; otherwise returns a callable that
    maps a query to the cached yield of its ``query_text`` (0 when the text is
    not in the map).
    """
    cache_path = config.discovery_state_paths.query_yield_path
    cached_yield = QueryYieldStore(cache_path).load()

    def lookup(query: DiscoveryQuery) -> int:
        return cached_yield.get(query.query_text, 0)

    if cached_yield:
        return lookup
    return None


def _guard_search_budget(
config: Config, *, engine: str, queries: list[DiscoveryQuery], cache_dir: Path
config: Config,
*,
engine: str,
queries: list[DiscoveryQuery],
cache_dir: Path,
yield_of: Callable[[DiscoveryQuery], int] | None = None,
) -> list[DiscoveryQuery]:
"""Cap *queries* to what the engine's remaining monthly budget can afford.

No-op when the engine has no ``monthly_budget_usd``. When the budget is
partly spent, keeps the highest-priority queries that fit (avoids the 402),
rotating within a tier by least-recently-run so successive capped runs
refresh different slices of the pool.
partly spent, keeps the best queries that fit (highest yield, then kind
priority, then least-recently-run rotation) so the spend goes to the most
productive queries and successive runs refresh different slices.
"""
engine_cfg = getattr(config.discovery.engines, engine, None)
budget = getattr(engine_cfg, "monthly_budget_usd", None)
Expand Down Expand Up @@ -1008,6 +1022,7 @@ def _guard_search_budget(
queries,
affordable,
last_run_at=lambda query: query_last_run_at(cache_dir, engine, query),
yield_of=yield_of,
)
return queries

Expand All @@ -1029,13 +1044,18 @@ async def _run_brave_discovery(
) -> PersistedSourceDiscovery:
"""Run Brave-powered discovery with per-query checkpointing and incremental persist."""
cache_dir = config.discovery_state_paths.engine_query_cache_dir
yield_of = _query_yield_callback(config)
queries = _guard_search_budget(
config,
engine="brave",
queries=build_discovery_queries(
config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "brave", q)
config,
days=days,
last_run_at=lambda q: query_last_run_at(cache_dir, "brave", q),
yield_of=yield_of,
),
cache_dir=cache_dir,
yield_of=yield_of,
)
discovery_run = DiscoveryRun(
run_id=run_id,
Expand Down Expand Up @@ -1122,13 +1142,18 @@ async def _run_exa_discovery(
) -> PersistedSourceDiscovery:
"""Run Exa-powered discovery with per-query checkpointing and incremental persist."""
cache_dir = config.discovery_state_paths.engine_query_cache_dir
yield_of = _query_yield_callback(config)
queries = _guard_search_budget(
config,
engine="exa",
queries=build_discovery_queries(
config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "exa", q)
config,
days=days,
last_run_at=lambda q: query_last_run_at(cache_dir, "exa", q),
yield_of=yield_of,
),
cache_dir=cache_dir,
yield_of=yield_of,
)
discovery_run = DiscoveryRun(
run_id=run_id,
Expand Down Expand Up @@ -1219,13 +1244,18 @@ async def _run_google_cse_discovery(
) -> PersistedSourceDiscovery:
"""Run Google CSE-powered discovery with per-query checkpointing and incremental persist."""
cache_dir = config.discovery_state_paths.engine_query_cache_dir
yield_of = _query_yield_callback(config)
queries = _guard_search_budget(
config,
engine="google_cse",
queries=build_discovery_queries(
config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "google_cse", q)
config,
days=days,
last_run_at=lambda q: query_last_run_at(cache_dir, "google_cse", q),
yield_of=yield_of,
),
cache_dir=cache_dir,
yield_of=yield_of,
)
discovery_run = DiscoveryRun(
run_id=run_id,
Expand Down
Loading
Loading