Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions allways/contract_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def __init__(
contract_address: Optional[str] = None,
subtensor: Optional[bt.Subtensor] = None,
reconnect_subtensor: Optional[Callable[[], None]] = None,
substrate_lock: Optional[Any] = None,
):
self.contract_address = contract_address or get_contract_address() or ''
self.subtensor = subtensor
Expand All @@ -362,8 +363,9 @@ def __init__(
self.readonly_keypair = Keypair.create_from_uri('//Alice')
self.initialized = False
# substrate-interface's WebSocketProvider isn't thread-safe; serialize
# access so concurrent threads can't both land in recv at the same time.
self._substrate_lock = threading.Lock()
# access so concurrent threads can't both land in recv. Callers sharing
# this subtensor elsewhere pass that path's lock to serialize as one.
self._substrate_lock = substrate_lock or threading.Lock()

if not self.contract_address:
bt.logging.warning('Allways contract address not set')
Expand Down
51 changes: 31 additions & 20 deletions allways/validator/bounds_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
toggle), just shorter TTL because the dashboard wants quick freshness.
"""

from typing import Any, Callable
import threading
from typing import Any, Callable, Optional

import bittensor as bt

Expand All @@ -21,19 +22,28 @@ class BoundsCache:
# unhalt. 5 blocks (~60s) keeps the worst-case lag tolerable.
HALT_TTL_BLOCKS = 5

def __init__(self, contract: AllwaysContractClient, get_block: Callable[[], int]):
def __init__(
self,
contract: AllwaysContractClient,
get_block: Callable[[], int],
lock: Optional[Any] = None,
):
self._contract = contract
self._get_block = get_block
self._cache: dict[str, tuple[int, Any]] = {}
# get_block and the reads share the caller's subtensor websocket; hold
# the connection's lock so a shared thread can't race us into recv.
self._lock = lock or threading.Lock()

def _cached(self, key: str, read: Callable[[], int]) -> int:
now = self._get_block()
entry = self._cache.get(key)
if entry is not None and now - entry[0] < self.TTL_BLOCKS:
return entry[1]
value = read()
self._cache[key] = (now, value)
return value
with self._lock:
now = self._get_block()
entry = self._cache.get(key)
if entry is not None and now - entry[0] < self.TTL_BLOCKS:
return entry[1]
value = read()
self._cache[key] = (now, value)
return value

def min_collateral(self) -> int:
return self._cached('min_collateral', self._contract.get_min_collateral)
Expand All @@ -51,14 +61,15 @@ def halted(self) -> bool:
flakes don't churn the writer) or False (no prior cache → assume
not halted, matching scoring.contract_is_halted's fail-open
behavior so a flaky RPC can't zero every miner's reward)."""
now = self._get_block()
entry = self._cache.get('halted')
if entry is not None and now - entry[0] < self.HALT_TTL_BLOCKS:
return bool(entry[1])
try:
value = bool(self._contract.get_halted())
self._cache['halted'] = (now, value)
return value
except Exception as e:
bt.logging.warning(f'halt RPC check failed: {e}')
return bool(entry[1]) if entry is not None else False
with self._lock:
now = self._get_block()
entry = self._cache.get('halted')
if entry is not None and now - entry[0] < self.HALT_TTL_BLOCKS:
return bool(entry[1])
try:
value = bool(self._contract.get_halted())
self._cache['halted'] = (now, value)
return value
except Exception as e:
bt.logging.warning(f'halt RPC check failed: {e}')
return bool(entry[1]) if entry is not None else False
15 changes: 8 additions & 7 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,23 @@ def __init__(self, config=None):
)

# Separate subtensor/contract/providers for axon handlers (thread safety).
# axon_lock serialises substrate websocket calls across handler threads
# to prevent "cannot call recv while another coroutine is already running recv" errors.
self.axon_lock = threading.Lock()
# axon_lock serialises every call on axon_subtensor's websocket so two
# threads can't both land in recv. Reentrant: handlers hold it, then
# nest a bounds_cache read (which re-acquires it via axon_contract_client).
self.axon_lock = threading.RLock()
self.axon_subtensor = bt.Subtensor(config=self.config)
self.axon_contract_client = AllwaysContractClient(
subtensor=self.axon_subtensor,
reconnect_subtensor=self.reconnect_axon_subtensor,
substrate_lock=self.axon_lock,
)
self.axon_chain_providers = create_chain_providers(subtensor=self.axon_subtensor)
# Must read the current block via axon_subtensor — the block getter on
# self (self.block) goes through self.subtensor, which the forward loop
# is already using; concurrent axon + forward reads collide on the same
# websocket and raise ConcurrencyError.
# Read block/bounds via axon_subtensor; the forward loop calls this too,
# so it shares axon_lock rather than colliding with handler threads.
self.bounds_cache = BoundsCache(
self.axon_contract_client,
self.axon_subtensor.get_current_block,
lock=self.axon_lock,
)

# Attach synapse handlers to axon
Expand Down
Loading