From 49f00e107f2fb2cd32fb30dba752558be7e4aa1e Mon Sep 17 00:00:00 2001 From: anderdc Date: Thu, 28 May 2026 13:30:57 -0500 Subject: [PATCH 1/2] Replay collateral per block so capacity reflects window state (#409) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Capacity weighting used to multiply already-replayed crown by the miner's *current* collateral, read once at scoring time. A miner could hold crown through the window with low collateral and top up before the next scoring pass to receive full capacity credit retroactively. Mirror the active/busy/rate treatment: emit a per-block CollateralEvent series from CollateralPosted, CollateralWithdrawn, and the silent fee/ slash deductions hidden inside SwapCompleted/SwapTimedOut. The crown replay walks the series alongside everything else and accumulates a cap- weighted-blocks counter that the scoring loop divides back into a time- weighted capacity factor — no contract reads at scoring time. Cold bootstrap anchors current collateral at cursor (same approximation already used for the active set). Warm restart hydrates collateral_events from state.db. Prune preserves the latest row per hotkey as a reconstruction anchor, same rule as active_events. --- allways/validator/event_watcher.py | 142 +++++++++++++++++- allways/validator/scoring.py | 68 ++++++--- allways/validator/scoring_trace.py | 13 +- allways/validator/state_store.py | 57 ++++++- tests/test_scoring_v1.py | 229 +++++++++++++++++++++++++++-- 5 files changed, 461 insertions(+), 48 deletions(-) diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index eb3dd44..deb84a7 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -197,6 +197,19 @@ class ActiveEvent: block: int +@dataclass +class CollateralEvent: + """Transition of a miner's posted collateral. ``collateral_rao`` is the + post-event total (not a delta), matching the on-chain ``total`` field of + ``CollateralPosted`` / ``CollateralWithdrawn``. Replayed per-block so the + capacity multiplier in scoring reflects collateral as-of each crown + block, not as-of the scoring moment.""" + + hotkey: str + collateral_rao: int + block: int + + MAX_BLOCKS_PER_SYNC = 50 EVENT_PRUNE_INTERVAL_BLOCKS = 100 # O(events) sweep; window much wider than per-step delta. @@ -241,6 +254,13 @@ def __init__( self.busy_events: List[BusyEvent] = [] self.active_events: List[ActiveEvent] = [] self.active_events_by_hotkey: Dict[str, List[ActiveEvent]] = {} + # Per-block collateral series — feeds the scoring replay's capacity + # multiplier so a miner cannot top up collateral after the fact and + # retroactively boost the capacity weight on already-earned crown. + # Bootstrapped from a single anchor read at cursor; subsequent + # CollateralPosted/CollateralWithdrawn events build the series. + self.collateral_events: List[CollateralEvent] = [] + self.collateral_events_by_hotkey: Dict[str, List[CollateralEvent]] = {} # Direction-keyed reservation pin lifecycle — feeds the scoring # replay's pinned-rate overlay so a miner who pins at a moderate # rate then bumps live to absurd cannot earn crown at the inflated @@ -303,6 +323,32 @@ def get_active_miners_at(self, block: int) -> Set[str]: latest[ev.hotkey] = ev.active return {hk for hk, is_active in latest.items() if is_active} + def get_collateral_events_in_range(self, start_block: int, end_block: int) -> List[dict]: + """Collateral transitions in ``(start_block, end_block]``, oldest first. + ``collateral_rao`` is the post-event total.""" + out: List[dict] = [] + for ev in self.collateral_events: + if ev.block <= start_block: + continue + if ev.block > end_block: + break + out.append({'hotkey': ev.hotkey, 'collateral_rao': ev.collateral_rao, 'block': ev.block}) + return out + + def get_miner_collaterals_at(self, block: int) -> Dict[str, int]: + """Per-hotkey posted collateral at ``block``, reconstructed by taking + the most recent transition at or before ``block`` for each hotkey. + Hotkeys with no recorded event default to absent (caller treats as 0). + Cold bootstrap seeds an anchor event at ``cursor`` for every active + hotkey, so any rewardable miner with a meaningful collateral position + appears in the result for ``block >= cursor``.""" + latest: Dict[str, int] = {} + for ev in self.collateral_events: + if ev.block > block: + break + latest[ev.hotkey] = ev.collateral_rao + return latest + def get_reservation_pin_events_in_range( self, start_block: int, end_block: int, from_chain: str, to_chain: str ) -> List[dict]: @@ -418,6 +464,20 @@ def cold_bootstrap( self.active_events.append(event) self.active_events_by_hotkey.setdefault(hotkey, []).append(event) self.state_store.insert_active_event(self.cursor, hotkey, True) + # Same anchor logic for collateral: a fresh boot can only read current + # collateral, so we record it at ``cursor`` and treat the window as if + # the miner held that collateral the whole time. CollateralPosted / + # CollateralWithdrawn events replayed during sync_to refine the series. + if metagraph_hotkeys and contract_client is not None: + for hotkey in metagraph_hotkeys: + try: + collateral = int(contract_client.get_miner_collateral(hotkey)) + except Exception as e: + bt.logging.debug(f'EventWatcher bootstrap: collateral read failed for {hotkey[:8]}: {e}') + continue + if collateral <= 0: + continue + self._record_collateral_event(self.cursor, hotkey, collateral) self.state_store.set_event_cursor(self.cursor) def hydrate_from_db(self) -> None: @@ -458,10 +518,22 @@ def hydrate_from_db(self) -> None: for r in pin_event_rows ] + collateral_rows = self.state_store.load_all_collateral_events() + self.collateral_events = [ + CollateralEvent( + hotkey=r['hotkey'], collateral_rao=int(r['collateral_rao']), block=r['block_num'] + ) + for r in collateral_rows + ] + self.collateral_events_by_hotkey = {} + for ev in self.collateral_events: + self.collateral_events_by_hotkey.setdefault(ev.hotkey, []).append(ev) + bt.logging.info( f'EventWatcher hydrated from DB: cursor={self.cursor}, ' f'{len(self.active_miners)} active, {sum(self.open_swap_count.values())} open swaps, ' - f'{len(self.reservation_pin_events)} pin events' + f'{len(self.reservation_pin_events)} pin events, ' + f'{len(self.collateral_events)} collateral events' ) def sync_to(self, current_block: int) -> None: @@ -605,6 +677,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None miner = values.get('miner', '') if isinstance(swap_id, int) and miner: tao = int(values.get('tao_amount') or 0) + fee = int(values.get('fee_amount') or 0) from_chain, to_chain = self._lookup_swap_direction(swap_id) self.state_store.insert_swap_outcome( swap_id=swap_id, @@ -615,6 +688,11 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None from_chain=from_chain, to_chain=to_chain, ) + # The contract's apply_collateral_penalty deducts ``fee_amount`` + # from collateral without emitting a CollateralWithdrawn event, + # so we mirror it here to keep the replayed series in step. + if fee > 0: + self._apply_collateral_delta(block_num, miner, -fee) self.apply_busy_delta(block_num, miner, -1, swap_id) # Defensive: if SwapInitiated was missed (out-of-order delivery, # bootstrap), terminal SwapCompleted is the last chance to clear @@ -631,6 +709,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None swap_id = values.get('swap_id') miner = values.get('miner', '') if isinstance(swap_id, int) and miner: + slash = int(values.get('slash_amount') or 0) from_chain, to_chain = self._lookup_swap_direction(swap_id) self.state_store.insert_swap_outcome( swap_id=swap_id, @@ -640,6 +719,13 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None from_chain=from_chain, to_chain=to_chain, ) + # The slash side mirrors the fee side: apply_collateral_penalty + # silently reduces collateral. CollateralSlashed *does* fire + # right before SwapTimedOut, but we deliberately don't double- + # count by handling both — we drive the series off the terminal + # event whose direction-aware busy-delta we already process. + if slash > 0: + self._apply_collateral_delta(block_num, miner, -slash) # Defensive: a SwapInitiated this validator missed would leave # a stale pin behind — clear it on the terminal event too. self.state_store.remove_reservation_pin(miner) @@ -652,6 +738,16 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None bt.logging.warning( f'EventWatcher: {self._label(miner)} SwapTimedOut swap=#{swap_id} @ block {block_num} (slash)' ) + elif name == 'CollateralPosted': + miner = values.get('miner', '') + total = values.get('total') + if miner and isinstance(total, int): + self._record_collateral_event(block_num, miner, int(total)) + elif name == 'CollateralWithdrawn': + miner = values.get('miner', '') + remaining = values.get('remaining') + if miner and isinstance(remaining, int): + self._record_collateral_event(block_num, miner, int(remaining)) elif name == 'ReservationExtensionFinalized': # Event-driven cache update for the local pending_confirms row — # replaces the polling refresh that the legacy vote-extend flow @@ -880,6 +976,35 @@ def apply_busy_delta(self, block_num: int, hotkey: str, delta: int, swap_id: Opt self.busy_events.append(BusyEvent(hotkey=hotkey, delta=delta, block=block_num)) self.state_store.insert_busy_event(block_num, hotkey, delta, swap_id) + def _latest_collateral(self, hotkey: str) -> int: + """Last known collateral for ``hotkey`` (0 if no event has fired).""" + history = self.collateral_events_by_hotkey.get(hotkey) + if not history: + return 0 + return history[-1].collateral_rao + + def _record_collateral_event(self, block_num: int, hotkey: str, collateral_rao: int) -> None: + """Append a collateral transition and persist it. ``collateral_rao`` is + the post-event total (matches the on-chain event field). Duplicate + same-value events are still recorded so the latest block_num always + reflects when the position was last touched, but caller can suppress + no-op writes if they want a tighter series.""" + if not hotkey: + return + clipped = max(0, int(collateral_rao)) + event = CollateralEvent(hotkey=hotkey, collateral_rao=clipped, block=block_num) + self.collateral_events.append(event) + self.collateral_events_by_hotkey.setdefault(hotkey, []).append(event) + self.state_store.insert_collateral_event(block_num, hotkey, clipped) + + def _apply_collateral_delta(self, block_num: int, hotkey: str, delta_rao: int) -> None: + """Apply a signed delta against the last-known collateral. Used for + fee (``confirm_swap``) and slash (``timeout_swap``) deductions that + ``apply_collateral_penalty`` silently makes without emitting a + ``CollateralWithdrawn``. Clipped at zero.""" + prior = self._latest_collateral(hotkey) + self._record_collateral_event(block_num, hotkey, prior + delta_rao) + def prune_old_events(self, current_block: int) -> None: """Drop busy and active events older than one scoring window. Latest active event per hotkey is preserved as a state-reconstruction anchor; @@ -915,6 +1040,21 @@ def prune_old_events(self, current_block: int) -> None: for ev in self.reservation_pin_events if ev.block_num >= cutoff or latest_per_dir.get((ev.hotkey, ev.from_chain, ev.to_chain)) is ev ] + if self.collateral_events: + latest_collateral: Dict[str, CollateralEvent] = {} + for ev in self.collateral_events: + latest_collateral[ev.hotkey] = ev + self.collateral_events = [ + ev for ev in self.collateral_events if ev.block >= cutoff or latest_collateral.get(ev.hotkey) is ev + ] + for hotkey, events in list(self.collateral_events_by_hotkey.items()): + latest = events[-1] if events else None + pruned = [ev for ev in events if ev.block >= cutoff or ev is latest] + if pruned: + self.collateral_events_by_hotkey[hotkey] = pruned + else: + del self.collateral_events_by_hotkey[hotkey] self.state_store.prune_active_events(cutoff) self.state_store.prune_busy_events(cutoff) + self.state_store.prune_collateral_events(cutoff) self.state_store.prune_reservation_pin_events(cutoff) diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index 1bde9be..095f125 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -38,6 +38,7 @@ class DirectionTrace: pool: float = 0.0 crown_blocks: Dict[str, float] = field(default_factory=dict) + cap_weighted_blocks: Dict[str, float] = field(default_factory=dict) unfilled_blocks: int = 0 best_rate: float = 0.0 @@ -132,7 +133,6 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: except Exception as e: bt.logging.warning(f'min_swap_amount read failed: {e}') min_swap_amount = 0 - collaterals: Dict[str, int] = {} miner_volume_total: Dict[str, int] = {} miner_crown_total: Dict[str, float] = {} network_volume_total: int = 0 @@ -175,15 +175,15 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: uid = hotkey_to_uid.get(hotkey) if uid is None: continue # dereg'd mid-window; credit forfeited - if hotkey not in collaterals: - try: - collaterals[hotkey] = int(self.contract_client.get_miner_collateral(hotkey)) - except Exception as e: - bt.logging.warning(f'collateral read failed for {hotkey[:8]}: {e}') - collaterals[hotkey] = 0 - cap = capacity_factor(collaterals[hotkey], max_swap_amount) + # Capacity is integrated per-block during the replay, so the + # effective multiplier is the time-weighted average over the + # miner's crown intervals. Reading current collateral here + # would let a post-window top-up retroactively boost credit + # already earned (#409). + cap_blocks = trace.cap_weighted_blocks.get(hotkey, 0.0) + cap = (cap_blocks / blocks) if blocks > 0 else 0.0 wt = weighting_traces.setdefault(hotkey, WeightingTrace()) - wt.record_capacity(collateral=collaterals[hotkey], factor=cap) + wt.record_capacity(factor=cap) wt.record_credibility( closed_swaps=sum(success_stats.get(hotkey, (0, 0))), ramp_target=CREDIBILITY_RAMP_OBSERVATIONS, @@ -321,13 +321,19 @@ class EventKind(IntEnum): wrote: a miner who posts a rate change in the same block they get reserved has the post-update value pinned, matching the contract's block-end commitment read. + + COLLATERAL is independent of qualification — it only scales the credit + of an interval. Ordered between RATE and RESERVED_START so that a same- + block post lands before any reservation pin captures, matching the + intuition that capacity is observable as soon as it's posted. """ ACTIVE = 0 BUSY = 1 RESERVED_END = 2 RATE = 3 - RESERVED_START = 4 + COLLATERAL = 4 + RESERVED_START = 5 @dataclass @@ -353,9 +359,9 @@ def reconstruct_window_start_state( to_chain: str, window_start: int, rewardable_hotkeys: Set[str], -) -> Tuple[Dict[str, float], Dict[str, int], Set[str], Dict[str, float]]: - """Snapshot rates, busy counts, active set, and reservation-pin overlay - as they stood at window_start.""" +) -> Tuple[Dict[str, float], Dict[str, int], Set[str], Dict[str, float], Dict[str, int]]: + """Snapshot rates, busy counts, active set, reservation-pin overlay, and + posted collateral as they stood at window_start.""" rates: Dict[str, float] = {} busy_count: Dict[str, int] = dict(event_watcher.get_busy_miners_at(window_start)) active_set: Set[str] = set(event_watcher.get_active_miners_at(window_start)) @@ -371,7 +377,9 @@ def reconstruct_window_start_state( if hk in rewardable_hotkeys and rate > 0 } - return rates, busy_count, active_set, pinned_rates + collaterals: Dict[str, int] = dict(event_watcher.get_miner_collaterals_at(window_start)) + + return rates, busy_count, active_set, pinned_rates, collaterals def merge_replay_events( @@ -401,6 +409,11 @@ def merge_replay_events( kind = EventKind.RESERVED_START if e['kind'] == 'start' else EventKind.RESERVED_END events.append(ReplayEvent(block=e['block'], hotkey=e['hotkey'], kind=kind, value=float(e['rate']))) + for e in event_watcher.get_collateral_events_in_range(window_start, window_end): + events.append( + ReplayEvent(block=e['block'], hotkey=e['hotkey'], kind=EventKind.COLLATERAL, value=float(e['collateral_rao'])) + ) + events.sort(key=lambda ev: ev.sort_key) return events @@ -421,15 +434,22 @@ def replay_crown_time_window( Ties at the same rate split credit evenly. A miner qualifies for crown at an instant iff they are on the current metagraph, were active at that instant, not busy, had a positive rate posted, and their rate is - executable under the current contract swap bounds. Active/rate/busy - are evaluated per-block via the replay — a miner's status at scoring - time is irrelevant other than metagraph membership (used to credit the - UID). Collateral-floor invariants are trusted to the contract's active - flag; halt state is handled at ``score_and_reward_miners`` entry. + executable under the current contract swap bounds. Active/rate/busy/ + collateral are evaluated per-block via the replay — a miner's status at + scoring time is irrelevant other than metagraph membership (used to + credit the UID). The collateral-floor activation gate is still trusted + to the contract's active flag; halt state is handled at + ``score_and_reward_miners`` entry. Bounds at 0 disable the executability filter (matches the contract's - "unset" sentinel); the rate-positive floor still applies.""" - rates, busy_count, active_set, pinned_rates = reconstruct_window_start_state( + "unset" sentinel); the rate-positive floor still applies. + + When ``trace`` is supplied, ``trace.cap_weighted_blocks`` is populated + alongside ``trace.crown_blocks``. The weighted series multiplies each + interval's split by ``capacity_factor(collateral_at_block, max_swap_rao)`` + so a post-window collateral boost cannot retroactively scale credit + already earned (closes #409).""" + rates, busy_count, active_set, pinned_rates, collaterals = reconstruct_window_start_state( store, event_watcher, from_chain, to_chain, window_start, rewardable_hotkeys ) replay_events = merge_replay_events(store, event_watcher, from_chain, to_chain, window_start, window_end) @@ -444,6 +464,7 @@ def executable_check(rate: float) -> bool: return is_executable_rate(rate, from_chain, to_chain, min_swap_rao, max_swap_rao) crown_blocks: Dict[str, float] = {} + cap_weighted_blocks: Dict[str, float] = {} prev_block = window_start def effective_rates() -> Dict[str, float]: @@ -481,6 +502,8 @@ def credit_interval(interval_start: int, interval_end: int) -> None: split = duration / len(holders) for hk in holders: crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split + cap = capacity_factor(collaterals.get(hk, 0), max_swap_rao) + cap_weighted_blocks[hk] = cap_weighted_blocks.get(hk, 0.0) + split * cap def apply_event(event: ReplayEvent) -> None: if event.kind is EventKind.RATE: @@ -496,6 +519,8 @@ def apply_event(event: ReplayEvent) -> None: pinned_rates[event.hotkey] = event.value elif event.kind is EventKind.RESERVED_END: pinned_rates.pop(event.hotkey, None) + elif event.kind is EventKind.COLLATERAL: + collaterals[event.hotkey] = max(0, int(event.value)) else: # ACTIVE if event.value > 0: active_set.add(event.hotkey) @@ -510,6 +535,7 @@ def apply_event(event: ReplayEvent) -> None: credit_interval(prev_block, window_end) if trace is not None: trace.crown_blocks = dict(crown_blocks) + trace.cap_weighted_blocks = dict(cap_weighted_blocks) return crown_blocks diff --git a/allways/validator/scoring_trace.py b/allways/validator/scoring_trace.py index 113620e..11e29da 100644 --- a/allways/validator/scoring_trace.py +++ b/allways/validator/scoring_trace.py @@ -22,9 +22,13 @@ @dataclass class WeightingTrace: - """Per-hotkey capacity + volume + credibility factors for the scoring log.""" + """Per-hotkey capacity + volume + credibility factors for the scoring log. + + ``capacity_factor`` is the time-weighted average of + ``min(1, collateral / max_swap)`` over the miner's crown intervals — the + per-block series lives in the event watcher, so a post-window collateral + top-up cannot retroactively scale it (#409).""" - collateral: int = 0 capacity_factor: float = 1.0 volume_rao: int = 0 crown_share: float = 0.0 @@ -34,8 +38,7 @@ class WeightingTrace: closed_swaps: int = 0 credibility_ramp: float = 0.0 - def record_capacity(self, collateral: int, factor: float) -> None: - self.collateral = collateral + def record_capacity(self, factor: float) -> None: self.capacity_factor = factor def record_volume(self, vol_rao: int, total_volume_rao: int, crown_share: float, factor: float) -> None: @@ -93,7 +96,7 @@ def log_scoring_trace( if wt is not None: extras = ( f' ({wt.closed_swaps}/{CREDIBILITY_RAMP_OBSERVATIONS} closed, ramp={wt.credibility_ramp:.2f})' - f' cap={wt.capacity_factor:.2f} (col={wt.collateral / TAO_TO_RAO:g}t)' + f' cap={wt.capacity_factor:.2f}' f' vol={wt.volume_rao / TAO_TO_RAO:g}t vol_share={wt.volume_share:.2f}' f' crown_share={wt.crown_share:.2f} vol_f={wt.volume_factor:.2f}' ) diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 9757094..aa717e2 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -2,13 +2,13 @@ Tables: ``pending_confirms`` (axon→forward queue), ``rate_events`` (crown-time input), ``swap_outcomes`` (credibility ledger), ``active_events`` + -``busy_events`` + ``event_watcher_meta`` + ``bootstrapped_swaps`` (event -watcher persistence — warm restarts hydrate from these instead of replaying -contract history). Single connection guarded by one lock; opened with -``check_same_thread=False``. ``busy_timeout`` is set before -``journal_mode=WAL`` because the WAL flip takes a brief exclusive lock that -concurrent openers would otherwise hit as "database is locked" — the local -dev env runs two validators against the same file. +``busy_events`` + ``collateral_events`` + ``event_watcher_meta`` + +``bootstrapped_swaps`` (event watcher persistence — warm restarts hydrate +from these instead of replaying contract history). Single connection guarded +by one lock; opened with ``check_same_thread=False``. ``busy_timeout`` is +set before ``journal_mode=WAL`` because the WAL flip takes a brief exclusive +lock that concurrent openers would otherwise hit as "database is locked" — +the local dev env runs two validators against the same file. """ import sqlite3 @@ -575,6 +575,22 @@ def load_all_busy_events(self) -> List[dict]: for r in rows ] + def insert_collateral_event(self, block_num: int, hotkey: str, collateral_rao: int) -> None: + self._execute( + 'INSERT INTO collateral_events (block_num, hotkey, collateral_rao) VALUES (?, ?, ?)', + (block_num, hotkey, int(collateral_rao)), + ) + + def load_all_collateral_events(self) -> List[dict]: + rows = self._fetchall( + 'SELECT block_num, hotkey, collateral_rao FROM collateral_events ' + 'ORDER BY block_num ASC, id ASC' + ) + return [ + {'block_num': r['block_num'], 'hotkey': r['hotkey'], 'collateral_rao': int(r['collateral_rao'])} + for r in rows + ] + def get_event_cursor(self) -> Optional[int]: row = self._fetchone('SELECT value FROM event_watcher_meta WHERE key = ?', ('cursor',)) return int(row['value']) if row is not None else None @@ -628,6 +644,21 @@ def prune_busy_events(self, cutoff_block: int) -> None: (cutoff_block,), ) + def prune_collateral_events(self, cutoff_block: int) -> None: + """Drop collateral events older than ``cutoff_block``, preserving the + latest row per hotkey as a reconstruction anchor (mirrors + ``prune_active_events``).""" + if cutoff_block <= 0: + return + self._execute( + """ + DELETE FROM collateral_events + WHERE block_num < ? + AND id NOT IN (SELECT MAX(id) FROM collateral_events GROUP BY hotkey) + """, + (cutoff_block,), + ) + # ─── dest_tip_snapshots ───────────────────────────────────────────── def upsert_dest_tip_snapshot( @@ -681,6 +712,7 @@ def reset_event_watcher_state(self) -> None: conn = self.require_connection() conn.execute('DELETE FROM active_events') conn.execute('DELETE FROM busy_events') + conn.execute('DELETE FROM collateral_events') conn.execute('DELETE FROM reservation_pin_events') conn.execute("DELETE FROM event_watcher_meta WHERE key = 'cursor'") conn.execute('DELETE FROM bootstrapped_swaps') @@ -858,6 +890,17 @@ def init_db(self) -> None: CREATE INDEX IF NOT EXISTS idx_busy_events_hotkey ON busy_events(hotkey); + CREATE TABLE IF NOT EXISTS collateral_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + block_num INTEGER NOT NULL, + hotkey TEXT NOT NULL, + collateral_rao INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_collateral_events_block + ON collateral_events(block_num); + CREATE INDEX IF NOT EXISTS idx_collateral_events_hotkey + ON collateral_events(hotkey); + CREATE TABLE IF NOT EXISTS reservation_pin_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, block_num INTEGER NOT NULL, diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index d137f45..acaa063 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -7,7 +7,7 @@ import numpy as np from allways.constants import RECYCLE_UID, SUCCESS_EXPONENT -from allways.validator.event_watcher import ActiveEvent, ContractEventWatcher +from allways.validator.event_watcher import ActiveEvent, CollateralEvent, ContractEventWatcher from allways.validator.scoring import ( calculate_miner_rewards, credibility_ramp, @@ -62,6 +62,17 @@ def seed_active(watcher: ContractEventWatcher, hotkey: str, active: bool, block: watcher.active_miners.discard(hotkey) +def seed_collateral(watcher: ContractEventWatcher, hotkey: str, collateral_rao: int, block: int) -> None: + """Insert a collateral event directly into the watcher's in-memory state. + Mirrors the cold-bootstrap anchor that ``ContractEventWatcher`` writes + when it first sees a miner with a positive contract collateral position.""" + event = CollateralEvent(hotkey=hotkey, collateral_rao=int(collateral_rao), block=block) + watcher.collateral_events.append(event) + watcher.collateral_events_by_hotkey.setdefault(hotkey, []).append(event) + watcher.collateral_events.sort(key=lambda ev: ev.block) + watcher.collateral_events_by_hotkey[hotkey].sort(key=lambda ev: ev.block) + + def make_validator( tmp_path: Path, hotkeys: list[str], @@ -88,6 +99,13 @@ def make_validator( store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active=set(hotkeys)) collaterals = collaterals or {} + # Mirror the cold-bootstrap collateral anchor: scoring now reads collateral + # from the event watcher's per-block series, not from a live contract call. + # Seeding at block 0 puts the value before any test's window_start, so the + # reconstruction at window_start returns it as the anchor value. + for hotkey, amount in collaterals.items(): + if amount > 0: + seed_collateral(watcher, hotkey, amount, block=0) bounds_cache = MagicMock() bounds_cache.max_swap_amount.return_value = max_swap_amount contract_client = MagicMock() @@ -1565,20 +1583,24 @@ def test_cold_start_max_swap_zero_is_fail_safe(self, tmp_path: Path): np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) v.state_store.close() - def test_collateral_rpc_failure_is_logged_not_fatal(self, tmp_path: Path): - """A failing get_miner_collateral logs and treats as 0 → factor 0 → - miner's reward zeroes but the scoring pass completes.""" + def test_no_collateral_anchor_zeros_reward(self, tmp_path: Path): + """A miner with no collateral event in the watcher's series (e.g. cold- + bootstrap saw zero on-chain collateral, or the read failed) is treated + as zero collateral throughout the window → factor 0 → zero reward. + Scoring no longer reads collateral via RPC, so the failure mode is now + a missing series rather than a failing call.""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) - v = make_validator(tmp_path, hotkeys, max_swap_amount=500_000_000) - v.contract_client.get_miner_collateral.side_effect = RuntimeError('rpc down') + v = make_validator(tmp_path, hotkeys, max_swap_amount=500_000_000) # no collaterals dict self.seed_tao_btc_crown(v, 'hk_a') rewards, _ = calculate_miner_rewards(v) assert rewards[0] == 0.0 v.state_store.close() - def test_collateral_read_cached_within_pass(self, tmp_path: Path): - """A miner holding crown in both directions has collateral fetched once, - not per-direction. Keeps the RPC budget bounded.""" + def test_scoring_does_not_call_contract_for_collateral(self, tmp_path: Path): + """Scoring must derive capacity from the replayed event series, not a + live contract read. Closes #409: any path that reads current collateral + at scoring time would let a miner top up after the window and + retroactively boost capacity on already-earned crown.""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) v = make_validator( tmp_path, @@ -1594,8 +1616,7 @@ def test_collateral_read_cached_within_pass(self, tmp_path: Path): ) conn.commit() calculate_miner_rewards(v) - # hk_a held crown in both directions → exactly one RPC for collateral. - assert v.contract_client.get_miner_collateral.call_count == 1 + assert v.contract_client.get_miner_collateral.call_count == 0 v.state_store.close() def test_max_swap_amount_rpc_failure_falls_back_to_unity(self, tmp_path: Path): @@ -1616,12 +1637,11 @@ class TestWeightingTraceRecorders: direct unit coverage so changes there don't have to be inferred from integration tests.""" - def test_record_capacity_sets_both_fields(self): + def test_record_capacity_sets_factor(self): from allways.validator.scoring_trace import WeightingTrace wt = WeightingTrace() - wt.record_capacity(collateral=250_000_000, factor=0.5) - assert wt.collateral == 250_000_000 + wt.record_capacity(factor=0.5) assert wt.capacity_factor == 0.5 def test_record_volume_computes_share_and_participation(self): @@ -2157,3 +2177,184 @@ def test_swap_completed_without_tao_amount_defaults_to_zero(self, tmp_path: Path ) assert row['tao_amount'] == 0 store.close() + + +class TestHistoricalCollateralReplay: + """Capacity weighting is now derived from a per-block collateral series + replayed alongside active/busy/rate, not a contract read at scoring time. + Closes #409 — a miner who tops up collateral after the window cannot + retroactively boost the capacity multiplier on crown they've already + earned.""" + + def seed_tao_btc_crown(self, v: SimpleNamespace, hotkey: str, rate: float = 0.00020) -> None: + conn = v.state_store.require_connection() + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hotkey, 'tao', 'btc', rate, 0), + ) + conn.commit() + + def test_409_retroactive_topup_does_not_boost_window(self, tmp_path: Path): + """Reproduces the #409 proof. Window holds collateral at 0.1 TAO the + entire time; a post-window CollateralPosted to 0.5 TAO must not + change the reward.""" + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator( + tmp_path, + hotkeys, + block=10_000, + max_swap_amount=500_000_000, + collaterals={'hk_a': 100_000_000}, # held throughout the window + ) + self.seed_tao_btc_crown(v, 'hk_a') + # Top-up fires *after* window_end (= 10_000). Window is (8_800, 10_000]. + v.event_watcher.apply_event( + 10_500, + 'CollateralPosted', + {'miner': 'hk_a', 'amount': 400_000_000, 'total': 500_000_000}, + ) + rewards, _ = calculate_miner_rewards(v) + # capacity_factor = 100M / 500M = 0.2; pool 0.5 → reward 0.1. + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.2, atol=1e-6) + v.state_store.close() + + def test_mid_window_topup_blends_capacity(self, tmp_path: Path): + """A miner posts more collateral midway through the window. Capacity + is integrated per-block: half the window at 1/4 cap, half at full cap + → time-weighted average 0.625. Validates that the multiplier reflects + collateral *during* the interval, not at the end of it.""" + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator( + tmp_path, + hotkeys, + block=10_000, + max_swap_amount=500_000_000, + collaterals={'hk_a': 125_000_000}, # window-start anchor + ) + self.seed_tao_btc_crown(v, 'hk_a') + # SCORING_WINDOW_BLOCKS = 600 → window is (9_400, 10_000]. Midpoint + # 9_700 splits credit 300/300 between low and full capacity. + v.event_watcher.apply_event( + 9_700, + 'CollateralPosted', + {'miner': 'hk_a', 'amount': 375_000_000, 'total': 500_000_000}, + ) + rewards, _ = calculate_miner_rewards(v) + # First 300 blocks at cap 0.25, next 300 at cap 1.0 → mean cap 0.625. + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.625, atol=1e-6) + v.state_store.close() + + def test_get_miner_collaterals_at_returns_latest(self, tmp_path: Path): + """Unit test the event watcher's per-block collateral reconstruction: + the latest event at or before the queried block wins.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active=set()) + watcher.apply_event( + 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 100_000_000} + ) + watcher.apply_event( + 500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 250_000_000} + ) + watcher.apply_event( + 800, 'CollateralWithdrawn', {'miner': 'hk_a', 'amount': 50, 'remaining': 50_000_000} + ) + assert watcher.get_miner_collaterals_at(50) == {} + assert watcher.get_miner_collaterals_at(100) == {'hk_a': 100_000_000} + assert watcher.get_miner_collaterals_at(499) == {'hk_a': 100_000_000} + assert watcher.get_miner_collaterals_at(500) == {'hk_a': 250_000_000} + assert watcher.get_miner_collaterals_at(799) == {'hk_a': 250_000_000} + assert watcher.get_miner_collaterals_at(800) == {'hk_a': 50_000_000} + store.close() + + def test_swap_completed_deducts_fee_from_collateral_series(self, tmp_path: Path): + """``apply_collateral_penalty`` silently deducts the fee inside + ``confirm_swap`` — the contract emits no CollateralWithdrawn for it, + so the watcher mirrors the deduction from ``SwapCompleted.fee_amount`` + to keep the replayed series in step with on-chain collateral.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + watcher.apply_event( + 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} + ) + watcher.apply_event(200, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event( + 300, + 'SwapCompleted', + {'swap_id': 1, 'miner': 'hk_a', 'tao_amount': 0, 'fee_amount': 50_000_000}, + ) + assert watcher.get_miner_collaterals_at(300) == {'hk_a': 450_000_000} + store.close() + + def test_swap_timed_out_deducts_slash_from_collateral_series(self, tmp_path: Path): + """Same mirror for slashes — ``SwapTimedOut.slash_amount`` reduces the + replayed series so a post-slash crown interval gets the lower + capacity, not the pre-slash value.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + watcher.apply_event( + 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} + ) + watcher.apply_event(200, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event( + 300, + 'SwapTimedOut', + {'swap_id': 1, 'miner': 'hk_a', 'slash_amount': 200_000_000}, + ) + assert watcher.get_miner_collaterals_at(300) == {'hk_a': 300_000_000} + store.close() + + def test_prune_keeps_latest_collateral_event_per_hotkey(self, tmp_path: Path): + """Mirrors the active-prune anchor rule: collateral events older than + the cutoff drop, but the most recent per-hotkey row is kept so + post-prune reconstruction at any block ≥ cutoff still returns the + correct value.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active=set()) + watcher.apply_event( + 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000} + ) + watcher.apply_event( + 200, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000} + ) + watcher.apply_event( + 5_000, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} + ) + watcher.apply_event( + 50, 'CollateralPosted', {'miner': 'hk_b', 'amount': 0, 'total': 300_000_000} + ) + # current_block=10_000, SCORING_WINDOW_BLOCKS=1200 → cutoff=8_800. + watcher.prune_old_events(10_000) + blocks_a = [ev.block for ev in watcher.collateral_events_by_hotkey['hk_a']] + assert blocks_a == [5_000] + blocks_b = [ev.block for ev in watcher.collateral_events_by_hotkey['hk_b']] + assert blocks_b == [50] + assert watcher.get_miner_collaterals_at(20_000) == {'hk_a': 500_000_000, 'hk_b': 300_000_000} + store.close() + + def test_collateral_events_persist_across_hydrate(self, tmp_path: Path): + """Warm-restart hydration: collateral events written to state.db + round-trip through hydrate_from_db so the in-memory series and the + ``by_hotkey`` index match what was on disk.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + watcher.apply_event( + 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000} + ) + watcher.apply_event( + 500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000} + ) + # Build a second watcher pointed at the same DB and hydrate. + store.set_event_cursor(600) + watcher2 = ContractEventWatcher( + substrate=MagicMock(), + contract_address='5contract', + metadata_path=METADATA_PATH, + state_store=store, + ) + watcher2.hydrate_from_db() + assert [(ev.block, ev.collateral_rao) for ev in watcher2.collateral_events] == [ + (100, 100_000_000), + (500, 200_000_000), + ] + assert watcher2.get_miner_collaterals_at(1_000) == {'hk_a': 200_000_000} + store.close() From d9cc740cd38bd57eee42f1bd6812f557b14419f0 Mon Sep 17 00:00:00 2001 From: anderdc <61125407+anderdc@users.noreply.github.com> Date: Thu, 28 May 2026 18:31:54 +0000 Subject: [PATCH 2/2] style: auto-fix pre-commit hooks --- allways/validator/event_watcher.py | 4 +-- allways/validator/scoring.py | 4 ++- allways/validator/state_store.py | 3 +- tests/test_scoring_v1.py | 44 ++++++++---------------------- 4 files changed, 16 insertions(+), 39 deletions(-) diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index deb84a7..fadf553 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -520,9 +520,7 @@ def hydrate_from_db(self) -> None: collateral_rows = self.state_store.load_all_collateral_events() self.collateral_events = [ - CollateralEvent( - hotkey=r['hotkey'], collateral_rao=int(r['collateral_rao']), block=r['block_num'] - ) + CollateralEvent(hotkey=r['hotkey'], collateral_rao=int(r['collateral_rao']), block=r['block_num']) for r in collateral_rows ] self.collateral_events_by_hotkey = {} diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index 095f125..e77c6f7 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -411,7 +411,9 @@ def merge_replay_events( for e in event_watcher.get_collateral_events_in_range(window_start, window_end): events.append( - ReplayEvent(block=e['block'], hotkey=e['hotkey'], kind=EventKind.COLLATERAL, value=float(e['collateral_rao'])) + ReplayEvent( + block=e['block'], hotkey=e['hotkey'], kind=EventKind.COLLATERAL, value=float(e['collateral_rao']) + ) ) events.sort(key=lambda ev: ev.sort_key) diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index aa717e2..bd18713 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -583,8 +583,7 @@ def insert_collateral_event(self, block_num: int, hotkey: str, collateral_rao: i def load_all_collateral_events(self) -> List[dict]: rows = self._fetchall( - 'SELECT block_num, hotkey, collateral_rao FROM collateral_events ' - 'ORDER BY block_num ASC, id ASC' + 'SELECT block_num, hotkey, collateral_rao FROM collateral_events ORDER BY block_num ASC, id ASC' ) return [ {'block_num': r['block_num'], 'hotkey': r['hotkey'], 'collateral_rao': int(r['collateral_rao'])} diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index acaa063..f477185 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -2249,15 +2249,9 @@ def test_get_miner_collaterals_at_returns_latest(self, tmp_path: Path): the latest event at or before the queried block wins.""" store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active=set()) - watcher.apply_event( - 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 100_000_000} - ) - watcher.apply_event( - 500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 250_000_000} - ) - watcher.apply_event( - 800, 'CollateralWithdrawn', {'miner': 'hk_a', 'amount': 50, 'remaining': 50_000_000} - ) + watcher.apply_event(100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 100_000_000}) + watcher.apply_event(500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 50, 'total': 250_000_000}) + watcher.apply_event(800, 'CollateralWithdrawn', {'miner': 'hk_a', 'amount': 50, 'remaining': 50_000_000}) assert watcher.get_miner_collaterals_at(50) == {} assert watcher.get_miner_collaterals_at(100) == {'hk_a': 100_000_000} assert watcher.get_miner_collaterals_at(499) == {'hk_a': 100_000_000} @@ -2273,9 +2267,7 @@ def test_swap_completed_deducts_fee_from_collateral_series(self, tmp_path: Path) to keep the replayed series in step with on-chain collateral.""" store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active={'hk_a'}) - watcher.apply_event( - 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} - ) + watcher.apply_event(100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000}) watcher.apply_event(200, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) watcher.apply_event( 300, @@ -2291,9 +2283,7 @@ def test_swap_timed_out_deducts_slash_from_collateral_series(self, tmp_path: Pat capacity, not the pre-slash value.""" store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active={'hk_a'}) - watcher.apply_event( - 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} - ) + watcher.apply_event(100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000}) watcher.apply_event(200, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) watcher.apply_event( 300, @@ -2310,18 +2300,10 @@ def test_prune_keeps_latest_collateral_event_per_hotkey(self, tmp_path: Path): correct value.""" store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active=set()) - watcher.apply_event( - 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000} - ) - watcher.apply_event( - 200, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000} - ) - watcher.apply_event( - 5_000, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000} - ) - watcher.apply_event( - 50, 'CollateralPosted', {'miner': 'hk_b', 'amount': 0, 'total': 300_000_000} - ) + watcher.apply_event(100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000}) + watcher.apply_event(200, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000}) + watcher.apply_event(5_000, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 500_000_000}) + watcher.apply_event(50, 'CollateralPosted', {'miner': 'hk_b', 'amount': 0, 'total': 300_000_000}) # current_block=10_000, SCORING_WINDOW_BLOCKS=1200 → cutoff=8_800. watcher.prune_old_events(10_000) blocks_a = [ev.block for ev in watcher.collateral_events_by_hotkey['hk_a']] @@ -2337,12 +2319,8 @@ def test_collateral_events_persist_across_hydrate(self, tmp_path: Path): ``by_hotkey`` index match what was on disk.""" store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active={'hk_a'}) - watcher.apply_event( - 100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000} - ) - watcher.apply_event( - 500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000} - ) + watcher.apply_event(100, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 100_000_000}) + watcher.apply_event(500, 'CollateralPosted', {'miner': 'hk_a', 'amount': 0, 'total': 200_000_000}) # Build a second watcher pointed at the same DB and hydrate. store.set_event_cursor(600) watcher2 = ContractEventWatcher(