Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService
Expand Down Expand Up @@ -95,26 +96,77 @@ object AvmChainSpecific : AbstractPollChainSpecific() {
options: Options,
config: ChainConfig,
): List<SingleValidator<ValidateUpstreamSettingsResult>> {
return emptyList()
if (chain.chainId.isBlank()) {
return emptyList()
}
return listOf(
GenericSingleCallValidator(
ChainRequest("GET#/genesis", RestParams.emptyParams()),
upstream,
) { data ->
validateGenesis(data, chain, upstream.getId())
},
)
}

override fun upstreamSettingsDetector(
chain: Chain,
upstream: Upstream,
): UpstreamSettingsDetector {
return AvmUpstreamSettingsDetector(upstream)
}
Comment on lines +113 to 117

override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService {
return AvmLowerBoundService(chain, upstream)
}

fun validateGenesis(data: ByteArray, chain: Chain, upstreamId: String): ValidateUpstreamSettingsResult {
val expected = chain.chainId.trim()
if (expected.isBlank()) {
return ValidateUpstreamSettingsResult.UPSTREAM_VALID
}
if (data.isEmpty()) {
log.warn("AVM node {} returned empty genesis response", upstreamId)
return ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
}
val genesis = try {
Global.objectMapper.readValue(data, AvmGenesis::class.java)
} catch (e: Exception) {
log.warn("AVM node {} returned unparseable genesis payload: {}", upstreamId, e.message)
return ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
}
val expectedNetwork = chain.chainName.substringAfterLast(" ").lowercase()
if (genesis.network.equals(expectedNetwork, ignoreCase = true)) {
return ValidateUpstreamSettingsResult.UPSTREAM_VALID
}
log.warn(
"AVM node {} chain mismatch: chain={} (expected network={}) but node reports network={} id={}",
upstreamId,
chain.chainName,
expectedNetwork,
genesis.network,
genesis.id,
)
return ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
}

fun validate(data: ByteArray, upstreamId: String): UpstreamAvailability {
val status = Global.objectMapper.readValue(data, AvmStatus::class.java)
return if (status.catchupTime > 0L) {
if (status.lastRound == 0L) {
log.warn("AVM node {} reports no last-round", upstreamId)
return UpstreamAvailability.UNAVAILABLE
}
if (status.stoppedAtUnsupportedRound) {
log.warn("AVM node {} halted on an unsupported consensus round", upstreamId)
return UpstreamAvailability.UNAVAILABLE
}
if (status.catchupTime > 0L) {
log.warn("AVM node {} is catching up: catchupTime={}ns", upstreamId, status.catchupTime)
UpstreamAvailability.SYNCING
} else {
UpstreamAvailability.OK
return UpstreamAvailability.SYNCING
}
return UpstreamAvailability.OK
}

// Algorand JSON blocks encode 32-byte fields (seed, prev, txn) in base64.
// Decode to raw bytes; if decoding fails or the field is absent, fall back
// to a deterministic 32-byte encoding of the round number.
private fun toHashBytes(raw: String?, round: Long): ByteArray {
if (raw.isNullOrBlank()) {
return roundToBytes(round)
Expand Down Expand Up @@ -149,6 +201,7 @@ data class AvmStatus(
@param:JsonProperty("time-since-last-round") var timeSinceLastRound: Long = 0,
@param:JsonProperty("last-version") var lastVersion: String? = null,
@param:JsonProperty("next-version") var nextVersion: String? = null,
@param:JsonProperty("stopped-at-unsupported-round") var stoppedAtUnsupportedRound: Boolean = false,
)

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService

class AvmLowerBoundService(
private val chain: Chain,
upstream: Upstream,
chain: Chain,
private val upstream: Upstream,
) : LowerBoundService(chain, upstream) {
override fun detectors(): List<LowerBoundDetector> {
return listOf(AvmLowerBoundStateDetector(chain))
return listOf(AvmLowerBoundStateDetector(upstream))
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,108 @@
package io.emeraldpay.dshackle.upstream.avm

import io.emeraldpay.dshackle.Chain
import com.fasterxml.jackson.databind.JsonNode
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainCallError
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound
import io.emeraldpay.dshackle.upstream.rpcclient.RestParams
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toFlux

class AvmLowerBoundStateDetector(
chain: Chain,
) : LowerBoundDetector(chain) {
private val upstream: Upstream,
) : LowerBoundDetector(upstream.getChain()) {

override fun period(): Long {
return 120
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, notFoundErrors, lowerBounds)

companion object {
private val log = LoggerFactory.getLogger(AvmLowerBoundStateDetector::class.java)

val notFoundErrors = setOf(
"block not found",
"not available",
"does not have entry",
"failed to retrieve information",
"no information found",
)
}

override fun period(): Long = 60

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return Flux.just(LowerBoundData(1, LowerBoundType.STATE))
return recursiveLowerBound.recursiveDetectLowerBound { block ->
val round = if (block <= 0L) 1L else block
val params = RestParams(
headers = emptyList(),
queryParams = emptyList(),
pathParams = listOf(round.toString()),
payload = ByteArray(0),
)
upstream.getIngressReader()
.read(ChainRequest("GET#/v2/blocks/*/hash", params))
.timeout(Defaults.internalCallsTimeout)
.map { response -> interpretHashResponse(round, response) }
}
.switchIfEmpty(Mono.fromSupplier { cachedOrUnknown("recursive search returned no bound") })
.onErrorResume { err -> Mono.just(cachedOrUnknown(err.message ?: "unknown error")) }
.toFlux()
}

override fun types(): Set<LowerBoundType> = setOf(LowerBoundType.STATE, LowerBoundType.UNKNOWN)

private fun cachedOrUnknown(reason: String): LowerBoundData {
val cached = lowerBounds.getLastBound(LowerBoundType.STATE)
if (cached != null) {
log.debug(
"AVM upstream {} lower-bound search failed ({}); retaining cached STATE={}",
upstream.getId(),
reason,
cached.lowerBound,
)
return cached
}
log.warn(
"AVM upstream {} lower-bound search failed ({}) and no cache is available; emitting UNKNOWN",
upstream.getId(),
reason,
)
return LowerBoundData(0, LowerBoundType.UNKNOWN)
}

private fun interpretHashResponse(round: Long, response: ChainResponse): ChainResponse {
if (response.hasError()) {
return response
}
val raw = response.getResult()
if (raw.isEmpty()) {
return ChainResponse(null, ChainCallError(404, "empty body for round $round"))
}
val node = runCatching { Global.objectMapper.readTree(raw) }.getOrNull() ?: return response
val message = node.get("message")?.asText().orEmpty()
if (message.isNotBlank() && looksLikeNotFound(message)) {
return ChainResponse(null, ChainCallError(404, message))
}
if (!hasHashPayload(node)) {
return ChainResponse(null, ChainCallError(404, "round $round not available"))
}
return response
}

private fun looksLikeNotFound(message: String): Boolean {
val lower = message.lowercase()
return notFoundErrors.any { lower.contains(it) }
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.STATE)
private fun hasHashPayload(node: JsonNode): Boolean {
val hash = node.get("blockHash")?.asText().orEmpty()
return hash.isNotBlank()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.emeraldpay.dshackle.upstream.avm

import com.fasterxml.jackson.databind.JsonNode
import io.emeraldpay.dshackle.upstream.BasicUpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.NodeTypeRequest
import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.RestParams
import reactor.core.publisher.Flux

class AvmUpstreamSettingsDetector(
upstream: Upstream,
) : BasicUpstreamSettingsDetector(upstream) {

override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
)
}

override fun clientVersionRequest(): ChainRequest =
ChainRequest("GET#/versions", RestParams.emptyParams())

override fun parseClientVersion(data: ByteArray): String {
if (data.isEmpty()) return UNKNOWN_CLIENT_VERSION
val node = runCatching { io.emeraldpay.dshackle.Global.objectMapper.readTree(data) }.getOrNull()
?: return UNKNOWN_CLIENT_VERSION
return clientVersion(node) ?: UNKNOWN_CLIENT_VERSION
}

override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest())

override fun clientType(node: JsonNode): String = "algod"

override fun clientVersion(node: JsonNode): String {
val build = node.get("build") ?: return UNKNOWN_CLIENT_VERSION
val major = build.get("major")?.asInt(-1) ?: -1
val minor = build.get("minor")?.asInt(-1) ?: -1
val patch = build.get("build_number")?.asInt(-1) ?: -1
if (major < 0 || minor < 0 || patch < 0) {
return UNKNOWN_CLIENT_VERSION
}
return "$major.$minor.$patch"
Comment on lines +12 to +44
}
}
Loading