From 20e8648bd28378b3cfee241cbcbf610b19cf88ba Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 00:11:19 +0300 Subject: [PATCH] feat(discovery): yield-weighted query prioritization (#3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 3 of 3. Measures which keywords/terms actually produced index-relevant records and prioritizes them when a query budget caps a run, so spend goes to proven enforcement-law terms over noisy generic ones. Signal chain (record -> candidate -> query): - an index-relevant operational record lists its event_candidate_ids; - each candidate records the discovery_queries that found it; - each query text feeding a relevant record earns one yield point (deduped per record). - query_yield.py: compute_query_yield (pure) + QueryYieldStore (JSON cache). - select_run_queries / build_discovery_queries gain a yield_of callback; the cap now orders by (yield, kind priority, recency, index). - Engine fns load the cached yield map and pass yield_of to both build and the $-budget guard. - CLI `denbust query-yield` computes from the operational + candidate stores, caches query_yield.json, and prints the top keywords. Validated on the live index (16 records): top-yield terms are the specific trafficking/prostitution-law taxonomy terms ("הבאת אדם למדינה אחרת לשם העיסוק בזנות", "המודל הנורדי", "סחר בנשים"), not generic keywords — exactly the spend priority we want. Co-Authored-By: Claude Sonnet 4.6 --- docs/batch_scraping_protocol.md | 16 +++++++ src/denbust/cli.py | 46 ++++++++++++++++++ src/denbust/discovery/queries.py | 24 ++++++---- src/denbust/discovery/query_yield.py | 72 ++++++++++++++++++++++++++++ src/denbust/discovery/state_paths.py | 2 + src/denbust/pipeline.py | 46 ++++++++++++++---- tests/unit/test_query_yield.py | 71 +++++++++++++++++++++++++++ 7 files changed, 259 insertions(+), 18 deletions(-) create mode 100644 src/denbust/discovery/query_yield.py create mode 100644 tests/unit/test_query_yield.py diff --git a/docs/batch_scraping_protocol.md b/docs/batch_scraping_protocol.md index 2f11ec7..2e5766f 100644 --- a/docs/batch_scraping_protocol.md +++ b/docs/batch_scraping_protocol.md @@ -190,6 +190,22 @@ the query pool instead of re-issuing the same head every run — maximising fres coverage per dollar. Implementation: `select_run_queries` (queries.py) + `query_last_run_at` (engine_checkpoint.py). +### Yield-weighted prioritization + +Within a budget cap the **highest-yield query texts go first** — keywords/terms +that have actually produced index-relevant records win the budget over noisy +ones. Yield is measured along the record -> candidate -> query chain (a relevant +record's `event_candidate_ids` -> each candidate's `discovery_queries`) and +cached: + +```bash +denbust query-yield --config # compute + cache query_yield.json, print top keywords +``` + +The budget cap then orders queries by **(yield, kind priority, recency)**, so +proven enforcement-law terms are refreshed before low-yield generic keywords. +Implementation: `src/denbust/discovery/query_yield.py`. + ## Outputs of each batch Every batch run should report: diff --git a/src/denbust/cli.py b/src/denbust/cli.py index 686a63e..7f04deb 100644 --- a/src/denbust/cli.py +++ b/src/denbust/cli.py @@ -738,6 +738,52 @@ def search_budget( typer.echo(f" {engine:<11} {queries:>6} queries ${usd:>7.3f}{cap}") +@app.command("query-yield") +def query_yield( + config: Annotated[ + Path | None, + typer.Option("--config", "-c", help="Path to YAML config file"), + ] = None, + top: Annotated[ + int, + typer.Option("--top", help="How many top-yield query texts to print."), + ] = 20, +) -> None: + """Compute and cache per-keyword yield (index-relevant records discovered). + + Walks the record → candidate → query chain, writes ``query_yield.json``, and + prints the highest-yield query texts. Discovery then spends its query budget + on proven keywords first. See ``docs/batch_scraping_protocol.md``. + """ + from denbust.config import load_config + from denbust.discovery.query_yield import QueryYieldStore, compute_query_yield + from denbust.discovery.storage import create_discovery_persistence + from denbust.ops.factory import create_operational_store + + cfg = load_config(config or Path("agents/news/local.yaml")) + store = create_operational_store(cfg) + persistence = create_discovery_persistence(cfg) + try: + rows = store.fetch_records(cfg.dataset_name) + candidate_queries = { + candidate.candidate_id: candidate.discovery_queries + for candidate in persistence.list_candidates() + } + finally: + persistence.close() + store.close() + + yield_map = compute_query_yield(rows, candidate_queries) + QueryYieldStore(cfg.discovery_state_paths.query_yield_path).save(yield_map) + + relevant = sum(1 for r in rows if r.get("index_relevant")) + typer.echo(f"Query yield from {relevant} index-relevant record(s):") + for text, score in sorted(yield_map.items(), key=lambda kv: (-kv[1], kv[0]))[:top]: + typer.echo(f" {score:>3} {text}") + if not yield_map: + typer.echo(" (no yielding queries yet)") + + @app.command() def version() -> None: """Show version information.""" diff --git a/src/denbust/discovery/queries.py b/src/denbust/discovery/queries.py index 504dfcc..9a33b80 100644 --- a/src/denbust/discovery/queries.py +++ b/src/denbust/discovery/queries.py @@ -116,25 +116,28 @@ def select_run_queries( max_queries: int | None, *, last_run_at: Callable[[DiscoveryQuery], datetime | None] | None = None, + yield_of: Callable[[DiscoveryQuery], int] | None = None, ) -> list[DiscoveryQuery]: - """Cap *queries* to *max_queries*, highest-priority kinds first. + """Cap *queries* to *max_queries*, best queries first. - When *last_run_at* is supplied, ties within a priority tier are broken by - **least-recently-run first** (never-run queries first, then oldest), so a - budget-capped run refreshes a different slice of the pool each time - (cross-run rotation) instead of re-issuing the same head every run. + Ordering, in priority: highest historical **yield** first (when *yield_of* + is supplied — query texts that have produced index-relevant records), then + highest-priority query **kind** (open-web broad/taxonomy), then + **least-recently-run** (when *last_run_at* is supplied — cross-run rotation), + then original order. """ if max_queries is None or len(queries) <= max_queries: return queries def sort_key(pair: tuple[int, DiscoveryQuery]) -> tuple[object, ...]: index, query = pair + yield_rank = -(yield_of(query) if yield_of is not None else 0) priority = _QUERY_KIND_PRIORITY.get(query.query_kind, 99) if last_run_at is None: - return (priority, index) + return (yield_rank, priority, index) ran_at = last_run_at(query) recency = (0, 0.0) if ran_at is None else (1, ran_at.timestamp()) - return (priority, recency, index) + return (yield_rank, priority, recency, index) ordered = sorted(enumerate(queries), key=sort_key) return [query for _, query in ordered[:max_queries]] @@ -165,14 +168,15 @@ def build_discovery_queries( now: datetime | None = None, max_queries: int | None = None, last_run_at: Callable[[DiscoveryQuery], datetime | None] | None = None, + yield_of: Callable[[DiscoveryQuery], int] | None = None, ) -> list[DiscoveryQuery]: """Build normalized discovery queries for enabled discovery engines. Source-targeted queries cover only ``source_targeted_search_domains`` — natively-crawled and blocklisted domains are dropped to save search budget. When *max_queries* (or ``config.discovery.max_queries_per_run``) is set, the - result is capped to that many queries, keeping the highest-priority kinds; - *last_run_at* additionally rotates within a tier (least-recently-run first). + result is capped — highest historical *yield_of* first, then kind priority, + then *last_run_at* rotation. """ keywords = _normalize_keywords(config.keywords) taxonomy_enabled = DiscoveryQueryKind.TAXONOMY_TARGETED in config.discovery.default_query_kinds @@ -285,4 +289,4 @@ def build_discovery_queries( seen_keys.add(taxonomy_source_key) budget = max_queries if max_queries is not None else config.discovery.max_queries_per_run - return select_run_queries(queries, budget, last_run_at=last_run_at) + return select_run_queries(queries, budget, last_run_at=last_run_at, yield_of=yield_of) diff --git a/src/denbust/discovery/query_yield.py b/src/denbust/discovery/query_yield.py new file mode 100644 index 0000000..928d9af --- /dev/null +++ b/src/denbust/discovery/query_yield.py @@ -0,0 +1,72 @@ +"""Yield-weighted query prioritization. + +Not all search keywords are equal: ``סחר בבני אדם`` (human trafficking) and +``בית בושת`` (brothel) have produced verified enforcement records, while others +mostly surface noise. This module measures, per query text, how many +index-relevant records that query actually contributed to discovering, and +caches the map so the budget cap can spend on proven keywords first. + +The signal chain is record → candidate → query: + +* an index-relevant operational record lists the ``event_candidate_ids`` it was + built from; +* each candidate records the ``discovery_queries`` (keyword/term texts) that + found it; +* so each query text that fed an index-relevant record earns one yield point. + +``compute_query_yield`` is pure (takes plain inputs) for testability; +``QueryYieldStore`` caches the result as JSON under the discovery state dir. +""" + +from __future__ import annotations + +import json +from collections import defaultdict +from collections.abc import Iterable, Mapping, Sequence +from pathlib import Path +from typing import Any + + +def compute_query_yield( + records: Iterable[Mapping[str, Any]], + candidate_queries: Mapping[str, Sequence[str]], +) -> dict[str, int]: + """Return ``{query_text: index_relevant_record_count}``. + + *records* are operational rows (dicts) with ``index_relevant`` and + ``event_candidate_ids``. *candidate_queries* maps candidate id → the query + texts that discovered it. A query text earns one point per distinct + index-relevant record any of its candidates contributed to. + """ + yield_map: dict[str, int] = defaultdict(int) + for record in records: + if not record.get("index_relevant"): + continue + texts: set[str] = set() + for candidate_id in record.get("event_candidate_ids") or []: + for query_text in candidate_queries.get(candidate_id, ()): + if query_text: + texts.add(query_text) + for query_text in texts: + yield_map[query_text] += 1 + return dict(yield_map) + + +class QueryYieldStore: + """JSON cache of ``{query_text: yield}`` under the discovery state dir.""" + + def __init__(self, path: Path) -> None: + self.path = path + + def load(self) -> dict[str, int]: + if not self.path.exists(): + return {} + data = json.loads(self.path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return {} + return {str(k): int(v) for k, v in data.items()} + + def save(self, yield_map: Mapping[str, int]) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + ordered = dict(sorted(yield_map.items(), key=lambda kv: (-kv[1], kv[0]))) + self.path.write_text(json.dumps(ordered, ensure_ascii=False, indent=2), encoding="utf-8") diff --git a/src/denbust/discovery/state_paths.py b/src/denbust/discovery/state_paths.py index 98c41c9..16005fd 100644 --- a/src/denbust/discovery/state_paths.py +++ b/src/denbust/discovery/state_paths.py @@ -39,6 +39,7 @@ class DiscoveryStatePaths(BaseModel): engine_query_cache_dir: Path domain_verdicts_path: Path search_budget_path: Path + query_yield_path: Path def resolve_discovery_state_paths( @@ -65,6 +66,7 @@ def resolve_discovery_state_paths( engine_query_cache_dir=candidates_dir / "engine_query_cache", domain_verdicts_path=candidates_dir / "domain_verdicts.jsonl", search_budget_path=candidates_dir / "search_budget.jsonl", + query_yield_path=candidates_dir / "query_yield.json", latest_candidates_path=candidates_dir / "latest_candidates.jsonl", latest_backfill_batches_path=backfill_batches_dir / "latest_backfill_batches.jsonl", retry_queue_path=candidates_dir / "retry_queue.jsonl", diff --git a/src/denbust/pipeline.py b/src/denbust/pipeline.py index bbe9ec2..2007763 100644 --- a/src/denbust/pipeline.py +++ b/src/denbust/pipeline.py @@ -8,7 +8,7 @@ import os import sys from collections import defaultdict -from collections.abc import Mapping +from collections.abc import Callable, Mapping from datetime import UTC, datetime from pathlib import Path from typing import Any, TypedDict @@ -72,6 +72,7 @@ PersistentCandidate, ) from denbust.discovery.queries import build_discovery_queries, select_run_queries +from denbust.discovery.query_yield import QueryYieldStore from denbust.discovery.scrape_queue import ( SCRAPEABLE_CANDIDATE_STATUSES, CandidateScrapeBatch, @@ -972,15 +973,28 @@ async def _run_source_native_discovery( _DISCOVERY_PERSIST_BATCH: int = 50 +def _query_yield_callback(config: Config) -> Callable[[DiscoveryQuery], int] | None: + """Return a yield lookup from the cached query-yield map, or None if empty.""" + yield_map = QueryYieldStore(config.discovery_state_paths.query_yield_path).load() + if not yield_map: + return None + return lambda query: yield_map.get(query.query_text, 0) + + def _guard_search_budget( - config: Config, *, engine: str, queries: list[DiscoveryQuery], cache_dir: Path + config: Config, + *, + engine: str, + queries: list[DiscoveryQuery], + cache_dir: Path, + yield_of: Callable[[DiscoveryQuery], int] | None = None, ) -> list[DiscoveryQuery]: """Cap *queries* to what the engine's remaining monthly budget can afford. No-op when the engine has no ``monthly_budget_usd``. When the budget is - partly spent, keeps the highest-priority queries that fit (avoids the 402), - rotating within a tier by least-recently-run so successive capped runs - refresh different slices of the pool. + partly spent, keeps the best queries that fit (highest yield, then kind + priority, then least-recently-run rotation) so the spend goes to the most + productive queries and successive runs refresh different slices. """ engine_cfg = getattr(config.discovery.engines, engine, None) budget = getattr(engine_cfg, "monthly_budget_usd", None) @@ -1008,6 +1022,7 @@ def _guard_search_budget( queries, affordable, last_run_at=lambda query: query_last_run_at(cache_dir, engine, query), + yield_of=yield_of, ) return queries @@ -1029,13 +1044,18 @@ async def _run_brave_discovery( ) -> PersistedSourceDiscovery: """Run Brave-powered discovery with per-query checkpointing and incremental persist.""" cache_dir = config.discovery_state_paths.engine_query_cache_dir + yield_of = _query_yield_callback(config) queries = _guard_search_budget( config, engine="brave", queries=build_discovery_queries( - config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "brave", q) + config, + days=days, + last_run_at=lambda q: query_last_run_at(cache_dir, "brave", q), + yield_of=yield_of, ), cache_dir=cache_dir, + yield_of=yield_of, ) discovery_run = DiscoveryRun( run_id=run_id, @@ -1122,13 +1142,18 @@ async def _run_exa_discovery( ) -> PersistedSourceDiscovery: """Run Exa-powered discovery with per-query checkpointing and incremental persist.""" cache_dir = config.discovery_state_paths.engine_query_cache_dir + yield_of = _query_yield_callback(config) queries = _guard_search_budget( config, engine="exa", queries=build_discovery_queries( - config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "exa", q) + config, + days=days, + last_run_at=lambda q: query_last_run_at(cache_dir, "exa", q), + yield_of=yield_of, ), cache_dir=cache_dir, + yield_of=yield_of, ) discovery_run = DiscoveryRun( run_id=run_id, @@ -1219,13 +1244,18 @@ async def _run_google_cse_discovery( ) -> PersistedSourceDiscovery: """Run Google CSE-powered discovery with per-query checkpointing and incremental persist.""" cache_dir = config.discovery_state_paths.engine_query_cache_dir + yield_of = _query_yield_callback(config) queries = _guard_search_budget( config, engine="google_cse", queries=build_discovery_queries( - config, days=days, last_run_at=lambda q: query_last_run_at(cache_dir, "google_cse", q) + config, + days=days, + last_run_at=lambda q: query_last_run_at(cache_dir, "google_cse", q), + yield_of=yield_of, ), cache_dir=cache_dir, + yield_of=yield_of, ) discovery_run = DiscoveryRun( run_id=run_id, diff --git a/tests/unit/test_query_yield.py b/tests/unit/test_query_yield.py new file mode 100644 index 0000000..344e3d3 --- /dev/null +++ b/tests/unit/test_query_yield.py @@ -0,0 +1,71 @@ +"""Unit tests for yield-weighted query prioritization.""" + +from __future__ import annotations + +from pathlib import Path + +from denbust.discovery.query_yield import QueryYieldStore, compute_query_yield + + +def test_compute_query_yield_credits_contributing_queries() -> None: + """Each query that fed an index-relevant record earns a point per record.""" + records = [ + {"index_relevant": True, "event_candidate_ids": ["c1", "c2"]}, + {"index_relevant": True, "event_candidate_ids": ["c3"]}, + {"index_relevant": False, "event_candidate_ids": ["c4"]}, # ignored + ] + candidate_queries = { + "c1": ["סחר בבני אדם", "בית בושת"], + "c2": ["סחר בבני אדם"], # same record → "סחר" counted once for record 1 + "c3": ["בית בושת"], + "c4": ["ליווי"], # only on a non-relevant record + } + + result = compute_query_yield(records, candidate_queries) + + assert result["סחר בבני אדם"] == 1 # one distinct relevant record + assert result["בית בושת"] == 2 # record 1 and record 3 + assert "ליווי" not in result # never on a relevant record + + +def test_compute_query_yield_dedupes_within_record() -> None: + """A query on multiple candidates of the same record counts once for it.""" + records = [{"index_relevant": True, "event_candidate_ids": ["a", "b", "c"]}] + candidate_queries = {"a": ["k"], "b": ["k"], "c": ["k"]} + assert compute_query_yield(records, candidate_queries) == {"k": 1} + + +def test_compute_query_yield_empty() -> None: + """No relevant records → empty map.""" + assert compute_query_yield([], {}) == {} + assert compute_query_yield([{"index_relevant": False}], {}) == {} + + +def test_query_yield_store_round_trip(tmp_path: Path) -> None: + """The cache writes and reads back the yield map.""" + store = QueryYieldStore(tmp_path / "query_yield.json") + assert store.load() == {} + store.save({"בית בושת": 2, "סחר בבני אדם": 1}) + assert QueryYieldStore(tmp_path / "query_yield.json").load() == { + "בית בושת": 2, + "סחר בבני אדם": 1, + } + + +def test_select_run_queries_prioritizes_high_yield() -> None: + """A capped run keeps high-yield query texts first, over kind priority.""" + from denbust.config import Config, SourceConfig, SourceType + from denbust.discovery.queries import build_discovery_queries, select_run_queries + + config = Config( + keywords=["זנות", "בית בושת", "סרסור"], + sources=[SourceConfig(name="mako", type=SourceType.SCRAPER)], + discovery={"default_query_kinds": ["broad"]}, + ) + queries = build_discovery_queries(config, days=3) + assert len(queries) == 3 + + yield_map = {"סרסור": 5} # the last keyword is the proven one + + capped = select_run_queries(queries, 1, yield_of=lambda q: yield_map.get(q.query_text, 0)) + assert [q.query_text for q in capped] == ["סרסור"]