From a5e460fe1dc3f4cdb1a9035c9e2760263995bac2 Mon Sep 17 00:00:00 2001 From: Abhinavexist Date: Thu, 4 Jun 2026 11:12:45 +0530 Subject: [PATCH] feat(ocr): add llama-parse-2.0 --- .../olmocr/bench/runners/run_llamaparse.py | 123 +++++++++ benchmarks/olmocr/olmocr_bench_llamaparse.py | 233 ++++++++++++++++++ src/commons_llamaparse.py | 77 ++++++ 3 files changed, 433 insertions(+) create mode 100644 benchmarks/olmocr/bench/runners/run_llamaparse.py create mode 100644 benchmarks/olmocr/olmocr_bench_llamaparse.py create mode 100644 src/commons_llamaparse.py diff --git a/benchmarks/olmocr/bench/runners/run_llamaparse.py b/benchmarks/olmocr/bench/runners/run_llamaparse.py new file mode 100644 index 0000000..53cd87c --- /dev/null +++ b/benchmarks/olmocr/bench/runners/run_llamaparse.py @@ -0,0 +1,123 @@ +""" +olmOCR-bench runner for LlamaParse (llama-cloud). + +Per-page contract (matches run_reducto / run_gemini_pro_31 / run_extend): + run_llamaparse(pdf_path, page_num=1) -> markdown string for that one page. + +We extract the requested 1-indexed page to a tiny temp PDF (via pymupdf) and +upload that. Same logic as run_extend — bulletproof, keeps the upload payload +small even on huge source PDFs, and removes any server-side page-range edge +cases. + +Mode choice (verified against developers.llamaindex.ai/llamaparse): + * tier="agentic_plus" — newer Tier API; "state-of-the-art models for + maximum accuracy on the hardest documents". Supersedes the legacy + `parse_mode` family (parse_page_with_agent et al). + * version="latest" + * expand=["markdown"] — populate result.markdown / result.markdown_full + in the response. Without this expand, you get only job metadata. + +Things intentionally NOT set in the initial config: + * agentic_options.custom_prompt — soft natural-language steering, same + family as Extend's `custom_instructions`. Burned us once already; we + don't want to rely on it for header/footer removal. Start clean. + * crop_box — geometric header/footer clipping. Real lever, but blindly + applying e.g. top=0.08 can chop body text on pages without headers. + Add only if a sample run shows headers leaking through. + * processing_control / processing_options — left at server defaults. +""" + +import os +import tempfile + +import pymupdf + +from src.commons_llamaparse import llamaparse_client, record_usage + + +PARSE_TIER = "agentic_plus" +PARSE_VERSION = "latest" + + +def _extract_page_to_tempfile(pdf_path: str, page_num: int) -> str: + """Extract a single 1-indexed page into a new temp PDF file. Returns path.""" + src = pymupdf.open(pdf_path) + try: + if page_num < 1 or page_num > src.page_count: + raise ValueError( + f"page_num {page_num} out of range for {pdf_path} (n_pages={src.page_count})" + ) + out = pymupdf.open() + try: + out.insert_pdf(src, from_page=page_num - 1, to_page=page_num - 1) + fd, tmp_path = tempfile.mkstemp(suffix=".pdf", prefix="llamaparse_pg_") + os.close(fd) + out.save(tmp_path) + return tmp_path + finally: + out.close() + finally: + src.close() + + +def _extract_markdown(result) -> str: + """Pull the markdown text out of a ParsingGetResponse. + + Prefer `markdown_full` (flat string); fall back to joining + `markdown.pages[i].markdown` so we still get something if the server + only returns the structured form. + """ + full = getattr(result, "markdown_full", None) + if isinstance(full, str) and full.strip(): + return full.strip() + + md = getattr(result, "markdown", None) + pages = getattr(md, "pages", None) if md is not None else None + if not pages: + return "" + + pieces: list[str] = [] + for page in pages: + piece = getattr(page, "markdown", None) + if isinstance(piece, str) and piece.strip(): + pieces.append(piece.strip()) + return "\n\n".join(pieces).strip() + + +def run_llamaparse( + pdf_path: str, + page_num: int = 1, + timeout: float = 600.0, +) -> str: + """Parse one PDF page through LlamaParse and return the markdown content.""" + single_page_path = _extract_page_to_tempfile(pdf_path, page_num) + try: + with open(single_page_path, "rb") as fh: + uploaded = llamaparse_client.files.create(file=fh, purpose="parse") + + result = llamaparse_client.parsing.parse( + file_id=uploaded.id, + tier=PARSE_TIER, + version=PARSE_VERSION, + expand=["markdown"], + timeout=timeout, + ) + + record_usage(PARSE_TIER, 1) + + job_status = getattr(getattr(result, "job", None), "status", None) + if job_status != "COMPLETED": + err = getattr(getattr(result, "job", None), "error_message", None) + raise RuntimeError( + f"LlamaParse job did not complete: status={job_status} error={err}" + ) + + text = _extract_markdown(result) + if not text or text.strip().lower() in ("null", "none", "n/a"): + return "" + return text + finally: + try: + os.unlink(single_page_path) + except OSError: + pass diff --git a/benchmarks/olmocr/olmocr_bench_llamaparse.py b/benchmarks/olmocr/olmocr_bench_llamaparse.py new file mode 100644 index 0000000..2ef5884 --- /dev/null +++ b/benchmarks/olmocr/olmocr_bench_llamaparse.py @@ -0,0 +1,233 @@ +""" +OlmOCR Benchmark for LlamaParse (llama-cloud). + +Mirrors olmocr_bench_reducto.py / olmocr_bench_extend.py — same dataset, same +per-page contract, just swaps the candidate. Outputs land under +data_dir/llamaparse_agentic_plus/. + +Usage: + uv run -m benchmarks.olmocr.olmocr_bench_llamaparse + uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --sample + uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --skip-generation + uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --generate-only +""" + +import argparse +import asyncio +import json +import os +import sys +from pathlib import Path + +from dotenv import load_dotenv +from huggingface_hub import hf_hub_download +from tqdm.asyncio import tqdm_asyncio + +load_dotenv() + +PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "bench" / "sample_data" +FULL_DATA_DIR = Path(__file__).resolve().parent / "bench" / "full_data" +RESULTS_DIR = PROJECT_ROOT / "results" +USAGE_OUTPUT = RESULTS_DIR / "olmocr_llamaparse_usage.json" +CANDIDATE_NAME = "llamaparse_agentic_plus" +RATE_LIMIT = 8 +MAX_RETRIES = 3 + +HF_REPO = "allenai/olmOCR-bench" +SPLITS = [ + "arxiv_math", + "headers_footers", + "long_tiny_text", + "multi_column", + "old_scans", + "old_scans_math", + "table_tests", +] + +sys.path.insert(0, str(PROJECT_ROOT)) +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +class RateLimiter: + def __init__(self, rate: int): + self.rate = rate + self.tokens = rate + self.last_refill = 0.0 + self._lock = asyncio.Lock() + + async def acquire(self): + while True: + async with self._lock: + now = asyncio.get_running_loop().time() + elapsed = now - self.last_refill + self.tokens = min(self.rate, self.tokens + elapsed * self.rate) + self.last_refill = now + if self.tokens >= 1: + self.tokens -= 1 + return + await asyncio.sleep(1 / self.rate) + + +def download_full_dataset(): + data_dir = FULL_DATA_DIR + pdf_dir = data_dir / "pdfs" + all_pdfs = set() + for split in SPLITS: + jsonl_dest = data_dir / f"{split}.jsonl" + if jsonl_dest.exists(): + with open(jsonl_dest) as f: + tests = [json.loads(l) for l in f if l.strip()] + else: + print(f" Downloading {split}.jsonl...") + src = hf_hub_download( + HF_REPO, f"bench_data/{split}.jsonl", repo_type="dataset" + ) + with open(src) as f: + tests = [json.loads(l) for l in f if l.strip()] + data_dir.mkdir(parents=True, exist_ok=True) + with open(jsonl_dest, "w") as f: + for t in tests: + f.write(json.dumps(t) + "\n") + print(f" {split}: {len(tests)} tests") + for t in tests: + all_pdfs.add(t["pdf"]) + + print(f"\n Total unique PDFs to download: {len(all_pdfs)}") + downloaded = 0 + skipped = 0 + for pdf_rel in sorted(all_pdfs): + local_path = pdf_dir / pdf_rel + if local_path.exists(): + skipped += 1 + continue + local_path.parent.mkdir(parents=True, exist_ok=True) + try: + src = hf_hub_download( + HF_REPO, f"bench_data/pdfs/{pdf_rel}", repo_type="dataset" + ) + os.symlink(src, str(local_path)) + downloaded += 1 + except Exception as e: + print(f" Failed to download {pdf_rel}: {e}") + print(f" PDFs: {downloaded} downloaded, {skipped} already existed") + return data_dir + + +async def process_page(pdf_path, page_num, output_path, rate_limiter): + from olmocr.bench.runners.run_llamaparse import run_llamaparse + + for attempt in range(MAX_RETRIES): + await rate_limiter.acquire() + try: + result = await asyncio.to_thread(run_llamaparse, pdf_path, page_num) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(result) + return True + except Exception as e: + if attempt < MAX_RETRIES - 1: + await asyncio.sleep(2**attempt) + else: + print( + f"Failed after {MAX_RETRIES} attempts: {pdf_path} page {page_num}: {e}" + ) + return False + + +async def generate_outputs(data_dir: Path): + pdf_folder = data_dir / "pdfs" + output_folder = data_dir / CANDIDATE_NAME + + pdf_pages = set() + for jsonl_file in data_dir.glob("*.jsonl"): + with open(jsonl_file) as f: + for line in f: + line = line.strip() + if not line: + continue + t = json.loads(line) + pdf_pages.add((t["pdf"], t["page"])) + + print(f"Found {len(pdf_pages)} unique (pdf, page) pairs to process") + + rate_limiter = RateLimiter(RATE_LIMIT) + tasks = [] + for pdf_rel, page in sorted(pdf_pages): + pdf_path = str(pdf_folder / pdf_rel) + if not os.path.exists(pdf_path): + continue + base_name = os.path.splitext(os.path.basename(pdf_rel))[0] + parent_dir = os.path.dirname(pdf_rel) + md_filename = f"{base_name}_pg{page}_repeat1.md" + if parent_dir: + out_path = str(output_folder / parent_dir / md_filename) + else: + out_path = str(output_folder / md_filename) + if os.path.exists(out_path): + continue + tasks.append(process_page(pdf_path, page, out_path, rate_limiter)) + + if not tasks: + print("All outputs already exist, skipping generation.") + return True + print(f"Processing {len(tasks)} pages...") + results = await tqdm_asyncio.gather( + *tasks, desc=f"Generating {CANDIDATE_NAME} outputs" + ) + num_success = sum(1 for r in results if r) + num_failed = len(results) - num_success + print(f"Done: {num_success} succeeded, {num_failed} failed") + + try: + from src.commons_llamaparse import write_usage_snapshot + + write_usage_snapshot(USAGE_OUTPUT) + print(f"Usage written to {USAGE_OUTPUT}") + except Exception as e: + print(f" (usage snapshot failed: {e})") + + return num_failed == 0 + + +def run_evaluation(data_dir: Path): + from olmocr.bench.benchmark import main as bench_main + + sys.argv = [ + "benchmark", + "--dir", + str(data_dir), + "--candidate", + CANDIDATE_NAME, + "--force", + ] + bench_main() + + +async def main(): + parser = argparse.ArgumentParser( + description=f"Run OlmOCR benchmark with {CANDIDATE_NAME}" + ) + parser.add_argument("--sample", action="store_true") + parser.add_argument("--skip-generation", action="store_true") + parser.add_argument("--generate-only", action="store_true") + args = parser.parse_args() + + if args.sample: + data_dir = SAMPLE_DATA_DIR + print("=== Using sample data ===") + else: + print("=== Downloading full olmOCR-bench dataset from HuggingFace ===") + data_dir = download_full_dataset() + + if not args.skip_generation: + print(f"\n=== Generating {CANDIDATE_NAME} outputs ===") + await generate_outputs(data_dir) + + if not args.generate_only: + print("\n=== Running OlmOCR Benchmark Evaluation ===") + run_evaluation(data_dir) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/commons_llamaparse.py b/src/commons_llamaparse.py new file mode 100644 index 0000000..8854f61 --- /dev/null +++ b/src/commons_llamaparse.py @@ -0,0 +1,77 @@ +""" +LlamaParse (llama-cloud) client + light usage tracking. + +Mirrors src/commons_reducto.py / src/commons_extend.py so the olmOCR-bench +candidate is wired up the same way as the other doc-parsing providers. + +Auth: LLAMA_CLOUD_API_KEY env var (keys start with `llx-`). The SDK reads +the env automatically when api_key is not passed, but we resolve it +explicitly so the import-time check fails loudly if it's missing. +""" + +import copy +import json +import os +import threading +from pathlib import Path + +from dotenv import load_dotenv +from llama_cloud import LlamaCloud + +load_dotenv() + + +if (LLAMA_CLOUD_API_KEY := os.getenv("LLAMA_CLOUD_API_KEY", None)) is None: + raise ValueError( + "LLAMA_CLOUD_API_KEY is not set — get it from https://cloud.llamaindex.ai" + ) + + +llamaparse_client = LlamaCloud(api_key=LLAMA_CLOUD_API_KEY) + + +_usage_lock = threading.Lock() +_usage_state: dict = { + "calls": 0, + "pages": 0, + "by_tier": {}, +} + + +def record_usage(tier: str, pages: int) -> None: + """Tally a single API response's usage. Safe to call from many threads. + + LlamaParse bills per-page; we record the per-call page count + tier so + the usage snapshot reflects how much agentic_plus we burned. + """ + pages = int(pages or 0) + with _usage_lock: + _usage_state["calls"] += 1 + _usage_state["pages"] += pages + bucket = _usage_state["by_tier"].setdefault( + tier or "unknown", {"calls": 0, "pages": 0} + ) + bucket["calls"] += 1 + bucket["pages"] += pages + + +def get_usage_snapshot() -> dict: + with _usage_lock: + return copy.deepcopy(_usage_state) + + +def reset_usage() -> None: + with _usage_lock: + _usage_state["calls"] = 0 + _usage_state["pages"] = 0 + _usage_state["by_tier"] = {} + + +def write_usage_snapshot(path) -> None: + """Atomically persist the current usage tally to a JSON file.""" + snap = get_usage_snapshot() + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(p.suffix + ".tmp") + tmp.write_text(json.dumps(snap, indent=2)) + tmp.replace(p)