Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions benchmarks/olmocr/bench/runners/run_llamaparse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
olmOCR-bench runner for LlamaParse (llama-cloud).

Per-page contract (matches run_reducto / run_gemini_pro_31 / run_extend):
run_llamaparse(pdf_path, page_num=1) -> markdown string for that one page.

We extract the requested 1-indexed page to a tiny temp PDF (via pymupdf) and
upload that. Same logic as run_extend — bulletproof, keeps the upload payload
small even on huge source PDFs, and removes any server-side page-range edge
cases.

Mode choice (verified against developers.llamaindex.ai/llamaparse):
* tier="agentic_plus" — newer Tier API; "state-of-the-art models for
maximum accuracy on the hardest documents". Supersedes the legacy
`parse_mode` family (parse_page_with_agent et al).
* version="latest"
* expand=["markdown"] — populate result.markdown / result.markdown_full
in the response. Without this expand, you get only job metadata.

Things intentionally NOT set in the initial config:
* agentic_options.custom_prompt — soft natural-language steering, same
family as Extend's `custom_instructions`. Burned us once already; we
don't want to rely on it for header/footer removal. Start clean.
* crop_box — geometric header/footer clipping. Real lever, but blindly
applying e.g. top=0.08 can chop body text on pages without headers.
Add only if a sample run shows headers leaking through.
* processing_control / processing_options — left at server defaults.
"""

import os
import tempfile

import pymupdf

from src.commons_llamaparse import llamaparse_client, record_usage


PARSE_TIER = "agentic_plus"
PARSE_VERSION = "latest"


def _extract_page_to_tempfile(pdf_path: str, page_num: int) -> str:
"""Extract a single 1-indexed page into a new temp PDF file. Returns path."""
src = pymupdf.open(pdf_path)
try:
if page_num < 1 or page_num > src.page_count:
raise ValueError(
f"page_num {page_num} out of range for {pdf_path} (n_pages={src.page_count})"
)
out = pymupdf.open()
try:
out.insert_pdf(src, from_page=page_num - 1, to_page=page_num - 1)
fd, tmp_path = tempfile.mkstemp(suffix=".pdf", prefix="llamaparse_pg_")
os.close(fd)
out.save(tmp_path)
return tmp_path
finally:
out.close()
finally:
src.close()


def _extract_markdown(result) -> str:
"""Pull the markdown text out of a ParsingGetResponse.

Prefer `markdown_full` (flat string); fall back to joining
`markdown.pages[i].markdown` so we still get something if the server
only returns the structured form.
"""
full = getattr(result, "markdown_full", None)
if isinstance(full, str) and full.strip():
return full.strip()

md = getattr(result, "markdown", None)
pages = getattr(md, "pages", None) if md is not None else None
if not pages:
return ""

pieces: list[str] = []
for page in pages:
piece = getattr(page, "markdown", None)
if isinstance(piece, str) and piece.strip():
pieces.append(piece.strip())
return "\n\n".join(pieces).strip()


def run_llamaparse(
pdf_path: str,
page_num: int = 1,
timeout: float = 600.0,
) -> str:
"""Parse one PDF page through LlamaParse and return the markdown content."""
single_page_path = _extract_page_to_tempfile(pdf_path, page_num)
try:
with open(single_page_path, "rb") as fh:
uploaded = llamaparse_client.files.create(file=fh, purpose="parse")

result = llamaparse_client.parsing.parse(
file_id=uploaded.id,
tier=PARSE_TIER,
version=PARSE_VERSION,
expand=["markdown"],
timeout=timeout,
)

record_usage(PARSE_TIER, 1)

job_status = getattr(getattr(result, "job", None), "status", None)
if job_status != "COMPLETED":
err = getattr(getattr(result, "job", None), "error_message", None)
raise RuntimeError(
f"LlamaParse job did not complete: status={job_status} error={err}"
)

text = _extract_markdown(result)
if not text or text.strip().lower() in ("null", "none", "n/a"):
return ""
return text
finally:
try:
os.unlink(single_page_path)
except OSError:
pass
233 changes: 233 additions & 0 deletions benchmarks/olmocr/olmocr_bench_llamaparse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
OlmOCR Benchmark for LlamaParse (llama-cloud).

Mirrors olmocr_bench_reducto.py / olmocr_bench_extend.py — same dataset, same
per-page contract, just swaps the candidate. Outputs land under
data_dir/llamaparse_agentic_plus/.

Usage:
uv run -m benchmarks.olmocr.olmocr_bench_llamaparse
uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --sample
uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --skip-generation
uv run -m benchmarks.olmocr.olmocr_bench_llamaparse --generate-only
"""

import argparse
import asyncio
import json
import os
import sys
from pathlib import Path

from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from tqdm.asyncio import tqdm_asyncio

load_dotenv()

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "bench" / "sample_data"
FULL_DATA_DIR = Path(__file__).resolve().parent / "bench" / "full_data"
RESULTS_DIR = PROJECT_ROOT / "results"
USAGE_OUTPUT = RESULTS_DIR / "olmocr_llamaparse_usage.json"
CANDIDATE_NAME = "llamaparse_agentic_plus"
RATE_LIMIT = 8
MAX_RETRIES = 3

HF_REPO = "allenai/olmOCR-bench"
SPLITS = [
"arxiv_math",
"headers_footers",
"long_tiny_text",
"multi_column",
"old_scans",
"old_scans_math",
"table_tests",
]

sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))


class RateLimiter:
def __init__(self, rate: int):
self.rate = rate
self.tokens = rate
self.last_refill = 0.0
self._lock = asyncio.Lock()

async def acquire(self):
while True:
async with self._lock:
now = asyncio.get_running_loop().time()
elapsed = now - self.last_refill
self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return
await asyncio.sleep(1 / self.rate)


def download_full_dataset():
data_dir = FULL_DATA_DIR
pdf_dir = data_dir / "pdfs"
all_pdfs = set()
for split in SPLITS:
jsonl_dest = data_dir / f"{split}.jsonl"
if jsonl_dest.exists():
with open(jsonl_dest) as f:
tests = [json.loads(l) for l in f if l.strip()]
else:
print(f" Downloading {split}.jsonl...")
src = hf_hub_download(
HF_REPO, f"bench_data/{split}.jsonl", repo_type="dataset"
)
with open(src) as f:
tests = [json.loads(l) for l in f if l.strip()]
data_dir.mkdir(parents=True, exist_ok=True)
with open(jsonl_dest, "w") as f:
for t in tests:
f.write(json.dumps(t) + "\n")
print(f" {split}: {len(tests)} tests")
for t in tests:
all_pdfs.add(t["pdf"])

print(f"\n Total unique PDFs to download: {len(all_pdfs)}")
downloaded = 0
skipped = 0
for pdf_rel in sorted(all_pdfs):
local_path = pdf_dir / pdf_rel
if local_path.exists():
skipped += 1
continue
local_path.parent.mkdir(parents=True, exist_ok=True)
try:
src = hf_hub_download(
HF_REPO, f"bench_data/pdfs/{pdf_rel}", repo_type="dataset"
)
os.symlink(src, str(local_path))
downloaded += 1
except Exception as e:
print(f" Failed to download {pdf_rel}: {e}")
print(f" PDFs: {downloaded} downloaded, {skipped} already existed")
return data_dir


async def process_page(pdf_path, page_num, output_path, rate_limiter):
from olmocr.bench.runners.run_llamaparse import run_llamaparse

for attempt in range(MAX_RETRIES):
await rate_limiter.acquire()
try:
result = await asyncio.to_thread(run_llamaparse, pdf_path, page_num)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(result)
return True
except Exception as e:
if attempt < MAX_RETRIES - 1:
await asyncio.sleep(2**attempt)
else:
print(
f"Failed after {MAX_RETRIES} attempts: {pdf_path} page {page_num}: {e}"
)
return False


async def generate_outputs(data_dir: Path):
pdf_folder = data_dir / "pdfs"
output_folder = data_dir / CANDIDATE_NAME

pdf_pages = set()
for jsonl_file in data_dir.glob("*.jsonl"):
with open(jsonl_file) as f:
for line in f:
line = line.strip()
if not line:
continue
t = json.loads(line)
pdf_pages.add((t["pdf"], t["page"]))

print(f"Found {len(pdf_pages)} unique (pdf, page) pairs to process")

rate_limiter = RateLimiter(RATE_LIMIT)
tasks = []
for pdf_rel, page in sorted(pdf_pages):
pdf_path = str(pdf_folder / pdf_rel)
if not os.path.exists(pdf_path):
continue
base_name = os.path.splitext(os.path.basename(pdf_rel))[0]
parent_dir = os.path.dirname(pdf_rel)
md_filename = f"{base_name}_pg{page}_repeat1.md"
if parent_dir:
out_path = str(output_folder / parent_dir / md_filename)
else:
out_path = str(output_folder / md_filename)
if os.path.exists(out_path):
continue
tasks.append(process_page(pdf_path, page, out_path, rate_limiter))

if not tasks:
print("All outputs already exist, skipping generation.")
return True
print(f"Processing {len(tasks)} pages...")
results = await tqdm_asyncio.gather(
*tasks, desc=f"Generating {CANDIDATE_NAME} outputs"
)
num_success = sum(1 for r in results if r)
num_failed = len(results) - num_success
print(f"Done: {num_success} succeeded, {num_failed} failed")

try:
from src.commons_llamaparse import write_usage_snapshot

write_usage_snapshot(USAGE_OUTPUT)
print(f"Usage written to {USAGE_OUTPUT}")
except Exception as e:
print(f" (usage snapshot failed: {e})")

return num_failed == 0


def run_evaluation(data_dir: Path):
from olmocr.bench.benchmark import main as bench_main

sys.argv = [
"benchmark",
"--dir",
str(data_dir),
"--candidate",
CANDIDATE_NAME,
"--force",
]
bench_main()


async def main():
parser = argparse.ArgumentParser(
description=f"Run OlmOCR benchmark with {CANDIDATE_NAME}"
)
parser.add_argument("--sample", action="store_true")
parser.add_argument("--skip-generation", action="store_true")
parser.add_argument("--generate-only", action="store_true")
args = parser.parse_args()

if args.sample:
data_dir = SAMPLE_DATA_DIR
print("=== Using sample data ===")
else:
print("=== Downloading full olmOCR-bench dataset from HuggingFace ===")
data_dir = download_full_dataset()

if not args.skip_generation:
print(f"\n=== Generating {CANDIDATE_NAME} outputs ===")
await generate_outputs(data_dir)

if not args.generate_only:
print("\n=== Running OlmOCR Benchmark Evaluation ===")
run_evaluation(data_dir)


if __name__ == "__main__":
asyncio.run(main())
Loading