From afe914fd65b0cd1f7a2e83ef4a8bc91e8867c085 Mon Sep 17 00:00:00 2001 From: Qiuyang Mang Date: Fri, 29 May 2026 03:32:45 -0700 Subject: [PATCH 1/2] Support project-based Frontier-CS 2.0 Harbor tasks --- .../src/frontier_cs_2_0/adapter.py | 94 +++++++++++- .../task-template/environment/Dockerfile | 5 +- .../environment/Dockerfile.judge | 3 +- .../environment/docker-compose.yaml | 14 +- .../task-template/environment/judge_server.py | 134 +++++++++++++++-- .../task-template/environment/submit.py | 140 +++++++++++++++--- .../frontier_cs_2_0/task-template/task.toml | 8 +- .../task-template/tests/evaluate.py | 45 +++++- .../src/frontier_cs_2_0/utils.py | 1 + src/frontier_cs/cli.py | 19 ++- src/frontier_cs/config.py | 5 + 11 files changed, 412 insertions(+), 56 deletions(-) diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py index bc0c4bb9e..8fed62bf8 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import json import shutil from pathlib import Path from types import SimpleNamespace @@ -63,6 +64,7 @@ def discover_problems(frontier_cs_root: Path) -> list[FrontierCS20Problem]: language=str(runtime.get("language", "python")), timeout_seconds=int(runtime.get("timeout_seconds", 10800)), docker_image=str(docker.get("image", "ubuntu:24.04")), + config=config, ) ) @@ -131,13 +133,32 @@ def generate_task( return task_paths.task_dir def _write_instruction(self, task_paths: "TaskPaths", problem: FrontierCS20Problem) -> None: + submission = problem.config.get("submission", {}) or {} + submission_kind = str(submission.get("kind", "file")) + submission_path = str(submission.get("path", "/app/solution.py")) + if submission_kind == "directory": + workflow = ( + "Create or modify the project under `/app`. 
You can call " + "`bash /app/submit.sh` at any time to package `/app`, grade it " + "with the same black-box judge used by the final verifier, and " + "get score feedback. The evaluator implementation and hidden " + "benchmark data are intentionally not available in the agent " + "workspace. The task statement defines the required build and " + "runtime contract.\n\n" + f"Final submission path: `{submission_path}`\n" + ) + else: + workflow = ( + f"Create a {problem.language} solution at `{submission_path}`. " + "You can call `bash /app/submit.sh` at any time to grade the " + "current solution with the same black-box judge used by the final " + "verifier and get score feedback. The evaluator implementation is " + "intentionally not available in the agent workspace.\n" + ) + instruction = ( "You are solving a Frontier-CS 2.0 open-ended optimization problem.\n\n" - "Create a Python solution at `/app/solution.py`. You can call " - "`bash /app/submit.sh` at any time to grade the current solution " - "with the same black-box judge used by the final verifier and get " - "score feedback. 
The evaluator implementation is intentionally not " - "available in the agent workspace.\n\n" + f"{workflow}\n" f"Problem id: `{problem.problem_id}`\n" f"Language: `{problem.language}`\n" f"Time limit: `{problem.timeout_seconds}s`\n\n" @@ -152,8 +173,35 @@ def _write_environment(self, task_paths: "TaskPaths", problem: FrontierCS20Probl encoding="utf-8" ) image = self.docker_image or problem.docker_image + runtime = problem.config.get("runtime", {}) or {} + apt_package_names = [ + str(pkg) + for pkg in [ + *(runtime.get("apt_packages", []) or []), + *(runtime.get("judge_apt_packages", []) or []), + ] + ] + apt_packages = " ".join(dict.fromkeys(apt_package_names)) + extra_apt_install = ( + f"apt-get install -y --no-install-recommends {apt_packages} &&" + if apt_packages + else ": &&" + ) + pip_package_names = [ + str(pkg) for pkg in runtime.get("judge_pip_packages", []) or [] + ] + pip_packages = " ".join(dict.fromkeys(pip_package_names)) + extra_pip_install = ( + f"pip3 install --break-system-packages {pip_packages} &&" + if pip_packages + else ": &&" + ) env_dir.joinpath("Dockerfile").write_text( - dockerfile.replace("{base_image}", image), + dockerfile.replace("{base_image}", image).replace( + "{extra_apt_install}", extra_apt_install + ).replace( + "{extra_pip_install}", extra_pip_install + ), encoding="utf-8", ) @@ -161,12 +209,28 @@ def _write_environment(self, task_paths: "TaskPaths", problem: FrontierCS20Probl src = problem.problem_dir / name if src.exists(): shutil.copy2(src, env_dir / name) + self._write_submission_config(env_dir, problem) + harbor_app_dir = problem.problem_dir / "harbor" / "app" + generated_harbor_app_dir = env_dir / "harbor_app" + generated_harbor_app_dir.mkdir(parents=True, exist_ok=True) + if harbor_app_dir.exists(): + shutil.copytree( + harbor_app_dir, generated_harbor_app_dir, dirs_exist_ok=True + ) judge_dockerfile = ( self.template_dir / "environment" / "Dockerfile.judge" ).read_text(encoding="utf-8") + judge_apt_packages = " ".join( 
+ str(pkg) for pkg in runtime.get("judge_apt_packages", []) or [] + ) env_dir.joinpath("Dockerfile.judge").write_text( - judge_dockerfile.replace("{base_image}", image), + judge_dockerfile.replace("{base_image}", image).replace( + "{judge_apt_packages_line}", + f" {judge_apt_packages}" if judge_apt_packages else "", + ).replace( + "{judge_pip_install}", extra_pip_install + ), encoding="utf-8", ) for name in ("docker-compose.yaml", "judge_server.py", "submit.py"): @@ -178,6 +242,15 @@ def _write_environment(self, task_paths: "TaskPaths", problem: FrontierCS20Probl shutil.copy2(self.template_dir / "environment" / "submit.sh", submit_sh) submit_sh.chmod(0o755) + def _write_submission_config(self, env_dir: Path, problem: FrontierCS20Problem) -> None: + submission = dict(problem.config.get("submission", {}) or {}) + submission.setdefault("kind", "file") + submission.setdefault("path", "/app/solution.py") + submission.setdefault("exclude", []) + (env_dir / "submission_config.json").write_text( + json.dumps(submission, indent=2), encoding="utf-8" + ) + def _write_tests(self, task_paths: "TaskPaths", problem: FrontierCS20Problem) -> None: tests_dir = task_paths.tests_dir shutil.copy2(self.template_dir / "tests" / "test.sh", tests_dir / "test.sh") @@ -196,12 +269,19 @@ def _write_solution(self, task_paths: "TaskPaths", problem: FrontierCS20Problem) def _write_task_config(self, task_paths: "TaskPaths", problem: FrontierCS20Problem) -> None: template = (self.template_dir / "task.toml").read_text(encoding="utf-8") + environment = problem.config.get("environment", {}) or {} text = template.format( task_id=problem.task_id, problem_id=problem.problem_id, tag=problem.tag, timeout_sec=problem.timeout_seconds, agent_timeout_sec=max(10800, problem.timeout_seconds), + environment_build_timeout_sec=float( + environment.get("build_timeout_seconds", 600) + ), + cpus=int(environment.get("cpus", 2)), + memory_mb=int(environment.get("memory_mb", 4096)), + 
storage_mb=int(environment.get("storage_mb", 4096)), ) try: from harbor.models.task.config import TaskConfig diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile index 7873525ce..90971ce49 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile @@ -3,7 +3,9 @@ FROM {base_image} RUN apt-get update && \ apt-get install -y --no-install-recommends \ python3 python3-pip ca-certificates bash curl ripgrep && \ + {extra_apt_install} \ pip3 install --break-system-packages requests && \ + {extra_pip_install} \ rm -rf /var/lib/apt/lists/* # Pre-install Claude Code so the agent setup does not need apt at trial time. @@ -23,5 +25,6 @@ ENV CLAUDE_CODE_MAX_OUTPUT_TOKENS=128000 WORKDIR /app -COPY readme config.yaml submit.py submit.sh /app/ +COPY readme config.yaml submission_config.json submit.py submit.sh /app/ +COPY harbor_app/ /app/ RUN chmod +x /app/submit.sh diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile.judge b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile.judge index 6207d4fa5..5ae2f5b0f 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile.judge +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/Dockerfile.judge @@ -1,7 +1,8 @@ FROM {base_image} RUN apt-get update && \ - apt-get install -y --no-install-recommends python3 ca-certificates && \ + apt-get install -y --no-install-recommends python3 ca-certificates {judge_apt_packages_line} && \ + {judge_pip_install} \ rm -rf /var/lib/apt/lists/* WORKDIR /judge diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml 
b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml index 4e07cbdc7..c70b9b334 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml @@ -2,7 +2,7 @@ services: main: depends_on: judge: - condition: service_started + condition: service_healthy environment: JUDGE_URL: "http://judge:8082" @@ -15,3 +15,15 @@ services: - "8082" environment: PORT: "8082" + healthcheck: + test: + [ + "CMD", + "python3", + "-c", + "import urllib.request; urllib.request.urlopen('http://localhost:8082/health', timeout=2).read()", + ] + interval: 5s + timeout: 3s + retries: 360 + start_period: 5s diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py index 9b9ef1d11..e5cc7647e 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py @@ -6,13 +6,21 @@ import importlib.util import json import os +import sys +import base64 +import tarfile import tempfile +import io +import time +import traceback +import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any PROBLEM_EVALUATOR_PATH = Path("/judge/problem_evaluator.py") -MAX_SUBMISSION_BYTES = 2_000_000 +MAX_SUBMISSION_BYTES = 30_000_000 +MAX_ARCHIVE_BYTES = 20_000_000 def load_problem_evaluator(): @@ -22,26 +30,105 @@ def load_problem_evaluator(): if spec is None or spec.loader is None: raise RuntimeError(f"could not load evaluator from {PROBLEM_EVALUATOR_PATH}") module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module spec.loader.exec_module(module) return module -EVALUATOR = load_problem_evaluator() - - 
-def evaluate_code(code: str) -> dict[str, Any]: - with tempfile.TemporaryDirectory(prefix="frontier_cs_2_0_submission_") as tmp: - solution_path = Path(tmp) / "solution.py" - solution_path.write_text(code, encoding="utf-8") - score, score_unbounded, message = EVALUATOR.evaluate(str(solution_path)) +EVALUATOR = None +READY = False +READY_PAYLOAD: dict[str, Any] = {"status": "starting"} + + +def prepare_evaluator() -> None: + global EVALUATOR, READY, READY_PAYLOAD + start = time.time() + try: + EVALUATOR = load_problem_evaluator() + prepare = getattr(EVALUATOR, "prepare", None) + if callable(prepare): + payload = prepare() + else: + ensure = getattr(EVALUATOR, "_ensure_benchmark", None) + payload = ensure() if callable(ensure) else {} + elapsed = time.time() - start + if not isinstance(payload, dict): + payload = {} + READY_PAYLOAD = { + "status": "ok", + "ready": True, + "prepare_seconds": elapsed, + **payload, + } + READY = True + print( + "[frontier judge] ready " + + " ".join(f"{key}={value}" for key, value in READY_PAYLOAD.items()), + flush=True, + ) + except Exception as exc: + READY_PAYLOAD = { + "status": "error", + "ready": False, + "error": str(exc), + } + print(traceback.format_exc(), flush=True) + + +def normalize_result(result: Any) -> tuple[float, float, str, dict[str, Any]]: + if not isinstance(result, tuple) or len(result) not in (3, 4): + raise TypeError("evaluator must return (score, score_unbounded, message[, metrics])") + score = float(result[0]) + score_unbounded = float(result[1]) + message = str(result[2]) + metrics = result[3] if len(result) == 4 else {} + if not isinstance(metrics, dict): + raise TypeError("evaluator metrics must be a dict") + return score, score_unbounded, message, metrics + + +def evaluate_path(solution_path: Path) -> dict[str, Any]: + if EVALUATOR is None: + raise RuntimeError("problem evaluator is not loaded") + score, score_unbounded, message, metrics = normalize_result( + EVALUATOR.evaluate(str(solution_path)) + ) 
return { "status": "done", "score": float(score), "score_unbounded": float(score_unbounded), "message": message, + "metrics": metrics, } +def evaluate_code(code: str) -> dict[str, Any]: + with tempfile.TemporaryDirectory(prefix="frontier_cs_2_0_submission_") as tmp: + solution_path = Path(tmp) / "solution.py" + solution_path.write_text(code, encoding="utf-8") + return evaluate_path(solution_path) + + +def is_safe_tar_member(member: tarfile.TarInfo) -> bool: + path = Path(member.name) + return not path.is_absolute() and ".." not in path.parts + + +def evaluate_archive(archive_b64: str) -> dict[str, Any]: + archive = base64.b64decode(archive_b64.encode("ascii"), validate=True) + if len(archive) > MAX_ARCHIVE_BYTES: + raise ValueError("submission archive too large") + with tempfile.TemporaryDirectory(prefix="frontier_cs_2_0_project_") as tmp: + root = Path(tmp) / "submission" + root.mkdir() + with tarfile.open(fileobj=io.BytesIO(archive), mode="r:gz") as tar: + members = tar.getmembers() + if not all(is_safe_tar_member(member) for member in members): + raise ValueError("unsafe path in submission archive") + tar.extractall(root) + return evaluate_path(root) + + class JudgeHandler(BaseHTTPRequestHandler): server_version = "FrontierCS20Judge/1.0" @@ -55,7 +142,7 @@ def _write_json(self, status: int, payload: dict[str, Any]) -> None: def do_GET(self) -> None: if self.path == "/health": - self._write_json(200, {"status": "ok"}) + self._write_json(200 if READY else 503, READY_PAYLOAD) return self._write_json(404, {"status": "error", "error": "not found"}) @@ -63,6 +150,18 @@ def do_POST(self) -> None: if self.path != "/evaluate": self._write_json(404, {"status": "error", "error": "not found"}) return + if not READY: + self._write_json( + 503, + { + "status": "error", + "score": 0.0, + "score_unbounded": 0.0, + "message": "judge is not ready", + "health": READY_PAYLOAD, + }, + ) + return try: content_length = int(self.headers.get("Content-Length", "0")) @@ -79,11 +178,23 @@ 
def do_POST(self) -> None: try: payload = json.loads(self.rfile.read(content_length).decode("utf-8")) + submission_kind = payload.get("submission_kind", "file") + if submission_kind == "directory": + archive_b64 = payload.get("archive_b64") + if not isinstance(archive_b64, str) or not archive_b64: + raise ValueError( + "directory submission must include archive_b64" + ) + self._write_json(200, evaluate_archive(archive_b64)) + return code = payload.get("code") if not isinstance(code, str) or not code.strip(): - raise ValueError("request JSON must include non-empty string field 'code'") + raise ValueError( + "file submission must include non-empty string field 'code'" + ) self._write_json(200, evaluate_code(code)) except Exception: + print(traceback.format_exc(), flush=True) self._write_json( 200, { @@ -101,6 +212,7 @@ def log_message(self, fmt: str, *args: object) -> None: def main() -> None: port = int(os.environ.get("PORT", "8082")) server = ThreadingHTTPServer(("0.0.0.0", port), JudgeHandler) + threading.Thread(target=prepare_evaluator, daemon=True).start() server.serve_forever() diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py index 0f89f1f30..ad236d89a 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py @@ -9,17 +9,27 @@ import time import traceback import uuid +import base64 +import io +import tarfile from datetime import datetime, timezone from pathlib import Path import requests SOLUTION_PATH = Path("/app/solution.py") +SUBMISSION_CONFIG_PATH = Path("/app/submission_config.json") SUBMISSIONS_LOG = Path("/logs/agent/submissions.jsonl") JUDGE_URL = os.environ.get("JUDGE_URL", "http://judge:8082").rstrip("/") JUDGE_TIMEOUT_SECONDS = int(os.environ.get("JUDGE_TIMEOUT_SECONDS", "10800")) +def 
load_submission_config() -> dict: + if not SUBMISSION_CONFIG_PATH.exists(): + return {"kind": "file", "path": str(SOLUTION_PATH), "exclude": []} + return json.loads(SUBMISSION_CONFIG_PATH.read_text(encoding="utf-8")) + + def now_iso() -> str: return ( datetime.now(timezone.utc) @@ -35,24 +45,72 @@ def log_record(record: dict) -> None: def wait_for_judge() -> None: - deadline = time.time() + 60 + deadline = time.time() + JUDGE_TIMEOUT_SECONDS last_error: Exception | None = None + printed_waiting = False while time.time() < deadline: try: response = requests.get(f"{JUDGE_URL}/health", timeout=5) - if response.status_code == 200: - return except Exception as exc: last_error = exc + if not printed_waiting: + print("[submit] waiting for judge service", flush=True) + printed_waiting = True + time.sleep(1) + continue + + try: + payload = response.json() + except Exception: + payload = {} + if payload.get("status") == "error": + raise RuntimeError(f"judge setup failed: {payload.get('error') or payload}") + if response.status_code == 200: + prepare_seconds = payload.get("prepare_seconds") + suffix = ( + f" prepare_seconds={float(prepare_seconds):.2f}" + if isinstance(prepare_seconds, (int, float)) + else "" + ) + print(f"[submit] judge ready{suffix}", flush=True) + return + if not printed_waiting: + print("[submit] waiting for judge to finish environment setup", flush=True) + printed_waiting = True time.sleep(1) raise RuntimeError(f"judge service is not ready at {JUDGE_URL}: {last_error}") -def evaluate_with_judge(code: str) -> tuple[float, float, str]: +def should_exclude(path: Path, root: Path, exclude: list[str]) -> bool: + rel = path.relative_to(root).as_posix() + parts = set(path.relative_to(root).parts) + for pattern in exclude: + pattern = str(pattern).strip("/") + if not pattern: + continue + if rel == pattern or rel.startswith(pattern + "/") or pattern in parts: + return True + return False + + +def make_directory_archive(root: Path, exclude: list[str]) -> 
tuple[str, int]: + buf = io.BytesIO() + file_count = 0 + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for path in sorted(root.rglob("*")): + if should_exclude(path, root, exclude): + continue + if path.is_file(): + tar.add(path, arcname=path.relative_to(root).as_posix()) + file_count += 1 + return base64.b64encode(buf.getvalue()).decode("ascii"), file_count + + +def evaluate_with_judge(payload: dict) -> tuple[float, float, str, dict]: wait_for_judge() response = requests.post( f"{JUDGE_URL}/evaluate", - json={"code": code}, + json=payload, timeout=JUDGE_TIMEOUT_SECONDS, ) response.raise_for_status() @@ -63,13 +121,19 @@ def evaluate_with_judge(code: str) -> tuple[float, float, str]: float(payload.get("score", 0.0)), float(payload.get("score_unbounded", payload.get("score", 0.0))), str(payload.get("message", "")), + dict(payload.get("metrics", {}) or {}), ) def main() -> int: - solution_path = Path(sys.argv[1] if len(sys.argv) > 1 else SOLUTION_PATH) + config = load_submission_config() + submission_kind = str(config.get("kind", "file")) + default_path = str(config.get("path") or SOLUTION_PATH) + solution_path = Path(sys.argv[1] if len(sys.argv) > 1 else default_path) + exclude = list(config.get("exclude", []) or []) sub_uuid = str(uuid.uuid4()) code_chars = 0 + file_count = 0 log_record( { @@ -77,6 +141,7 @@ def main() -> int: "ts": now_iso(), "status": "started", "solution_path": str(solution_path), + "submission_kind": submission_kind, "code_chars": code_chars, } ) @@ -94,25 +159,51 @@ def main() -> int: ) return 2 - code = solution_path.read_text(encoding="utf-8") - code_chars = len(code) - if not code.strip(): - msg = f"Solution file {solution_path} is empty" - print(f"[submit] ERROR: {msg}", file=sys.stderr) - log_record( - { - "submission_uuid": sub_uuid, - "ts": now_iso(), - "status": "error", - "error": msg, - "code_chars": code_chars, - } + if submission_kind == "directory": + if not solution_path.is_dir(): + msg = f"Submission path {solution_path} 
is not a directory" + print(f"[submit] ERROR: {msg}", file=sys.stderr) + log_record( + { + "submission_uuid": sub_uuid, + "ts": now_iso(), + "status": "error", + "error": msg, + "code_chars": code_chars, + } + ) + return 2 + archive_b64, file_count = make_directory_archive(solution_path, exclude) + code_chars = sum( + p.stat().st_size + for p in solution_path.rglob("*") + if p.is_file() and not should_exclude(p, solution_path, exclude) ) - return 2 + judge_payload = { + "submission_kind": "directory", + "archive_b64": archive_b64, + } + else: + code = solution_path.read_text(encoding="utf-8") + code_chars = len(code) + if not code.strip(): + msg = f"Solution file {solution_path} is empty" + print(f"[submit] ERROR: {msg}", file=sys.stderr) + log_record( + { + "submission_uuid": sub_uuid, + "ts": now_iso(), + "status": "error", + "error": msg, + "code_chars": code_chars, + } + ) + return 2 + judge_payload = {"submission_kind": "file", "code": code} try: start = time.time() - score, score_unbounded, message = evaluate_with_judge(code) + score, score_unbounded, message, metrics = evaluate_with_judge(judge_payload) elapsed_seconds = time.time() - start reward = float(score) / 100.0 @@ -127,6 +218,8 @@ def main() -> int: "elapsed_seconds": elapsed_seconds, "detail": message, "code_chars": code_chars, + "file_count": file_count, + "metrics": metrics, } ) @@ -140,6 +233,11 @@ def main() -> int: if message: snippet = message if len(message) <= 800 else message[:800] + "..." 
print(f"[submit] detail: {snippet}") + if metrics: + metric_text = " ".join( + f"{key}={value}" for key, value in sorted(metrics.items()) + ) + print(f"[submit] metrics: {metric_text}") return 0 except Exception as exc: detail = traceback.format_exc() diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/task.toml b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/task.toml index 30d9d5d18..6d721e89a 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/task.toml +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/task.toml @@ -19,9 +19,9 @@ timeout_sec = {timeout_sec} timeout_sec = {agent_timeout_sec} [environment] -build_timeout_sec = 600.0 -cpus = 2 -memory_mb = 4096 -storage_mb = 4096 +build_timeout_sec = {environment_build_timeout_sec} +cpus = {cpus} +memory_mb = {memory_mb} +storage_mb = {storage_mb} gpus = 0 allow_internet = true diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py index 86a770446..4e5ada805 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py @@ -6,10 +6,13 @@ import importlib.util import json import shutil +import sys import traceback from pathlib import Path SOLUTION_PATH = Path("/app/solution.py") +APP_PATH = Path("/app") +SUBMISSION_CONFIG_PATH = Path("/app/submission_config.json") PROBLEM_EVALUATOR_PATH = Path("/tests/problem_evaluator.py") REWARD_TXT = Path("/logs/verifier/reward.txt") REWARD_JSON = Path("/logs/verifier/reward.json") @@ -69,10 +72,32 @@ def load_problem_evaluator(): if spec is None or spec.loader is None: raise RuntimeError(f"could not load evaluator from {PROBLEM_EVALUATOR_PATH}") module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module spec.loader.exec_module(module) return module +def 
normalize_result(result): + if not isinstance(result, tuple) or len(result) not in (3, 4): + raise TypeError("evaluator must return (score, score_unbounded, message[, metrics])") + score = float(result[0]) + score_unbounded = float(result[1]) + message = str(result[2]) + metrics = result[3] if len(result) == 4 else {} + if not isinstance(metrics, dict): + raise TypeError("evaluator metrics must be a dict") + return score, score_unbounded, message, metrics + + +def load_submission_path() -> Path: + if not SUBMISSION_CONFIG_PATH.exists(): + return SOLUTION_PATH + config = json.loads(SUBMISSION_CONFIG_PATH.read_text(encoding="utf-8")) + if config.get("kind") == "directory": + return Path(config.get("path") or APP_PATH) + return Path(config.get("path") or SOLUTION_PATH) + + def main() -> None: copy_submissions_log() best = best_submission() @@ -83,6 +108,9 @@ def write_best_submission_reward(reason: str) -> bool: reward = float(best.get("score", 0.0)) score_raw = best.get("score_raw", reward * 100.0) score_unbounded = best.get("score_unbounded", score_raw) + metrics = best.get("metrics", {}) + if not isinstance(metrics, dict): + metrics = {} print(f"Using best iterative submission after {reason}: reward={reward:.4f}") write_reward( reward, @@ -92,17 +120,19 @@ def write_best_submission_reward(reason: str) -> bool: "score_unbounded": score_unbounded, "best_submission_reward": reward, "used_best_submission": 1, + **metrics, }, ) return True - if not SOLUTION_PATH.exists(): - print("ERROR: /app/solution.py not found") - if write_best_submission_reward("solution.py not found"): + solution_path = load_submission_path() + if not solution_path.exists(): + print(f"ERROR: {solution_path} not found") + if write_best_submission_reward(f"{solution_path} not found"): return - write_reward(0.0, "solution.py not found") + write_reward(0.0, f"{solution_path} not found") return - if not SOLUTION_PATH.read_text(encoding="utf-8").strip(): + if solution_path.is_file() and not 
solution_path.read_text(encoding="utf-8").strip(): print("ERROR: /app/solution.py is empty") if write_best_submission_reward("solution.py is empty"): return @@ -111,7 +141,9 @@ def write_best_submission_reward(reason: str) -> bool: try: evaluator = load_problem_evaluator() - score, score_unbounded, message = evaluator.evaluate(str(SOLUTION_PATH)) + score, score_unbounded, message, metrics = normalize_result( + evaluator.evaluate(str(solution_path)) + ) reward = float(score) / 100.0 if best is not None and float(best.get("score", 0.0)) > reward: write_best_submission_reward("final solution scored below best submission") @@ -126,6 +158,7 @@ def write_best_submission_reward(reason: str) -> bool: { "score": score, "score_unbounded": score_unbounded, + **metrics, }, ) except Exception as exc: diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/utils.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/utils.py index bf833b3f4..879f48a2c 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/utils.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/utils.py @@ -20,6 +20,7 @@ class FrontierCS20Problem: language: str timeout_seconds: int docker_image: str + config: dict[str, Any] def load_problem_config(config_path: Path) -> dict[str, Any]: diff --git a/src/frontier_cs/cli.py b/src/frontier_cs/cli.py index 6d1a45009..92ae0afcc 100644 --- a/src/frontier_cs/cli.py +++ b/src/frontier_cs/cli.py @@ -1217,11 +1217,17 @@ def _harbor_trial_summary_payload(trial_dir: Path) -> dict: ) has_reward = rewards.get("reward") is not None agent_status = exception.get("exception_type") if exception else "completed" + reward_metrics = { + key: value + for key, value in rewards.items() + if key not in {"reward", "score", "score_unbounded"} + } payload = { "reward": rewards.get("reward"), "score": rewards.get("score"), "score_unbounded": rewards.get("score_unbounded"), + "metrics": reward_metrics or None, "trial_status": "scored" if has_reward else agent_status, "agent_status": 
agent_status, "agent_error_summary": _summarize_agent_exception(exception) @@ -1403,7 +1409,7 @@ def _score_bar(scores: list[float]) -> str: def _print_harbor_submission_event( - index: int, event: dict, scores: list[float] + index: int, event: dict, scores: list[float], previous_score: float | None ) -> None: timestamp = event.get("timestamp") or time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime() @@ -1424,8 +1430,8 @@ def _print_harbor_submission_event( print( "[frontier harbor] " f"score bar: {_score_bar(scores)} " - f"({_format_score(scores[0] if scores else None)} -> " - f"{_format_score(scores[-1] if scores else None)})", + f"({_format_score(previous_score if previous_score is not None else 0.0)} -> " + f"{_format_score(score)})", file=sys.stderr, flush=True, ) @@ -1513,9 +1519,12 @@ def read_stdout() -> None: ): submission_count += 1 score = _parse_float(event.get("score")) + previous_score = scores[-1] if scores else 0.0 if score is not None: scores.append(score) - _print_harbor_submission_event(submission_count, event, scores) + _print_harbor_submission_event( + submission_count, event, scores, previous_score + ) if stdout_closed and process.poll() is not None: break @@ -1585,6 +1594,8 @@ def run_harbor(args: argparse.Namespace) -> int: env["FRONTIER_CS_ALGORITHMIC_PATH"] = str(_repo_root() / "algorithmic") _progress(f"Starting Harbor trial {task_name}") + if args.track == "2.0": + _progress("Building/preparing Harbor environment; judge readiness appears at submission time") if args.verbose: print("Running Harbor trial...") print(" ".join(command)) diff --git a/src/frontier_cs/config.py b/src/frontier_cs/config.py index 5ccc4910a..f4825d8f0 100644 --- a/src/frontier_cs/config.py +++ b/src/frontier_cs/config.py @@ -213,6 +213,11 @@ class LanguageConfig: extension="cpp", code_block_tag="cpp", ), + "rust": LanguageConfig( + name="rust", + extension="rs", + code_block_tag="rust", + ), } DEFAULT_LANGUAGE = "python" From a5fad28582cd2255da8dc03969deee9dc64b0733 
Mon Sep 17 00:00:00 2001 From: Qiuyang Mang Date: Fri, 29 May 2026 03:32:55 -0700 Subject: [PATCH 2/2] Add Vector DB ANN Frontier-CS 2.0 task --- 2.0/problems/vector_db_ann/config.yaml | 32 + 2.0/problems/vector_db_ann/evaluate.sh | 38 + 2.0/problems/vector_db_ann/evaluator.py | 657 ++++++++++++++++++ .../vector_db_ann/harbor/app/Cargo.toml | 14 + .../vector_db_ann/harbor/app/LICENSE.KCORES | 21 + .../vector_db_ann/harbor/app/README.md | 26 + .../vector_db_ann/harbor/app/src/api.rs | 46 ++ .../vector_db_ann/harbor/app/src/db.rs | 23 + .../vector_db_ann/harbor/app/src/distance.rs | 9 + .../vector_db_ann/harbor/app/src/main.rs | 54 ++ 2.0/problems/vector_db_ann/readme | 140 ++++ 2.0/problems/vector_db_ann/reference.py | 6 + 2.0/problems/vector_db_ann/reference.rs | 41 ++ 13 files changed, 1107 insertions(+) create mode 100644 2.0/problems/vector_db_ann/config.yaml create mode 100755 2.0/problems/vector_db_ann/evaluate.sh create mode 100644 2.0/problems/vector_db_ann/evaluator.py create mode 100644 2.0/problems/vector_db_ann/harbor/app/Cargo.toml create mode 100644 2.0/problems/vector_db_ann/harbor/app/LICENSE.KCORES create mode 100644 2.0/problems/vector_db_ann/harbor/app/README.md create mode 100644 2.0/problems/vector_db_ann/harbor/app/src/api.rs create mode 100644 2.0/problems/vector_db_ann/harbor/app/src/db.rs create mode 100644 2.0/problems/vector_db_ann/harbor/app/src/distance.rs create mode 100644 2.0/problems/vector_db_ann/harbor/app/src/main.rs create mode 100644 2.0/problems/vector_db_ann/readme create mode 100644 2.0/problems/vector_db_ann/reference.py create mode 100644 2.0/problems/vector_db_ann/reference.rs diff --git a/2.0/problems/vector_db_ann/config.yaml b/2.0/problems/vector_db_ann/config.yaml new file mode 100644 index 000000000..e6d48943a --- /dev/null +++ b/2.0/problems/vector_db_ann/config.yaml @@ -0,0 +1,32 @@ +tag: systems +runtime: + language: rust + timeout_seconds: 10800 + environment: "Rust project; hidden ANN benchmark; Python/NumPy 
judge" + apt_packages: + - build-essential + - cargo + - git + - rustc + judge_apt_packages: + - build-essential + - cargo + - rustc + - python3-pip + - python3-numpy + judge_pip_packages: + - faiss-cpu + docker: + image: ubuntu:24.04 +environment: + cpus: 4 + memory_mb: 8192 + storage_mb: 8192 + build_timeout_seconds: 3600 +submission: + kind: directory + path: /app + exclude: + - target + - .git + - .frontier-cs diff --git a/2.0/problems/vector_db_ann/evaluate.sh b/2.0/problems/vector_db_ann/evaluate.sh new file mode 100755 index 000000000..6bed53333 --- /dev/null +++ b/2.0/problems/vector_db_ann/evaluate.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) + +if [[ $# -gt 0 ]]; then + exec python3 "$SCRIPT_DIR/evaluator.py" "$@" +fi + +SOLUTION="/work/execution_env/solution_env/solution.rs" +if [[ ! -f "$SOLUTION" ]]; then + echo "Error: Missing $SOLUTION" >&2 + exit 1 +fi + +if ! command -v cargo >/dev/null 2>&1 || ! python3 -c 'import numpy, faiss' >/dev/null 2>&1; then + export DEBIAN_FRONTEND=noninteractive + apt-get update -qq + apt-get install -y -qq --no-install-recommends \ + build-essential cargo rustc python3-pip python3-numpy >/dev/null + python3 -c 'import faiss' >/dev/null 2>&1 || \ + pip3 install --break-system-packages -q faiss-cpu +fi + +WORKDIR=$(mktemp -d) +trap 'rm -rf "$WORKDIR"' EXIT +cp -R "$SCRIPT_DIR/harbor/app/." "$WORKDIR/" +cp "$SOLUTION" "$WORKDIR/src/db.rs" + +# The repository validator checks that the evaluator path works; the full +# 1M-vector benchmark is exercised through Harbor. 
+export FRONTIER_VECTOR_DB_N="${FRONTIER_VECTOR_DB_N:-5000}" +export FRONTIER_VECTOR_DB_Q="${FRONTIER_VECTOR_DB_Q:-16}" +export FRONTIER_VECTOR_DB_WARMUP="${FRONTIER_VECTOR_DB_WARMUP:-4}" +export FRONTIER_VECTOR_DB_BATCH_SIZE="${FRONTIER_VECTOR_DB_BATCH_SIZE:-500}" +export FRONTIER_VECTOR_DB_CACHE="${FRONTIER_VECTOR_DB_CACHE:-/tmp/frontier_vector_db_ann_ci}" + +python3 "$SCRIPT_DIR/evaluator.py" "$WORKDIR" diff --git a/2.0/problems/vector_db_ann/evaluator.py b/2.0/problems/vector_db_ann/evaluator.py new file mode 100644 index 000000000..edf3ea408 --- /dev/null +++ b/2.0/problems/vector_db_ann/evaluator.py @@ -0,0 +1,657 @@ +#!/usr/bin/env python3 +"""Evaluator for the Frontier-CS 2.0 Vector DB ANN task.""" + +from __future__ import annotations + +import json +import os +import shutil +import socket +import subprocess +import tempfile +import time +import math +from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor, as_completed +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib import request + +import numpy as np + +DIM = 128 +N_BASE = int(os.environ.get("FRONTIER_VECTOR_DB_N", "1000000")) +N_QUERIES = int(os.environ.get("FRONTIER_VECTOR_DB_Q", "256")) +TOP_K = int(os.environ.get("FRONTIER_VECTOR_DB_TOP_K", "10")) +SEED = int(os.environ.get("FRONTIER_VECTOR_DB_SEED", "20260528")) +TARGET_RECALL = float(os.environ.get("FRONTIER_VECTOR_DB_TARGET_RECALL", "0.95")) +QUERY_NOISE = float(os.environ.get("FRONTIER_VECTOR_DB_QUERY_NOISE", "0.02")) +BUILD_TIMEOUT_SECONDS = int(os.environ.get("FRONTIER_VECTOR_DB_BUILD_TIMEOUT", "600")) +LOAD_TIMEOUT_SECONDS = int(os.environ.get("FRONTIER_VECTOR_DB_LOAD_TIMEOUT", "900")) +LOAD_PENALTY_WEIGHT = float(os.environ.get("FRONTIER_VECTOR_DB_LOAD_PENALTY", "0.01")) +BATCH_SIZE = int(os.environ.get("FRONTIER_VECTOR_DB_BATCH_SIZE", "1000")) +CONCURRENCY = int(os.environ.get("FRONTIER_VECTOR_DB_CONCURRENCY", "4")) +WARMUP = 
int(os.environ.get("FRONTIER_VECTOR_DB_WARMUP", "32")) +CACHE_DIR = Path(os.environ.get("FRONTIER_VECTOR_DB_CACHE", "/tmp/frontier_vector_db_ann")) + +_BENCHMARK: "Benchmark | None" = None + + +@dataclass +class Benchmark: + base_path: Path + queries_path: Path + truth: np.ndarray + baseline_qps: float + baseline_effective_qps: float + baseline_seconds: float + baseline_load_seconds: float + + +class QueryTooSlow(RuntimeError): + def __init__( + self, + elapsed_seconds: float, + optimistic_qps: float, + optimistic_effective_qps: float, + completed_queries: int, + ): + super().__init__( + "query phase stopped early because even an optimistic final " + "effective_qps cannot beat the baseline" + ) + self.elapsed_seconds = elapsed_seconds + self.optimistic_qps = optimistic_qps + self.optimistic_effective_qps = optimistic_effective_qps + self.completed_queries = completed_queries + + +class LoadTooSlow(RuntimeError): + def __init__( + self, + load_seconds: float, + optimistic_effective_qps: float, + inserted: int, + ): + super().__init__( + "load/index phase stopped early because even zero query time " + "cannot beat the baseline effective_qps" + ) + self.load_seconds = load_seconds + self.optimistic_effective_qps = optimistic_effective_qps + self.inserted = inserted + + +def prepare() -> dict: + print( + f"[vector-db-ann] preparing benchmark n_base={N_BASE} " + f"n_queries={N_QUERIES} top_k={TOP_K} cache={CACHE_DIR}", + flush=True, + ) + benchmark = _ensure_benchmark() + print( + f"[vector-db-ann] benchmark ready baseline_qps={benchmark.baseline_qps:.6f} " + f"baseline_effective_qps={benchmark.baseline_effective_qps:.6f} " + f"baseline_seconds={benchmark.baseline_seconds:.6f} " + f"baseline_load_seconds={benchmark.baseline_load_seconds:.6f}", + flush=True, + ) + return { + "n_base": N_BASE, + "n_queries": N_QUERIES, + "top_k": TOP_K, + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": 
benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + } + + +def _write_vectors(path: Path, values: np.ndarray) -> None: + values.astype(np.float32, copy=False).tofile(path) + + +def _load_vectors(path: Path, rows: int) -> np.memmap: + return np.memmap(path, dtype=np.float32, mode="r", shape=(rows, DIM)) + + +def _generate_data(base_path: Path, queries_path: Path) -> None: + rng = np.random.default_rng(SEED) + chunk = 50_000 + base = np.memmap(base_path, dtype=np.float32, mode="w+", shape=(N_BASE, DIM)) + for start in range(0, N_BASE, chunk): + end = min(start + chunk, N_BASE) + base[start:end] = rng.standard_normal((end - start, DIM), dtype=np.float32) + base.flush() + + ids = rng.integers(0, N_BASE, size=N_QUERIES) + selected = np.asarray(base[ids], dtype=np.float32) + noise = rng.standard_normal((N_QUERIES, DIM), dtype=np.float32) * QUERY_NOISE + _write_vectors(queries_path, selected + noise) + + +def _run_reference_server(port: int, base_path: str) -> None: + import faiss + + _ = base_path + index = faiss.IndexIDMap(faiss.IndexFlatL2(DIM)) + + class ReferenceHandler(BaseHTTPRequestHandler): + server_version = "FrontierVectorFaissReference/1.0" + + def _write_json(self, status: int, payload: dict) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path == "/health": + self._write_json(200, {"status": "ok"}) + return + self._write_json(404, {"status": "error", "error": "not found"}) + + def do_POST(self) -> None: + if self.path == "/insert": + self._write_json(200, {"status": "ok"}) + return + if self.path == "/bulk_insert": + try: + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + items = payload.get("vectors", []) + inserted = 
len(items) + if inserted: + vectors = np.asarray( + [item["vector"] for item in items], dtype=np.float32 + ) + ids = np.asarray( + [int(item["id"]) for item in items], dtype=np.int64 + ) + if vectors.shape != (inserted, DIM): + raise ValueError("bulk_insert vector shape mismatch") + index.add_with_ids(vectors, ids) + self._write_json(200, {"status": "ok", "inserted": inserted}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + return + if self.path != "/search": + self._write_json(404, {"status": "error", "error": "not found"}) + return + try: + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + vector = np.asarray(payload["vector"], dtype=np.float32) + top_k = int(payload.get("top_k", TOP_K)) + if vector.shape != (DIM,): + raise ValueError("query vector has wrong dimension") + if top_k != TOP_K: + raise ValueError("unexpected top_k") + distances, ids = index.search(vector.reshape(1, DIM), top_k) + results = [ + {"id": int(id_), "distance": float(distance)} + for id_, distance in zip(ids[0], distances[0]) + ] + self._write_json(200, {"results": results}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + + def log_message(self, fmt: str, *args: object) -> None: + return + + ThreadingHTTPServer(("127.0.0.1", port), ReferenceHandler).serve_forever() + + +def _measure_reference_baseline(base_path: Path, queries: np.ndarray): + port = _free_port() + process = subprocess.Popen( + [ + "python3", + str(Path(__file__).resolve()), + "--reference-server", + str(port), + str(base_path), + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + _wait_for_server(port, time.time() + 120) + base = _load_vectors(base_path, N_BASE) + inserted, load_seconds = _bulk_insert(f"http://127.0.0.1:{port}", base) + if inserted != N_BASE: + raise RuntimeError(f"reference inserted {inserted}; expected {N_BASE}") + results, 
latencies, baseline_seconds = _run_queries( + f"http://127.0.0.1:{port}", queries + ) + return results, latencies, baseline_seconds, load_seconds + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +def _ensure_benchmark() -> Benchmark: + global _BENCHMARK + if _BENCHMARK is not None: + return _BENCHMARK + + CACHE_DIR.mkdir(parents=True, exist_ok=True) + base_path = CACHE_DIR / f"base_n{N_BASE}_d{DIM}_seed{SEED}.f32" + queries_path = CACHE_DIR / f"queries_q{N_QUERIES}_d{DIM}_seed{SEED}.f32" + truth_path = CACHE_DIR / ( + f"truth_n{N_BASE}_q{N_QUERIES}_k{TOP_K}_seed{SEED}.u32" + ) + meta_path = CACHE_DIR / ( + f"baseline_faiss_http_bulk_load_v3_n{N_BASE}_q{N_QUERIES}_k{TOP_K}_seed{SEED}.json" + ) + + expected_base_bytes = N_BASE * DIM * 4 + expected_query_bytes = N_QUERIES * DIM * 4 + if ( + not base_path.exists() + or base_path.stat().st_size != expected_base_bytes + or not queries_path.exists() + or queries_path.stat().st_size != expected_query_bytes + ): + print("[vector-db-ann] generating hidden vectors", flush=True) + _generate_data(base_path, queries_path) + truth_path.unlink(missing_ok=True) + meta_path.unlink(missing_ok=True) + + base = _load_vectors(base_path, N_BASE) + queries = _load_vectors(queries_path, N_QUERIES) + + if truth_path.exists() and meta_path.exists(): + truth = np.fromfile(truth_path, dtype=np.uint32).reshape(N_QUERIES, TOP_K) + meta = json.loads(meta_path.read_text(encoding="utf-8")) + baseline_seconds = float(meta["baseline_seconds"]) + baseline_qps = float(meta["baseline_qps"]) + baseline_load_seconds = float(meta["baseline_load_seconds"]) + baseline_effective_qps = float(meta["baseline_effective_qps"]) + else: + print("[vector-db-ann] running Faiss HTTP exact baseline", flush=True) + truth, _, baseline_seconds, baseline_load_seconds = _measure_reference_baseline( + base_path, queries + ) + truth.astype(np.uint32, copy=False).tofile(truth_path) + baseline_qps = 
N_QUERIES / baseline_seconds + baseline_effective_seconds = baseline_seconds + ( + LOAD_PENALTY_WEIGHT * baseline_load_seconds + ) + baseline_effective_qps = N_QUERIES / max(baseline_effective_seconds, 1e-9) + meta_path.write_text( + json.dumps( + { + "baseline_seconds": baseline_seconds, + "baseline_qps": baseline_qps, + "baseline_load_seconds": baseline_load_seconds, + "baseline_effective_qps": baseline_effective_qps, + }, + indent=2, + ), + encoding="utf-8", + ) + + _BENCHMARK = Benchmark( + base_path=base_path, + queries_path=queries_path, + truth=truth, + baseline_qps=baseline_qps, + baseline_effective_qps=baseline_effective_qps, + baseline_seconds=baseline_seconds, + baseline_load_seconds=baseline_load_seconds, + ) + return _BENCHMARK + + +def _invalid(message: str, metrics: dict | None = None): + return 0.0, 0.0, message, metrics or {} + + +def _copy_project(src: Path, dst: Path) -> None: + ignore = shutil.ignore_patterns("target", ".git", ".frontier-cs") + shutil.copytree(src, dst, ignore=ignore) + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + + +def _post_json(url: str, payload: dict, timeout: float = 60.0) -> dict: + body = json.dumps(payload).encode("utf-8") + req = request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=timeout) as response: + return json.loads(response.read().decode("utf-8")) + + +def _wait_for_server(port: int, deadline: float) -> None: + last_error: Exception | None = None + while time.time() < deadline: + try: + with socket.create_connection(("127.0.0.1", port), timeout=1.0): + pass + return + except Exception as exc: + last_error = exc + time.sleep(0.25) + raise RuntimeError(f"server did not become ready: {last_error}") + + +def _bulk_insert( + base_url: str, + base: np.ndarray, + *, + min_effective_qps: float | None = None, +) -> 
tuple[int, float]: + inserted = 0 + start_time = time.perf_counter() + deadline = start_time + LOAD_TIMEOUT_SECONDS + early_stop_deadline: float | None = None + if min_effective_qps is not None and LOAD_PENALTY_WEIGHT > 0.0: + early_stop_deadline = start_time + ( + N_QUERIES / (min_effective_qps * LOAD_PENALTY_WEIGHT) + ) + + def check_load_budget() -> None: + if early_stop_deadline is None: + return + load_seconds = max(time.perf_counter() - start_time, 1e-9) + optimistic_effective_seconds = LOAD_PENALTY_WEIGHT * load_seconds + optimistic_effective_qps = N_QUERIES / max( + optimistic_effective_seconds, 1e-9 + ) + if optimistic_effective_qps <= min_effective_qps: + raise LoadTooSlow( + load_seconds, + optimistic_effective_qps, + inserted, + ) + + for start in range(0, N_BASE, BATCH_SIZE): + check_load_budget() + remaining = deadline - time.perf_counter() + if remaining <= 0: + raise TimeoutError(f"bulk_insert timed out after {LOAD_TIMEOUT_SECONDS}s") + if early_stop_deadline is not None: + remaining = min(remaining, early_stop_deadline - time.perf_counter()) + if remaining <= 0: + check_load_budget() + raise TimeoutError("bulk_insert early-stop budget expired") + end = min(start + BATCH_SIZE, N_BASE) + vectors = [ + {"id": int(i), "vector": base[i].astype(float).tolist()} + for i in range(start, end) + ] + payload = {"vectors": vectors} + try: + response = _post_json( + f"{base_url}/bulk_insert", + payload, + timeout=max(0.05, min(300.0, remaining)), + ) + except Exception: + check_load_budget() + raise + inserted += int(response.get("inserted", 0)) + check_load_budget() + load_seconds = max(time.perf_counter() - start_time, 1e-9) + return inserted, load_seconds + + +def _search_one(base_url: str, query_index: int, vector: np.ndarray) -> tuple[int, list[int], float]: + start = time.perf_counter() + response = _post_json( + f"{base_url}/search", + {"vector": vector.astype(float).tolist(), "top_k": TOP_K}, + timeout=120.0, + ) + latency_ms = (time.perf_counter() - 
start) * 1000.0 + ids = [int(item.get("id", -1)) for item in response.get("results", [])[:TOP_K]] + return query_index, ids, latency_ms + + +def _run_queries( + base_url: str, + queries: np.ndarray, + *, + load_seconds: float = 0.0, + min_effective_qps: float | None = None, +) -> tuple[np.ndarray, list[float], float]: + for i in range(min(WARMUP, N_QUERIES)): + try: + _search_one(base_url, i, queries[i]) + except Exception: + pass + + results = np.zeros((N_QUERIES, TOP_K), dtype=np.uint32) + latencies: list[float] = [] + start = time.perf_counter() + pool = ThreadPoolExecutor(max_workers=CONCURRENCY) + futures = [ + pool.submit(_search_one, base_url, i, queries[i]) + for i in range(N_QUERIES) + ] + should_wait = True + try: + for future in as_completed(futures): + query_index, ids, latency_ms = future.result() + if len(ids) != TOP_K: + raise ValueError("search response did not contain top_k results") + if len(set(ids)) != len(ids): + raise ValueError("search response contains duplicate vector ids") + if any(id_ < 0 or id_ >= N_BASE for id_ in ids): + raise ValueError("search response contains an out-of-range vector id") + results[query_index] = np.asarray(ids, dtype=np.uint32) + latencies.append(latency_ms) + + if min_effective_qps is not None: + elapsed = max(time.perf_counter() - start, 1e-9) + optimistic_qps = N_QUERIES / elapsed + optimistic_effective_seconds = elapsed + ( + LOAD_PENALTY_WEIGHT * load_seconds + ) + optimistic_effective_qps = N_QUERIES / max( + optimistic_effective_seconds, 1e-9 + ) + if optimistic_effective_qps <= min_effective_qps: + should_wait = False + raise QueryTooSlow( + elapsed, + optimistic_qps, + optimistic_effective_qps, + len(latencies), + ) + finally: + pool.shutdown(wait=should_wait, cancel_futures=not should_wait) + duration = max(time.perf_counter() - start, 1e-9) + return results, latencies, duration + + +def _recall_at_k(results: np.ndarray, truth: np.ndarray) -> float: + hits = 0 + for got, expected in zip(results, truth): + 
hits += len(set(int(x) for x in got) & set(int(x) for x in expected)) + return hits / float(N_QUERIES * TOP_K) + + +def evaluate(solution_path: str): + root = Path(solution_path) + if not root.is_dir(): + return _invalid("submission path must be a Rust project directory") + if not (root / "Cargo.toml").exists(): + return _invalid("Cargo.toml not found in submission directory") + + benchmark = _ensure_benchmark() + + with tempfile.TemporaryDirectory(prefix="frontier_vector_db_ann_") as tmp: + workdir = Path(tmp) / "project" + _copy_project(root, workdir) + try: + subprocess.run( + ["cargo", "build", "--release", "--quiet"], + cwd=workdir, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=BUILD_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired: + return _invalid("cargo build timed out") + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.decode("utf-8", errors="replace") + return _invalid(f"cargo build failed: {stderr[-800:]}") + + base = _load_vectors(benchmark.base_path, N_BASE) + queries = _load_vectors(benchmark.queries_path, N_QUERIES) + port = _free_port() + base_url = f"http://127.0.0.1:{port}" + process = subprocess.Popen( + ["cargo", "run", "--release", "--quiet"], + cwd=workdir, + env={**os.environ, "PORT": str(port)}, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + load_seconds = 0.0 + try: + _wait_for_server(port, time.time() + 30) + inserted, load_seconds = _bulk_insert( + base_url, + base, + min_effective_qps=benchmark.baseline_effective_qps, + ) + if inserted != N_BASE: + return _invalid(f"bulk_insert inserted {inserted}; expected {N_BASE}") + results, latencies, candidate_seconds = _run_queries( + base_url, + queries, + load_seconds=load_seconds, + min_effective_qps=benchmark.baseline_effective_qps, + ) + except QueryTooSlow as exc: + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": 
benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "qps_upper_bound": exc.optimistic_qps, + "effective_qps": 0.0, + "effective_qps_upper_bound": exc.optimistic_effective_qps, + "candidate_seconds": exc.elapsed_seconds, + "load_seconds": load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "recall_at_10": 0.0, + "completed_queries": float(exc.completed_queries), + "early_stopped": 1.0, + } + return _invalid(str(exc), metrics) + except LoadTooSlow as exc: + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "effective_qps": 0.0, + "effective_qps_upper_bound": exc.optimistic_effective_qps, + "candidate_seconds": 0.0, + "load_seconds": exc.load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "recall_at_10": 0.0, + "inserted_vectors": float(exc.inserted), + "early_stopped": 1.0, + } + return _invalid(str(exc), metrics) + except Exception as exc: + stderr = b"" + if process.poll() is not None and process.stderr is not None: + stderr = process.stderr.read()[-800:] + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "effective_qps": 0.0, + "candidate_seconds": 0.0, + "load_seconds": 0.0, + "recall_at_10": 0.0, + } + detail = stderr.decode("utf-8", errors="replace") + suffix = f": {detail}" if detail else "" + return _invalid(f"candidate benchmark failed: {exc}{suffix}", metrics) + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + recall = _recall_at_k(results, benchmark.truth) + qps = N_QUERIES / candidate_seconds + effective_seconds = candidate_seconds + 
(LOAD_PENALTY_WEIGHT * load_seconds) + effective_qps = N_QUERIES / max(effective_seconds, 1e-9) + if recall < TARGET_RECALL or effective_qps <= benchmark.baseline_effective_qps: + score = 0.0 + else: + score = 100.0 * ( + 1.0 - math.sqrt(benchmark.baseline_effective_qps) / math.sqrt(effective_qps) + ) + + metrics = { + "qps": qps, + "effective_qps": effective_qps, + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "recall_at_10": recall, + "candidate_seconds": candidate_seconds, + "load_seconds": load_seconds, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "avg_latency_ms": float(np.mean(latencies)) if latencies else 0.0, + "p50_latency_ms": float(np.percentile(latencies, 50)) if latencies else 0.0, + "p95_latency_ms": float(np.percentile(latencies, 95)) if latencies else 0.0, + "p99_latency_ms": float(np.percentile(latencies, 99)) if latencies else 0.0, + "concurrency": float(CONCURRENCY), + "n_base": float(N_BASE), + "n_queries": float(N_QUERIES), + "top_k": float(TOP_K), + } + message = ( + f"N={N_BASE}; Q={N_QUERIES}; top_k={TOP_K}; " + f"recall_at_10={recall:.6f}; qps={qps:.6f}; " + f"effective_qps={effective_qps:.6f}; " + f"baseline_effective_qps={benchmark.baseline_effective_qps:.6f}; " + f"load_seconds={load_seconds:.6f}; score={score:.6f}" + ) + return score, score, message, metrics + + +if __name__ == "__main__": + import sys + + if len(sys.argv) == 4 and sys.argv[1] == "--reference-server": + _run_reference_server(int(sys.argv[2]), sys.argv[3]) + raise SystemExit(0) + + if len(sys.argv) != 2: + print("usage: evaluator.py /path/to/rust/project", file=sys.stderr) + raise SystemExit(2) + bounded, unbounded, detail, metrics = evaluate(sys.argv[1]) + print(detail) + print(json.dumps(metrics, indent=2)) + print(f"{bounded:.12f} {unbounded:.12f}") diff --git 
a/2.0/problems/vector_db_ann/harbor/app/Cargo.toml b/2.0/problems/vector_db_ann/harbor/app/Cargo.toml new file mode 100644 index 000000000..53c3a8833 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "vector-db-skeleton" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.7" +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } + +[profile.release] +lto = true +codegen-units = 1 +debug = true diff --git a/2.0/problems/vector_db_ann/harbor/app/LICENSE.KCORES b/2.0/problems/vector_db_ann/harbor/app/LICENSE.KCORES new file mode 100644 index 000000000..c8393e377 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/LICENSE.KCORES @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 karminski + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/2.0/problems/vector_db_ann/harbor/app/README.md b/2.0/problems/vector_db_ann/harbor/app/README.md new file mode 100644 index 000000000..c87b189f0 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/README.md @@ -0,0 +1,26 @@ +# Vector DB Skeleton + +This is a starter project for the Vector DB ANN task. You may use it, modify it, +or replace it entirely. The judge only requires that `/app` builds and runs with: + +```bash +cargo build --release +PORT=<port> cargo run --release --quiet +``` + +and serves the required `/bulk_insert` and `/search` HTTP endpoints. + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +Pin crate versions if newer transitive dependencies require a newer Rust +compiler. + +## Attribution + +This starter skeleton is adapted from KCORES/vector-db-bench, licensed under +the MIT License. See `LICENSE.KCORES` for the upstream notice. diff --git a/2.0/problems/vector_db_ann/harbor/app/src/api.rs b/2.0/problems/vector_db_ann/harbor/app/src/api.rs new file mode 100644 index 000000000..3bb70396a --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/src/api.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize)] +pub struct InsertRequest { + pub id: u64, + pub vector: Vec<f32>, +} + +#[derive(Serialize)] +pub struct InsertResponse { + pub status: String, +} + +#[derive(Deserialize)] +pub struct BulkInsertItem { + pub id: u64, + pub vector: Vec<f32>, +} + +#[derive(Deserialize)] +pub struct BulkInsertRequest { + pub vectors: Vec<BulkInsertItem>, +} + +#[derive(Serialize)] +pub struct BulkInsertResponse { + pub status: String, + pub inserted: usize, +} + +#[derive(Deserialize)] +pub struct SearchRequest { + pub vector: Vec<f32>, + pub top_k: u32, +} + +#[derive(Serialize)] +pub struct SearchResult { + pub id: u64, + pub distance: f64, +} + +#[derive(Serialize)] +pub struct SearchResponse { + pub results: Vec<SearchResult>, +} diff --git a/2.0/problems/vector_db_ann/harbor/app/src/db.rs
b/2.0/problems/vector_db_ann/harbor/app/src/db.rs new file mode 100644 index 000000000..e10305ff5 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/src/db.rs @@ -0,0 +1,23 @@ +use crate::api::SearchResult; + +pub struct VectorDB { + // Implement your index here. +} + +impl VectorDB { + pub fn new() -> Self { + todo!("initialize your vector database") + } + + pub fn insert(&self, id: u64, vector: Vec<f32>) { + todo!("insert one vector") + } + + pub fn bulk_insert(&self, vectors: Vec<(u64, Vec<f32>)>) -> usize { + todo!("insert a batch of vectors") + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec<SearchResult> { + todo!("return approximate nearest neighbors") + } +} diff --git a/2.0/problems/vector_db_ann/harbor/app/src/distance.rs b/2.0/problems/vector_db_ann/harbor/app/src/distance.rs new file mode 100644 index 000000000..c67b90928 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/src/distance.rs @@ -0,0 +1,9 @@ +pub fn l2_distance(a: &[f32], b: &[f32]) -> f64 { + a.iter() + .zip(b) + .map(|(x, y)| { + let d = (*x as f64) - (*y as f64); + d * d + }) + .sum() +} diff --git a/2.0/problems/vector_db_ann/harbor/app/src/main.rs b/2.0/problems/vector_db_ann/harbor/app/src/main.rs new file mode 100644 index 000000000..2c4e6ec73 --- /dev/null +++ b/2.0/problems/vector_db_ann/harbor/app/src/main.rs @@ -0,0 +1,54 @@ +use axum::{extract::State, routing::post, Json, Router}; +use std::sync::Arc; +use tokio::net::TcpListener; + +mod api; +mod db; +mod distance; + +use api::*; +use db::VectorDB; + +#[tokio::main] +async fn main() { + let db = Arc::new(VectorDB::new()); + let app = Router::new() + .route("/insert", post(handle_insert)) + .route("/bulk_insert", post(handle_bulk_insert)) + .route("/search", post(handle_search)) + .with_state(db); + + let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string()); + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn handle_insert( +
State(db): State<Arc<VectorDB>>, + Json(req): Json<InsertRequest>, +) -> Json<InsertResponse> { + db.insert(req.id, req.vector); + Json(InsertResponse { + status: "ok".to_string(), + }) +} + +async fn handle_bulk_insert( + State(db): State<Arc<VectorDB>>, + Json(req): Json<BulkInsertRequest>, +) -> Json<BulkInsertResponse> { + let vectors = req.vectors.into_iter().map(|v| (v.id, v.vector)).collect(); + let inserted = db.bulk_insert(vectors); + Json(BulkInsertResponse { + status: "ok".to_string(), + inserted, + }) +} + +async fn handle_search( + State(db): State<Arc<VectorDB>>, + Json(req): Json<SearchRequest>, +) -> Json<SearchResponse> { + let results = db.search(&req.vector, req.top_k); + Json(SearchResponse { results }) +} diff --git a/2.0/problems/vector_db_ann/readme b/2.0/problems/vector_db_ann/readme new file mode 100644 index 000000000..59cf88f15 --- /dev/null +++ b/2.0/problems/vector_db_ann/readme @@ -0,0 +1,140 @@ +# Vector DB ANN + +## Problem + +Build a fast approximate nearest-neighbor vector search engine for a +SIFT1M-scale benchmark. + +The hidden benchmark contains exactly `1,000,000` base vectors with dimension +`128`. Queries use the same dimension, distance is squared Euclidean distance, +and each query asks for the top `10` nearest vector ids. + +Your objective is to maximize serving throughput while preserving search +quality: submissions must reach `recall@10 >= 0.95`, and valid submissions are +ranked by an effective QPS that includes query time plus a small load/index-build +penalty. + +The Harbor agent container starts with a small Rust skeleton project in +`/app`. You may use it, modify it, or replace it entirely. You may also use any +Rust crates, internal harness, data structures, and build layout you want, as +long as the final project satisfies the judge contract below.
+ +The judge builds and runs your service with: + +```bash +cargo build --release +PORT=<port> cargo run --release --quiet +``` + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +If you add crates, choose versions compatible with this toolchain or pin +transitive dependencies accordingly. + +The service must listen on the port given in the `PORT` environment variable and implement these endpoints: + +```text +POST /insert +POST /bulk_insert +POST /search +``` + +`/bulk_insert` receives: + +```json +{"vectors":[{"id":0,"vector":[0.1,0.2,...]}]} +``` + +and returns: + +```json +{"status":"ok","inserted":1} +``` + +`/search` receives: + +```json +{"vector":[0.1,0.2,...],"top_k":10} +``` + +and returns: + +```json +{"results":[{"id":0,"distance":0.0}]} +``` + +## Local Harness + +The official evaluator uses hidden data and a black-box judge. You may call: + +```bash +bash /app/submit.sh +``` + +at any time to submit the current `/app` project to the official judge and get +score feedback. + +## Validity + +A submission is valid if: + +1. It builds successfully with `cargo build --release`. +2. `cargo run --release --quiet` starts the service and implements the required HTTP + endpoints. +3. Every `/search` response contains exactly `top_k` distinct ids, each in `[0, 1_000_000)`. +4. Its `recall@10` is at least `0.95` against the hidden exact top-10 ground + truth. + +## Scoring + +The evaluator runs an official exact-search reference HTTP service once per +judge process on the same hidden benchmark and through the same `/bulk_insert` +and `/search` client harnesses to measure: + +```text +baseline_qps +baseline_load_seconds +``` + +Each submission is then timed independently. The load phase includes all +`/bulk_insert` calls and any index construction performed by the service before +queries begin. The query phase measures only `/search` throughput: + +```text +candidate_qps +candidate_load_seconds +``` + +The reported `qps` is the raw query-only QPS.
Scoring uses an effective QPS +that includes a small index-build/load penalty: + +```text +effective_qps = Q / (query_seconds + 0.01 * load_seconds) +``` + +The load phase has a default `900s` timeout. This keeps the benchmark focused +on serving performance while still making very expensive offline indexing pay a +bounded, explicit cost. + +During evaluation, the load and query phases may stop early and return `0` once +the elapsed time plus the load penalty makes it impossible for the final +effective QPS to beat the baseline. During load, this assumes a best-case query +time of zero. + +If the submission is invalid, if `recall@10 < 0.95`, or if +`candidate_effective_qps <= baseline_effective_qps`, the score is `0`. +Otherwise: + +```text +score = 100 * (1 - sqrt(baseline_effective_qps) / sqrt(candidate_effective_qps)) +``` + +The bounded and unbounded score fields both report this score. Harbor JSON +results include the measured `qps`, `effective_qps`, `baseline_qps`, +`baseline_effective_qps`, `recall_at_10`, load time, and runtime metrics under +the `metrics` field. diff --git a/2.0/problems/vector_db_ann/reference.py b/2.0/problems/vector_db_ann/reference.py new file mode 100644 index 000000000..47a3ad563 --- /dev/null +++ b/2.0/problems/vector_db_ann/reference.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +"""Reference note for Vector DB ANN. + +This is a project-style Rust service task. Agents provide their own harness in +/app; the official Python evaluator is the hidden judge control plane. 
+""" diff --git a/2.0/problems/vector_db_ann/reference.rs b/2.0/problems/vector_db_ann/reference.rs new file mode 100644 index 000000000..fbd5b6934 --- /dev/null +++ b/2.0/problems/vector_db_ann/reference.rs @@ -0,0 +1,41 @@ +use crate::api::SearchResult; +use crate::distance::l2_distance; +use std::sync::RwLock; + +pub struct VectorDB { + vectors: RwLock)>>, +} + +impl VectorDB { + pub fn new() -> Self { + Self { + vectors: RwLock::new(Vec::new()), + } + } + + pub fn insert(&self, id: u64, vector: Vec) { + self.vectors.write().unwrap().push((id, vector)); + } + + pub fn bulk_insert(&self, vectors: Vec<(u64, Vec)>) -> usize { + let inserted = vectors.len(); + self.vectors.write().unwrap().extend(vectors); + inserted + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec { + let mut scored: Vec = self + .vectors + .read() + .unwrap() + .iter() + .map(|(id, candidate)| SearchResult { + id: *id, + distance: l2_distance(vector, candidate), + }) + .collect(); + scored.sort_by(|a, b| a.distance.total_cmp(&b.distance)); + scored.truncate(top_k as usize); + scored + } +}