Merged
10 changes: 5 additions & 5 deletions README.md
@@ -67,7 +67,8 @@ graph TD
- **Selection** (`evoforge/core/selection.py`) — four strategies. Lexicase is the default and tends to maintain more diversity than tournament.
- **Search memory** (`evoforge/core/memory.py`) — tracks patterns that led to fitness gains and dead ends to avoid. Fed into LLM prompts so the model learns from the population's history.
- **Tree search** (`evoforge/backends/lean/tree_search.py`) — best-first search over REPL proof states. Used as a refinement step on promising partial proofs found by evolution.
- **LLM client** (`evoforge/llm/client.py`) — Anthropic API wrapper with exponential backoff, budget tracking, and graceful degradation (if calls fail, cheap operators fill in).
- **LLM client** (`evoforge/llm/client.py`) — Anthropic API wrapper with exponential backoff, prompt caching (90% input cost reduction on repeated system prompts), budget tracking, and graceful degradation (if calls fail, cheap operators fill in).
- **Batch collector** (`evoforge/llm/batch.py`) — optional Message Batch API integration that collects per-generation LLM requests into a single batch for 50% cost savings (stacks with prompt caching for up to 95% savings on cached input tokens). Falls back to individual calls on failure.
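
  The "up to 95%" figure is just the two discounts compounding. A quick sketch of the arithmetic, using the discount percentages quoted above rather than actual per-token prices:

```python
# Illustrative only: relative cost of a cached input token, taking the quoted discounts at face value.
base = 1.00                      # uncached, non-batch input token (normalized)
cached = base * 0.10             # prompt caching: cache reads bill at ~10% of the base input price
cached_batched = cached * 0.50   # Message Batch API: 50% discount stacks on top
print(f"{1 - cached_batched:.0%} cheaper")  # -> 95% cheaper
```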

### Proof verification

@@ -89,8 +90,8 @@ evoforge/
cfd/ — CFD turbulence closure optimization: SymPy IR, solver adapter,
ablation credit, expression mutation operators
llm/ — Anthropic client, LLM mutation/crossover operators,
Jinja2 prompt templates
tests/ — 652 tests, strict mypy, ruff
Jinja2 prompt templates, batch API collector
tests/ — 679 tests, strict mypy, ruff
configs/ — TOML experiment configs
scripts/ — CLI entry point (run.py)
```
@@ -144,7 +145,7 @@ Experiments are configured via TOML files. See `configs/lean_default.toml` for a
| `[population]` | Size, elite count |
| `[selection]` | Strategy (lexicase, tournament, pareto, map_elites), parameters |
| `[mutation]` | LLM vs cheap operator weights, crossover weight |
| `[llm]` | Model, temperature schedule, token/cost budgets |
| `[llm]` | Model, temperature schedule, token/cost budgets, prompt caching, batch API (see the loading sketch below the table) |
| `[evolution]` | Max generations, stagnation window, tree search settings, checkpointing |
| `[backend]` | Theorem statement, project dir, imports, seed proofs |
| `[ablation]` | Flags to disable individual components for experiments |
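
For reference, a minimal sketch of loading the `[llm]` table into the `LLMConfig` model shown further down in this diff, assuming the TOML keys mirror the model's field names and that any field not set in the file has a default:

```python
# Sketch only: build LLMConfig (evoforge/core/config.py) from the [llm] table of a config file.
import tomllib

from evoforge.core.config import LLMConfig

with open("configs/lean_default.toml", "rb") as f:
    raw = tomllib.load(f)

llm = LLMConfig(**raw.get("llm", {}))
print(llm.prompt_caching, llm.batch_enabled, llm.batch_poll_interval)
# With nothing set in the file: True False 2.0
```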
@@ -167,6 +168,5 @@ Research software. The core evolutionary engine, LLM integration, and Lean 4 bac

Known limitations:
- Two backends (Lean 4 on hold, CFD active)
- LLM mutations are expensive and the search space is vast
- Tree search helps but is limited by the quality of tactic suggestions
- `greenlet` pinned to 3.1.0 due to a macOS compiler crash on newer versions
8 changes: 7 additions & 1 deletion evoforge/backends/lean/tactic_generator.py
@@ -10,6 +10,8 @@

import jinja2

from evoforge.llm.batch import batch_aware_generate

logger = logging.getLogger(__name__)

_TEMPLATES_DIR = Path(__file__).parent / "templates"
@@ -51,13 +53,17 @@ async def suggest_tactics(self, goal_state: str, proof_so_far: list[str], n: int
)
logger.debug("Requesting %d tactics for goal: %s", n, goal_state[:80])
try:
response = await self._client.async_generate(
response = await batch_aware_generate(
self._client,
prompt,
self._system_prompt,
self._model,
self._temperature,
self._max_tokens,
)
if response is None:
logger.warning("Tactic generation batch request failed")
return []
except (RuntimeError, TimeoutError):
logger.warning("Tactic generation LLM call failed", exc_info=True)
return []
3 changes: 3 additions & 0 deletions evoforge/core/config.py
@@ -55,6 +55,9 @@ class LLMConfig(BaseModel):
max_calls: int = 1000
max_cost_usd: float = 50.0
max_attempts: int = 3
prompt_caching: bool = True
batch_enabled: bool = False
batch_poll_interval: float = 2.0


class EvalConfig(BaseModel):
69 changes: 40 additions & 29 deletions evoforge/core/engine.py
@@ -4,6 +4,7 @@
from __future__ import annotations

import asyncio
import contextlib
import logging
import random
from dataclasses import dataclass
@@ -41,6 +42,7 @@
SelectionStrategy,
)
from evoforge.core.types import Fitness, Individual
from evoforge.llm.batch import BatchCollector

logger = logging.getLogger(__name__)

@@ -283,36 +285,45 @@ async def run(self) -> ExperimentResult:
# Mutate (concurrently)
offspring_genomes: list[tuple[str, str, str]] = []
guidance = self._memory.prompt_section(max_tokens=200)
tasks: list[asyncio.Task[tuple[str, str, str] | None]] = []
for parent in parents:
operator = self._ensemble.select_operator()

# Skip LLM operators when per-gen budget exhausted
if operator.cost == "llm" and not self._scheduler.can_use_llm():
operator = self._ensemble.cheapest_operator()

# Pick a second parent for crossover operators
guidance_ind = None
if "crossover" in operator.name:
other_parents = [p for p in parents if p.ir_hash != parent.ir_hash]
if other_parents:
guidance_ind = random.choice(other_parents)
else:
guidance_ind = random.choice(parents)

context = MutationContext(
generation=gen,
memory=self._memory,
guidance=guidance,
temperature=self._temperature,
backend=self.backend,
credits=parent.credits,
guidance_individual=guidance_ind,
)
tasks.append(
asyncio.create_task(self._mutate_one(parent, operator, context))
)
results = await asyncio.gather(*tasks)
batch_cm: Any = (
BatchCollector(
self.llm_client,
poll_interval=self.config.llm.batch_poll_interval,
)
if self.config.llm.batch_enabled
else contextlib.nullcontext()
)
async with batch_cm:
tasks: list[asyncio.Task[tuple[str, str, str] | None]] = []
for parent in parents:
operator = self._ensemble.select_operator()

# Skip LLM operators when per-gen budget exhausted
if operator.cost == "llm" and not self._scheduler.can_use_llm():
operator = self._ensemble.cheapest_operator()

# Pick a second parent for crossover operators
guidance_ind = None
if "crossover" in operator.name:
other_parents = [p for p in parents if p.ir_hash != parent.ir_hash]
if other_parents:
guidance_ind = random.choice(other_parents)
else:
guidance_ind = random.choice(parents)

context = MutationContext(
generation=gen,
memory=self._memory,
guidance=guidance,
temperature=self._temperature,
backend=self.backend,
credits=parent.credits,
guidance_individual=guidance_ind,
)
tasks.append(
asyncio.create_task(self._mutate_one(parent, operator, context))
)
results = await asyncio.gather(*tasks)
for r in results:
if r is not None:
offspring_genomes.append(r)
167 changes: 167 additions & 0 deletions evoforge/llm/batch.py
@@ -0,0 +1,167 @@
# Copyright (c) 2026 evocode contributors. MIT License. See LICENSE.
"""Batch API support for collecting and submitting LLM requests as a Message Batch."""

from __future__ import annotations

import asyncio
import logging
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any, NamedTuple

if TYPE_CHECKING:
from evoforge.llm.client import LLMClient, LLMResponse

logger = logging.getLogger(__name__)

_active_collector: ContextVar[BatchCollector | None] = ContextVar(
"_active_collector", default=None
)

_DEFAULT_MAX_WAIT: float = 1800.0 # 30 minutes


def get_batch_collector() -> BatchCollector | None:
"""Return the active BatchCollector, or None if not inside a batch context."""
return _active_collector.get()


async def batch_aware_generate(
client: LLMClient,
prompt: str,
system: str,
model: str,
temperature: float,
max_tokens: int,
) -> LLMResponse | None:
"""Generate via batch collector if active, otherwise via direct async call.

Returns None if the batch request failed; raises on direct-call failure.
"""
collector = get_batch_collector()
if collector is not None:
return await collector.register(prompt, system, model, temperature, max_tokens)
return await client.async_generate(prompt, system, model, temperature, max_tokens)


class _BatchRequest(NamedTuple):
prompt: str
system: str
model: str
temperature: float
max_tokens: int


class BatchCollector:
"""Async context manager that collects LLM requests and submits them as a batch."""

def __init__(
self,
client: LLMClient,
poll_interval: float = 2.0,
max_wait: float = _DEFAULT_MAX_WAIT,
) -> None:
self._client = client
self._poll_interval = poll_interval
self._max_wait = max_wait
self._requests: list[_BatchRequest] = []
self._futures: list[asyncio.Future[LLMResponse | None]] = []
self._token: Any = None

def register(
self, prompt: str, system: str, model: str, temperature: float, max_tokens: int
) -> asyncio.Future[LLMResponse | None]:
"""Register a request and return a future resolved with the result."""
loop = asyncio.get_running_loop()
future: asyncio.Future[LLMResponse | None] = loop.create_future()
self._requests.append(_BatchRequest(prompt, system, model, temperature, max_tokens))
self._futures.append(future)
return future

async def __aenter__(self) -> BatchCollector:
self._token = _active_collector.set(self)
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
_active_collector.reset(self._token)
if not self._requests:
return
try:
await self._submit_and_resolve()
except Exception:
logger.warning(
"Batch submission failed, falling back to individual calls",
exc_info=True,
)
await self._fallback_individual()

async def _submit_and_resolve(self) -> None:
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

from evoforge.llm.client import LLMClient, LLMResponse

async_client = self._client.get_async_client()

batch_requests = []
for i, req in enumerate(self._requests):
system_param = self._client.format_system(req.system)
batch_requests.append(
Request(
custom_id=f"req-{i}",
params=MessageCreateParamsNonStreaming(
model=req.model,
max_tokens=req.max_tokens,
temperature=req.temperature,
system=system_param,
messages=[{"role": "user", "content": req.prompt}],
),
)
)

batch = await async_client.messages.batches.create(requests=batch_requests)
batch_id = batch.id

elapsed = 0.0
while batch.processing_status != "ended":
if elapsed >= self._max_wait:
msg = f"Batch {batch_id} not ended after {elapsed:.0f}s"
raise TimeoutError(msg)
await asyncio.sleep(self._poll_interval)
elapsed += self._poll_interval
batch = await async_client.messages.batches.retrieve(batch_id)

results_by_id: dict[str, LLMResponse | None] = {}
async for result in await async_client.messages.batches.results(batch_id):
if result.result.type == "succeeded":
msg_obj = result.result.message
text = msg_obj.content[0].text # type: ignore[union-attr]
cache_read, cache_creation = LLMClient.extract_cache_tokens(msg_obj.usage)
results_by_id[result.custom_id] = LLMResponse(
text=text,
input_tokens=msg_obj.usage.input_tokens,
output_tokens=msg_obj.usage.output_tokens,
model=msg_obj.model,
cache_read_tokens=cache_read,
cache_creation_tokens=cache_creation,
)
else:
logger.warning("Batch request %s: %s", result.custom_id, result.result.type)
results_by_id[result.custom_id] = None

for i, future in enumerate(self._futures):
req_id = f"req-{i}"
future.set_result(results_by_id.get(req_id))

async def _fallback_individual(self) -> None:
async def _do_one(i: int, req: _BatchRequest) -> None:
try:
result = await self._client.async_generate(
req.prompt, req.system, req.model, req.temperature, req.max_tokens
)
self._futures[i].set_result(result)
except Exception:
logger.warning("Fallback call %d failed", i, exc_info=True)
if not self._futures[i].done():
self._futures[i].set_result(None)

await asyncio.gather(*(_do_one(i, req) for i, req in enumerate(self._requests)))
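
A minimal end-to-end sketch of one way the collector can be driven. The `LLMClient()` construction, model string, and prompts here are placeholders rather than values from the repo; the real call sites are `engine.py` and `tactic_generator.py` above.

```python
# Sketch only. Calls made through batch_aware_generate inside the collector's context are
# queued as futures; __aexit__ submits them as one Message Batch and resolves every future.
import asyncio

from evoforge.llm.batch import BatchCollector, batch_aware_generate
from evoforge.llm.client import LLMClient


async def main() -> None:
    client = LLMClient()  # assumed constructible with defaults

    async def worker(i: int) -> str | None:
        resp = await batch_aware_generate(
            client, f"prompt {i}", "system prompt", "<model>", 0.7, 256
        )
        return resp.text if resp is not None else None

    async with BatchCollector(client, poll_interval=2.0):
        tasks = [asyncio.create_task(worker(i)) for i in range(3)]
        await asyncio.sleep(0)  # let each worker run up to registering its request
    # The batch has been submitted, polled to completion, and its results distributed.
    print(await asyncio.gather(*tasks))


asyncio.run(main())
```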