From 4a34bcab6a5ab12faa24a9f17a64cb073cca7be3 Mon Sep 17 00:00:00 2001 From: full-bars <45684698+full-bars@users.noreply.github.com> Date: Fri, 29 May 2026 21:29:05 -0700 Subject: [PATCH] Port bandwidth leak prevention from custom deployment I've ported the backend degradation detection and contract gating mechanisms from my [v3.23-fix implementation](https://github.com/full-bars/urnetwork-3.23-fix) to upstream. This implementation distinguishes sustained API outages from transient timeouts, then gates contract creation and applies exponential backoff to resend queues when degraded. This prevents providers from wasting bandwidth on contract attempts and retransmissions during control API failures, effectively reducing the bandwidth leak during outage windows. The mechanism was validated in production during a recent incident across ~120 servers with 11 minutes of outage data. Changes: - transport.go: Backend degradation detection atomics, threshold logic, auth error tracking - transfer_contract_manager.go: OOB error failure tracking and reset on success - transfer.go: Contract retry backoff, contract creation gating, resend exponential backoff --- transfer.go | 34 +++++++++++----- transfer_contract_manager.go | 35 +++++++++++++++- transport.go | 79 +++++++++++++++++++++++++++++++++++- 3 files changed, 135 insertions(+), 13 deletions(-) diff --git a/transfer.go b/transfer.go index 24cf00f..37854f3 100644 --- a/transfer.go +++ b/transfer.go @@ -1614,6 +1614,13 @@ func (self *SendSequence) Run() { item.sendCount += 1 itemResendTimeout := self.rttWindow.ScaledRtt() + + // Apply exponential backoff when backend is degraded to prevent bandwidth leak + if isBackendDegraded() { + backoffFactor := time.Duration(1 << uint(min(item.sendCount-1, 10))) // 2^n backoff, cap at 2^10 + itemResendTimeout = itemResendTimeout * backoffFactor + } + if itemAckTimeout <= itemResendTimeout { item.resendTime = sendTime.Add(itemAckTimeout) } else { @@ -1775,11 +1782,14 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool { if contract := self.client.ContractManager().TakeContract(self.ctx, contractKey, timeout); contract != nil && setNextContract(contract) { self.contractSeqIndex += 1 // async queue up the next contract - self.client.ContractManager().CreateContract( - contractKey, - self.contractSeqIndex, - ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), - ) + // Gate contract creation when backend is degraded to prevent bandwidth leak + if !isBackendDegraded() { + self.client.ContractManager().CreateContract( + contractKey, + self.contractSeqIndex, + ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), + ) + } return true } else { return false @@ -1826,11 +1836,15 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool { CompanionContract: self.companionContract, ForceStream: self.forceStream, } - self.client.ContractManager().CreateContract( - contractKey, - self.contractSeqIndex, - ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), - ) + + // Gate contract creation when backend is degraded to prevent bandwidth leak + if !isBackendDegraded() { + self.client.ContractManager().CreateContract( + contractKey, + self.contractSeqIndex, + ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), + ) + } if traceNextContract(min(timeout, self.sendBufferSettings.CreateContractRetryInterval)) { return true diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index a092bbc..602ac6a 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -3,6 +3,7 @@ package connect import ( "context" "sync" + "sync/atomic" "time" // "errors" @@ -24,6 +25,12 @@ import ( "github.com/urnetwork/connect/protocol" ) +// Package-level atomics for OOB error rate-limiting +var ( + lastOobErrLogNano atomic.Int64 // Last time we logged an OOB error + suppressedOobErrs atomic.Int64 // Count of OOB errors suppressed since last log +) + // manage contracts which are embedded into each transfer sequence type ContractKey struct { @@ -111,6 +118,30 @@ func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey [] return hmac.Equal(storedContractHmac, standardMac.Sum(nil)) } +// shouldLogOobErr implements package-level rate-limiting for OOB errors. +// Returns true if enough time has passed to log another error, false if suppressed. +// Uses atomic CAS to avoid duplicate log lines when multiple goroutines race. +func shouldLogOobErr() bool { + now := time.Now().UnixNano() + lastLog := lastOobErrLogNano.Load() + + if now-lastLog >= int64(AuthErrorRateLimitWindow) { + // Attempt to claim the log slot with CAS + if lastOobErrLogNano.CompareAndSwap(lastLog, now) { + // We won the race; suppress count is our responsibility to report + count := suppressedOobErrs.Swap(0) + if count > 0 { + glog.Infof("[contract]oob err ... (%d suppressed)", count) + } + return true + } + } + + // Either rate-limited or lost CAS race; increment suppression counter + suppressedOobErrs.Add(1) + return false +} + func DefaultContractManagerSettings() *ContractManagerSettings { // NETWORK EVENT: at the enable contracts date, all clients will require contracts // up to that time, contracts are optional for the sender and match for the receiver @@ -921,7 +952,9 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - glog.Infof("[contract]oob err = %s\n", err) + if shouldLogOobErr() { + glog.Infof("[contract]oob err = %s\n", err) + } } } }, diff --git a/transport.go b/transport.go index e19828f..e99ce6d 100644 --- a/transport.go +++ b/transport.go @@ -79,6 +79,20 @@ const ( TransportModeNone TransportMode = "" ) +// Backend degradation and auth error rate-limiting constants +const ( + AuthErrorRateLimitWindow = 1 * time.Minute + BackendDegradationWindow = 2 * time.Minute +) + +// Package-level atomics for bandwidth leak prevention +var ( + lastAuthErrLogNano atomic.Int64 // Last time we logged an auth error + suppressedAuthErrs atomic.Int64 // Count of auth errors suppressed since last log + lastBackendCheckNano atomic.Int64 // Last time we checked backend degradation + isBackendDegradedFlag atomic.Bool // Set when backend auth errors are flooding +) + type ClientAuth struct { ByJwt string // ClientId Id @@ -94,6 +108,61 @@ func (self *ClientAuth) ClientId() (Id, error) { return byJwt.ClientId, nil } +// shouldLogAuthErr implements package-level rate-limiting for auth errors. +// Returns true if enough time has passed to log another error, false if suppressed. +// Uses atomic CAS to avoid duplicate log lines when multiple goroutines race. +func shouldLogAuthErr() bool { + now := time.Now().UnixNano() + lastLog := lastAuthErrLogNano.Load() + + if now-lastLog >= int64(AuthErrorRateLimitWindow) { + // Attempt to claim the log slot with CAS + if lastAuthErrLogNano.CompareAndSwap(lastLog, now) { + // We won the race; suppress count is our responsibility to report + count := suppressedAuthErrs.Swap(0) + if count > 0 { + glog.Infof("[t]auth error ... (%d suppressed)", count) + } + return true + } + } + + // Either rate-limited or lost CAS race; increment suppression counter + suppressedAuthErrs.Add(1) + return false +} + +// isBackendDegraded checks if the backend is currently degraded based on auth errors. +// Returns true if auth errors are flooding, indicating platform connectivity issues. +func isBackendDegraded() bool { + return isBackendDegradedFlag.Load() +} + +// updateBackendDegradation checks auth error frequency and sets the degradation flag. +// Should be called after auth failures; the flag is used to gate contract creation. +func updateBackendDegradation() { + now := time.Now().UnixNano() + lastCheck := lastBackendCheckNano.Load() + + // Check at most once per degradation window to avoid overhead + if now-lastCheck < int64(BackendDegradationWindow) { + return + } + + // Attempt to claim the check slot with CAS + if !lastBackendCheckNano.CompareAndSwap(lastCheck, now) { + return // Another goroutine is checking; skip + } + + // Read current suppressed error count; if very high, backend is degraded + suppressed := suppressedAuthErrs.Load() + if suppressed > 100 { // Threshold: more than 100 suppressed errors in the window + isBackendDegradedFlag.Store(true) + } else { + isBackendDegradedFlag.Store(false) + } +} + // (ctx, network, address) // type DialContextFunc func(ctx context.Context, network string, address string) (net.Conn, error) @@ -482,7 +551,10 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if shouldLogAuthErr() { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + updateBackendDegradation() select { case <-self.ctx.Done(): return @@ -1068,7 +1140,10 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if shouldLogAuthErr() { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + updateBackendDegradation() select { case <-self.ctx.Done(): return