Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,31 @@ a documentation / housekeeping commit on `main`.
legacy, doubled `w_n` math, zero `w_L` drops cable term,
encoder/decoder round-trip, over-slot rejection, warm-start ≤
cold-start cost, weights flip layout choice, default-path parity).
- **Phase 29 — RSM surrogate in the GA hot loop**
(`src/ropeway/optimizer.py`, `src/ropeway/surrogate.py`). Truth-evaluate
the initial population + a few warm-up generations, fit a quadratic
polynomial-ridge response surface, then for each later generation
*pre-screen* offspring with the surrogate and truth-evaluate only the
predicted-best `surrogate_screen_fraction`. Discarded offspring are
penalised (not assigned the noisy surrogate prediction directly) so the
surrogate acts strictly as a filter — selection still operates on real
costs. Elitism injects the truth-evaluated best back each generation so
the gene pool can never lose more than it gains.
- New `GAConfig` knobs: `use_surrogate=False` (off by default;
bit-identical to pre-29 behaviour), `surrogate_screen_fraction=0.5`,
`surrogate_warmup_gens=3`, `surrogate_retrain_every=5`.
- `RSMSurrogate.train_from_samples(X, y)` fits directly from the GA's
truth pool (no extra random sweep). Works with both the 3-gene
vertical and 4-gene Phase 12c joint-H+V encodings.
- `GAResult` now carries `n_true_evals` + `n_surrogate_evals` so the
speed-up is observable.
- On the smoke corridor (40 pop × 30 gen, seed 11): pure GA = 999
truth evals, surrogate on = ~542 truth evals (**~46 % cut**), both
feasible.
- 6 new tests cover: train-from-samples shape, `use_surrogate=False`
parity, truth-eval reduction vs the baseline, feasibility under the
surrogate, screen-fraction monotonicity, and a truth-recompute on the
returned best.

### Fixed

Expand Down
188 changes: 178 additions & 10 deletions src/ropeway/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ class GAConfig:
mutation_prob: float = 0.3
tournament_size: int = 3
seed: int | None = 42
# P29 — Response-Surface-Method surrogate in the hot loop. When on, we
# truth-evaluate generation 0, fit a polynomial-ridge model, and use it
# to pre-screen later offspring: only the predicted-best
# ``surrogate_screen_fraction`` get the (much) more expensive
# ``evaluate_alignment`` call. The model is re-fit every
# ``surrogate_retrain_every`` generations from the accumulating
# truth-evaluated samples. Off by default — the GA stays bit-identical
# to pre-P29 behaviour.
use_surrogate: bool = False
surrogate_screen_fraction: float = 0.5
# Warmup must be ≥ 2-3 gens — generation 0's random pop is overwhelmingly
# infeasible, so an immediately-fit surrogate sees a log-cost cloud with
# near-zero variance and ranks noise. A couple of varied gens give it
# enough spread to rank meaningfully.
surrogate_warmup_gens: int = 3
surrogate_retrain_every: int = 5
# P30 — per-objective cost weights handed to ``evaluate_alignment``.
# ``None`` preserves the legacy (50 000, 1 000, 50) trio. Tune to bias
# the search: high ``w_n`` favours fewer towers (urban), high ``w_L``
Expand Down Expand Up @@ -359,6 +375,12 @@ class GAResult:
best_result: EvalResult
history_best: list[float] = field(default_factory=list)
history_avg: list[float] = field(default_factory=list)
# P29 — how many candidates went through the truth evaluator vs the
# surrogate. With ``use_surrogate=False`` this matches the number of
# individuals DEAP evaluated; with the surrogate on, expect a sizeable
# cut after the warmup generations.
n_true_evals: int = 0
n_surrogate_evals: int = 0


def optimize(
Expand Down Expand Up @@ -456,15 +478,26 @@ def _fitness(individual: list[float]) -> tuple[float]:
stats.register("min", np.min)
stats.register("avg", lambda v: float(np.mean(np.array(v)[np.isfinite(v)])) if np.any(np.isfinite(v)) else float("inf"))

pop, logbook = algorithms.eaSimple(
pop, toolbox,
cxpb=ga.crossover_prob,
mutpb=ga.mutation_prob,
ngen=ga.generations,
stats=stats,
halloffame=hof,
verbose=verbose,
)
if ga.use_surrogate:
pop, logbook, n_true_evals, n_surrogate_evals = _run_with_surrogate(
pop, toolbox, ga, stats, hof, verbose=verbose,
)
else:
pop, logbook = algorithms.eaSimple(
pop, toolbox,
cxpb=ga.crossover_prob,
mutpb=ga.mutation_prob,
ngen=ga.generations,
stats=stats,
halloffame=hof,
verbose=verbose,
)
# eaSimple evaluates the initial population (those without a valid
# fitness — all of them on first call) plus every offspring whose
# fitness was invalidated by varAnd in each generation. The DEAP
# logbook reports it under "nevals" per gen; sum it up.
n_true_evals = int(sum(rec.get("nevals", 0) for rec in logbook))
n_surrogate_evals = 0

best = hof[0]
towers = _decode(best, corridor_length, cfg, max_slots,
Expand All @@ -484,4 +517,139 @@ def _fitness(individual: list[float]) -> tuple[float]:

history_best = [rec["min"] for rec in logbook]
history_avg = [rec["avg"] for rec in logbook]
return GAResult(alignment, result, history_best, history_avg)
return GAResult(
alignment, result, history_best, history_avg,
n_true_evals=n_true_evals,
n_surrogate_evals=n_surrogate_evals,
)


def _run_with_surrogate(pop, toolbox, ga: "GAConfig", stats, hof, *, verbose: bool):
"""P29 — surrogate-screened GA loop.

Truth-evaluate the initial population (and any warm-up generations),
fit an :class:`~ropeway.surrogate.RSMSurrogate`, then for each
subsequent generation:

1. ``varAnd`` parents into offspring (DEAP-standard).
2. Predict offspring cost with the surrogate.
3. Truth-evaluate only the ``surrogate_screen_fraction`` predicted-best
— these update the training pool.
4. The rest get the surrogate's prediction as their fitness and never
touch the expensive evaluator.
5. Re-fit every ``surrogate_retrain_every`` generations.

The HoF receives only truth-evaluated individuals, so the returned
best is always real.
"""
from .surrogate import RSMSurrogate # local import: keeps sklearn off the hot path

logbook = tools.Logbook()
logbook.header = ["gen", "nevals"] + (stats.fields if stats else [])

def _truth_eval(individuals: list) -> int:
fitnesses = list(map(toolbox.evaluate, individuals))
for ind, fit in zip(individuals, fitnesses):
ind.fitness.values = fit
return len(individuals)

n_true_evals = _truth_eval(pop)
n_surrogate_evals = 0
for ind in pop:
ind.is_surrogate = False
hof.update(pop)

# Training pool: every truth-evaluated genome + its true cost. Used to
# (re-)fit the surrogate.
X_train: list[list[float]] = [list(ind) for ind in pop]
y_train: list[float] = [ind.fitness.values[0] for ind in pop]

record = stats.compile(pop) if stats else {}
logbook.record(gen=0, nevals=len(pop), **record)
if verbose:
print(logbook.stream)

surrogate: RSMSurrogate | None = None
keep_fraction = float(np.clip(ga.surrogate_screen_fraction, 0.05, 1.0))

for gen in range(1, ga.generations + 1):
offspring = algorithms.varAnd(
toolbox.select(pop, len(pop)),
toolbox, ga.crossover_prob, ga.mutation_prob,
)

# Train / re-train the surrogate from the accumulating truth pool.
retrain_now = (
gen >= ga.surrogate_warmup_gens
and (surrogate is None or gen % max(1, ga.surrogate_retrain_every) == 0)
)
if retrain_now:
surrogate = RSMSurrogate.train_from_samples(
np.array(X_train, dtype=float),
np.array(y_train, dtype=float),
)

invalid = [ind for ind in offspring if not ind.fitness.valid]

if surrogate is not None and len(invalid) > 1:
X_inv = np.array([list(ind) for ind in invalid], dtype=float)
preds = surrogate.predict(X_inv)
n_keep = max(1, int(len(invalid) * keep_fraction))
top_idx = np.argsort(preds)[:n_keep]
top_set = set(int(i) for i in top_idx)
truth_batch = [invalid[i] for i in range(len(invalid)) if i in top_set]
surrogate_batch = [
invalid[i] for i in range(len(invalid)) if i not in top_set
]
n_true_evals += _truth_eval(truth_batch)
for ind in truth_batch:
ind.is_surrogate = False
X_train.append(list(ind))
y_train.append(ind.fitness.values[0])
# Discarded offspring: don't trust the surrogate's absolute
# prediction as fitness — empirically the model can't tell
# "barely infeasible" from "catastrophically infeasible" while
# the truth pool is dominated by ~1e9 penalty values. Assigning
# the current worst-observed cost guarantees these candidates
# lose tournaments to anything truth-evaluated, so the surrogate
# acts purely as a *filter* on which offspring to spend a real
# evaluator call on, not as a noisy fitness source.
worst_truth = float(max(y_train)) if y_train else 1e12
for ind in surrogate_batch:
ind.fitness.values = (worst_truth,)
ind.is_surrogate = True
n_surrogate_evals += len(surrogate_batch)
else:
n_true_evals += _truth_eval(invalid)
for ind in invalid:
ind.is_surrogate = False
X_train.append(list(ind))
y_train.append(ind.fitness.values[0])

truth_only = [
ind for ind in offspring
if ind.fitness.valid and not getattr(ind, "is_surrogate", False)
]
if truth_only:
hof.update(truth_only)

# Elitism: keep the best truth-evaluated individual seen so far in
# the gene pool. Surrogate predictions are noisy enough that without
# this safeguard selection can drift toward genomes the surrogate
# likes but reality penalises — the elite carries the real best
# forward so the population can never lose more than it gains.
if len(hof) > 0:
elite = toolbox.clone(hof[0])
elite.is_surrogate = False
worst_idx = int(
np.argmax([ind.fitness.values[0] for ind in offspring])
)
offspring[worst_idx] = elite
pop[:] = offspring

record = stats.compile(pop) if stats else {}
logbook.record(gen=gen, nevals=len(invalid), **record)
if verbose:
print(logbook.stream)

return pop, logbook, n_true_evals, n_surrogate_evals
38 changes: 38 additions & 0 deletions src/ropeway/surrogate.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,44 @@ def predict(self, Xnew: np.ndarray) -> np.ndarray:

return cls(model=_ExpWrap(pipe), train_score=score, n_samples=n_samples)

@classmethod
def train_from_samples(
cls,
X: np.ndarray,
y: np.ndarray,
*,
alpha: float = 1.0,
) -> "RSMSurrogate":
"""Fit the polynomial-ridge response surface directly from samples.

Used by the GA when running with ``use_surrogate=True``: we feed in
the truth-evaluated population from generation 0 (and re-fit
periodically) rather than draw a fresh random sample. Works with
any genome width — both the 3-gene (vertical) and 4-gene
(Phase 12c joint H+V) encodings are fine.
"""
X = np.atleast_2d(np.asarray(X, dtype=float))
y = np.asarray(y, dtype=float)
pipe = Pipeline(
[
("scaler", StandardScaler()),
("poly", PolynomialFeatures(degree=2, interaction_only=False, include_bias=False)),
("ridge", Ridge(alpha=alpha)),
]
)
y_t = np.log1p(np.maximum(y, 0.0))
pipe.fit(X, y_t)
score = float(pipe.score(X, y_t))

class _ExpWrap:
def __init__(self, inner: Pipeline):
self.inner = inner

def predict(self, Xnew: np.ndarray) -> np.ndarray:
return np.expm1(self.inner.predict(Xnew))

return cls(model=_ExpWrap(pipe), train_score=score, n_samples=int(X.shape[0]))


def rank_genomes_by_surrogate(
surrogate: RSMSurrogate, genomes: np.ndarray, keep_fraction: float = 0.3
Expand Down
Loading
Loading