diff --git a/allways/contract_client.py b/allways/contract_client.py index 64d5905..672a4cb 100644 --- a/allways/contract_client.py +++ b/allways/contract_client.py @@ -351,6 +351,7 @@ def __init__( contract_address: Optional[str] = None, subtensor: Optional[bt.Subtensor] = None, reconnect_subtensor: Optional[Callable[[], None]] = None, + substrate_lock: Optional[Any] = None, ): self.contract_address = contract_address or get_contract_address() or '' self.subtensor = subtensor @@ -362,8 +363,9 @@ def __init__( self.readonly_keypair = Keypair.create_from_uri('//Alice') self.initialized = False # substrate-interface's WebSocketProvider isn't thread-safe; serialize - # access so concurrent threads can't both land in recv at the same time. - self._substrate_lock = threading.Lock() + # access so concurrent threads can't both land in recv. Callers sharing + # this subtensor elsewhere pass that path's lock to serialize as one. + self._substrate_lock = substrate_lock or threading.Lock() if not self.contract_address: bt.logging.warning('Allways contract address not set') diff --git a/allways/validator/bounds_cache.py b/allways/validator/bounds_cache.py index 980caa7..83c5289 100644 --- a/allways/validator/bounds_cache.py +++ b/allways/validator/bounds_cache.py @@ -6,7 +6,8 @@ toggle), just shorter TTL because the dashboard wants quick freshness. """ -from typing import Any, Callable +import threading +from typing import Any, Callable, Optional import bittensor as bt @@ -21,19 +22,28 @@ class BoundsCache: # unhalt. 5 blocks (~60s) keeps the worst-case lag tolerable. HALT_TTL_BLOCKS = 5 - def __init__(self, contract: AllwaysContractClient, get_block: Callable[[], int]): + def __init__( + self, + contract: AllwaysContractClient, + get_block: Callable[[], int], + lock: Optional[Any] = None, + ): self._contract = contract self._get_block = get_block self._cache: dict[str, tuple[int, Any]] = {} + # get_block and the reads share the caller's subtensor websocket; hold + # the connection's lock so a shared thread can't race us into recv. + self._lock = lock or threading.Lock() def _cached(self, key: str, read: Callable[[], int]) -> int: - now = self._get_block() - entry = self._cache.get(key) - if entry is not None and now - entry[0] < self.TTL_BLOCKS: - return entry[1] - value = read() - self._cache[key] = (now, value) - return value + with self._lock: + now = self._get_block() + entry = self._cache.get(key) + if entry is not None and now - entry[0] < self.TTL_BLOCKS: + return entry[1] + value = read() + self._cache[key] = (now, value) + return value def min_collateral(self) -> int: return self._cached('min_collateral', self._contract.get_min_collateral) @@ -51,14 +61,15 @@ def halted(self) -> bool: flakes don't churn the writer) or False (no prior cache → assume not halted, matching scoring.contract_is_halted's fail-open behavior so a flaky RPC can't zero every miner's reward).""" - now = self._get_block() - entry = self._cache.get('halted') - if entry is not None and now - entry[0] < self.HALT_TTL_BLOCKS: - return bool(entry[1]) - try: - value = bool(self._contract.get_halted()) - self._cache['halted'] = (now, value) - return value - except Exception as e: - bt.logging.warning(f'halt RPC check failed: {e}') - return bool(entry[1]) if entry is not None else False + with self._lock: + now = self._get_block() + entry = self._cache.get('halted') + if entry is not None and now - entry[0] < self.HALT_TTL_BLOCKS: + return bool(entry[1]) + try: + value = bool(self._contract.get_halted()) + self._cache['halted'] = (now, value) + return value + except Exception as e: + bt.logging.warning(f'halt RPC check failed: {e}') + return bool(entry[1]) if entry is not None else False diff --git a/neurons/validator.py b/neurons/validator.py index 62a266e..9b58faa 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -154,22 +154,23 @@ def __init__(self, config=None): ) # Separate subtensor/contract/providers for axon handlers (thread safety). - # axon_lock serialises substrate websocket calls across handler threads - # to prevent "cannot call recv while another coroutine is already running recv" errors. - self.axon_lock = threading.Lock() + # axon_lock serialises every call on axon_subtensor's websocket so two + # threads can't both land in recv. Reentrant: handlers hold it, then + # nest a bounds_cache read (which re-acquires it via axon_contract_client). + self.axon_lock = threading.RLock() self.axon_subtensor = bt.Subtensor(config=self.config) self.axon_contract_client = AllwaysContractClient( subtensor=self.axon_subtensor, reconnect_subtensor=self.reconnect_axon_subtensor, + substrate_lock=self.axon_lock, ) self.axon_chain_providers = create_chain_providers(subtensor=self.axon_subtensor) - # Must read the current block via axon_subtensor — the block getter on - # self (self.block) goes through self.subtensor, which the forward loop - # is already using; concurrent axon + forward reads collide on the same - # websocket and raise ConcurrencyError. + # Read block/bounds via axon_subtensor; the forward loop calls this too, + # so it shares axon_lock rather than colliding with handler threads. self.bounds_cache = BoundsCache( self.axon_contract_client, self.axon_subtensor.get_current_block, + lock=self.axon_lock, ) # Attach synapse handlers to axon