diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmChainSpecific.kt index 955e44ef..fa49bf70 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmChainSpecific.kt @@ -14,6 +14,7 @@ import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService @@ -95,26 +96,77 @@ object AvmChainSpecific : AbstractPollChainSpecific() { options: Options, config: ChainConfig, ): List> { - return emptyList() + if (chain.chainId.isBlank()) { + return emptyList() + } + return listOf( + GenericSingleCallValidator( + ChainRequest("GET#/genesis", RestParams.emptyParams()), + upstream, + ) { data -> + validateGenesis(data, chain, upstream.getId()) + }, + ) + } + + override fun upstreamSettingsDetector( + chain: Chain, + upstream: Upstream, + ): UpstreamSettingsDetector { + return AvmUpstreamSettingsDetector(upstream) } override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return AvmLowerBoundService(chain, upstream) } + fun validateGenesis(data: ByteArray, chain: Chain, upstreamId: String): ValidateUpstreamSettingsResult { + val expected = chain.chainId.trim() + if (expected.isBlank()) { + return ValidateUpstreamSettingsResult.UPSTREAM_VALID + } + if (data.isEmpty()) { + log.warn("AVM node {} returned empty genesis response", upstreamId) + return ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR + } + val genesis = try { + Global.objectMapper.readValue(data, AvmGenesis::class.java) + } catch (e: Exception) { + log.warn("AVM node {} returned unparseable genesis payload: {}", upstreamId, e.message) + return ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR + } + val expectedNetwork = chain.chainName.substringAfterLast(" ").lowercase() + if (genesis.network.equals(expectedNetwork, ignoreCase = true)) { + return ValidateUpstreamSettingsResult.UPSTREAM_VALID + } + log.warn( + "AVM node {} chain mismatch: chain={} (expected network={}) but node reports network={} id={}", + upstreamId, + chain.chainName, + expectedNetwork, + genesis.network, + genesis.id, + ) + return ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR + } + fun validate(data: ByteArray, upstreamId: String): UpstreamAvailability { val status = Global.objectMapper.readValue(data, AvmStatus::class.java) - return if (status.catchupTime > 0L) { + if (status.lastRound == 0L) { + log.warn("AVM node {} reports no last-round", upstreamId) + return UpstreamAvailability.UNAVAILABLE + } + if (status.stoppedAtUnsupportedRound) { + log.warn("AVM node {} halted on an unsupported consensus round", upstreamId) + return UpstreamAvailability.UNAVAILABLE + } + if (status.catchupTime > 0L) { log.warn("AVM node {} is catching up: catchupTime={}ns", upstreamId, status.catchupTime) - UpstreamAvailability.SYNCING - } else { - UpstreamAvailability.OK + return UpstreamAvailability.SYNCING } + return UpstreamAvailability.OK } - // Algorand JSON blocks encode 32-byte fields (seed, prev, txn) in base64. - // Decode to raw bytes; if decoding fails or the field is absent, fall back - // to a deterministic 32-byte encoding of the round number. private fun toHashBytes(raw: String?, round: Long): ByteArray { if (raw.isNullOrBlank()) { return roundToBytes(round) @@ -149,6 +201,7 @@ data class AvmStatus( @param:JsonProperty("time-since-last-round") var timeSinceLastRound: Long = 0, @param:JsonProperty("last-version") var lastVersion: String? = null, @param:JsonProperty("next-version") var nextVersion: String? = null, + @param:JsonProperty("stopped-at-unsupported-round") var stoppedAtUnsupportedRound: Boolean = false, ) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundService.kt index 6fe24d36..d264e55e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundService.kt @@ -6,10 +6,10 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService class AvmLowerBoundService( - private val chain: Chain, - upstream: Upstream, + chain: Chain, + private val upstream: Upstream, ) : LowerBoundService(chain, upstream) { override fun detectors(): List { - return listOf(AvmLowerBoundStateDetector(chain)) + return listOf(AvmLowerBoundStateDetector(upstream)) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundStateDetector.kt index c83e8ab2..6e2d1eac 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmLowerBoundStateDetector.kt @@ -1,24 +1,108 @@ package io.emeraldpay.dshackle.upstream.avm -import io.emeraldpay.dshackle.Chain +import com.fasterxml.jackson.databind.JsonNode +import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.ChainCallError +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound +import io.emeraldpay.dshackle.upstream.rpcclient.RestParams +import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.toFlux class AvmLowerBoundStateDetector( - chain: Chain, -) : LowerBoundDetector(chain) { + private val upstream: Upstream, +) : LowerBoundDetector(upstream.getChain()) { - override fun period(): Long { - return 120 + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, notFoundErrors, lowerBounds) + + companion object { + private val log = LoggerFactory.getLogger(AvmLowerBoundStateDetector::class.java) + + val notFoundErrors = setOf( + "block not found", + "not available", + "does not have entry", + "failed to retrieve information", + "no information found", + ) } + override fun period(): Long = 60 + override fun internalDetectLowerBound(): Flux { - return Flux.just(LowerBoundData(1, LowerBoundType.STATE)) + return recursiveLowerBound.recursiveDetectLowerBound { block -> + val round = if (block <= 0L) 1L else block + val params = RestParams( + headers = emptyList(), + queryParams = emptyList(), + pathParams = listOf(round.toString()), + payload = ByteArray(0), + ) + upstream.getIngressReader() + .read(ChainRequest("GET#/v2/blocks/*/hash", params)) + .timeout(Defaults.internalCallsTimeout) + .map { response -> interpretHashResponse(round, response) } + } + .switchIfEmpty(Mono.fromSupplier { cachedOrUnknown("recursive search returned no bound") }) + .onErrorResume { err -> Mono.just(cachedOrUnknown(err.message ?: "unknown error")) } + .toFlux() + } + + override fun types(): Set = setOf(LowerBoundType.STATE, LowerBoundType.UNKNOWN) + + private fun cachedOrUnknown(reason: String): LowerBoundData { + val cached = lowerBounds.getLastBound(LowerBoundType.STATE) + if (cached != null) { + log.debug( + "AVM upstream {} lower-bound search failed ({}); retaining cached STATE={}", + upstream.getId(), + reason, + cached.lowerBound, + ) + return cached + } + log.warn( + "AVM upstream {} lower-bound search failed ({}) and no cache is available; emitting UNKNOWN", + upstream.getId(), + reason, + ) + return LowerBoundData(0, LowerBoundType.UNKNOWN) + } + + private fun interpretHashResponse(round: Long, response: ChainResponse): ChainResponse { + if (response.hasError()) { + return response + } + val raw = response.getResult() + if (raw.isEmpty()) { + return ChainResponse(null, ChainCallError(404, "empty body for round $round")) + } + val node = runCatching { Global.objectMapper.readTree(raw) }.getOrNull() ?: return response + val message = node.get("message")?.asText().orEmpty() + if (message.isNotBlank() && looksLikeNotFound(message)) { + return ChainResponse(null, ChainCallError(404, message)) + } + if (!hasHashPayload(node)) { + return ChainResponse(null, ChainCallError(404, "round $round not available")) + } + return response + } + + private fun looksLikeNotFound(message: String): Boolean { + val lower = message.lowercase() + return notFoundErrors.any { lower.contains(it) } } - override fun types(): Set { - return setOf(LowerBoundType.STATE) + private fun hasHashPayload(node: JsonNode): Boolean { + val hash = node.get("blockHash")?.asText().orEmpty() + return hash.isNotBlank() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmUpstreamSettingsDetector.kt new file mode 100644 index 00000000..4ef66a5c --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/avm/AvmUpstreamSettingsDetector.kt @@ -0,0 +1,46 @@ +package io.emeraldpay.dshackle.upstream.avm + +import com.fasterxml.jackson.databind.JsonNode +import io.emeraldpay.dshackle.upstream.BasicUpstreamSettingsDetector +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.NodeTypeRequest +import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.RestParams +import reactor.core.publisher.Flux + +class AvmUpstreamSettingsDetector( + upstream: Upstream, +) : BasicUpstreamSettingsDetector(upstream) { + + override fun internalDetectLabels(): Flux> { + return Flux.merge( + detectNodeType(), + ) + } + + override fun clientVersionRequest(): ChainRequest = + ChainRequest("GET#/versions", RestParams.emptyParams()) + + override fun parseClientVersion(data: ByteArray): String { + if (data.isEmpty()) return UNKNOWN_CLIENT_VERSION + val node = runCatching { io.emeraldpay.dshackle.Global.objectMapper.readTree(data) }.getOrNull() + ?: return UNKNOWN_CLIENT_VERSION + return clientVersion(node) ?: UNKNOWN_CLIENT_VERSION + } + + override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest()) + + override fun clientType(node: JsonNode): String = "algod" + + override fun clientVersion(node: JsonNode): String { + val build = node.get("build") ?: return UNKNOWN_CLIENT_VERSION + val major = build.get("major")?.asInt(-1) ?: -1 + val minor = build.get("minor")?.asInt(-1) ?: -1 + val patch = build.get("build_number")?.asInt(-1) ?: -1 + if (major < 0 || minor < 0 || patch < 0) { + return UNKNOWN_CLIENT_VERSION + } + return "$major.$minor.$patch" + } +}