diff --git a/transfer.go b/transfer.go
index 24cf00f..37854f3 100644
--- a/transfer.go
+++ b/transfer.go
@@ -1614,6 +1614,13 @@ func (self *SendSequence) Run() {
 
 				item.sendCount += 1
 				itemResendTimeout := self.rttWindow.ScaledRtt()
+
+				// Apply exponential backoff when backend is degraded to prevent bandwidth leak
+				if isBackendDegraded() {
+					backoffFactor := time.Duration(1 << uint(min(item.sendCount-1, 10))) // 2^n backoff, cap at 2^10
+					itemResendTimeout = itemResendTimeout * backoffFactor
+				}
+
 				if itemAckTimeout <= itemResendTimeout {
 					item.resendTime = sendTime.Add(itemAckTimeout)
 				} else {
@@ -1775,11 +1782,14 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {
 		if contract := self.client.ContractManager().TakeContract(self.ctx, contractKey, timeout); contract != nil && setNextContract(contract) {
 			self.contractSeqIndex += 1
 			// async queue up the next contract
-			self.client.ContractManager().CreateContract(
-				contractKey,
-				self.contractSeqIndex,
-				ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
-			)
+			// Gate contract creation when backend is degraded to prevent bandwidth leak
+			if !isBackendDegraded() {
+				self.client.ContractManager().CreateContract(
+					contractKey,
+					self.contractSeqIndex,
+					ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
+				)
+			}
 			return true
 		} else {
 			return false
@@ -1826,11 +1836,15 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {
 			CompanionContract: self.companionContract,
 			ForceStream:       self.forceStream,
 		}
-		self.client.ContractManager().CreateContract(
-			contractKey,
-			self.contractSeqIndex,
-			ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
-		)
+
+		// Gate contract creation when backend is degraded to prevent bandwidth leak
+		if !isBackendDegraded() {
+			self.client.ContractManager().CreateContract(
+				contractKey,
+				self.contractSeqIndex,
+				ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
+			)
+		}
 
 		if traceNextContract(min(timeout, self.sendBufferSettings.CreateContractRetryInterval)) {
 			return true
diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go
index a092bbc..602ac6a 100644
--- a/transfer_contract_manager.go
+++ b/transfer_contract_manager.go
@@ -3,6 +3,7 @@ package connect
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	// "errors"
@@ -24,6 +25,12 @@ import (
 	"github.com/urnetwork/connect/protocol"
 )
 
+// Package-level atomics for OOB error rate-limiting
+var (
+	lastOobErrLogNano atomic.Int64 // Last time we logged an OOB error
+	suppressedOobErrs atomic.Int64 // Count of OOB errors suppressed since last log
+)
+
 // manage contracts which are embedded into each transfer sequence
 
 type ContractKey struct {
@@ -111,6 +118,30 @@ func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey []
 	return hmac.Equal(storedContractHmac, standardMac.Sum(nil))
 }
 
+// shouldLogOobErr implements package-level rate-limiting for OOB errors.
+// Returns true if enough time has passed to log another error, false if suppressed.
+// Uses atomic CAS to avoid duplicate log lines when multiple goroutines race.
+func shouldLogOobErr() bool {
+	now := time.Now().UnixNano()
+	lastLog := lastOobErrLogNano.Load()
+
+	if now-lastLog >= int64(AuthErrorRateLimitWindow) {
+		// Attempt to claim the log slot with CAS
+		if lastOobErrLogNano.CompareAndSwap(lastLog, now) {
+			// We won the race; suppress count is our responsibility to report
+			count := suppressedOobErrs.Swap(0)
+			if count > 0 {
+				glog.Infof("[contract]oob err ... (%d suppressed)", count)
+			}
+			return true
+		}
+	}
+
+	// Either rate-limited or lost CAS race; increment suppression counter
+	suppressedOobErrs.Add(1)
+	return false
+}
+
 func DefaultContractManagerSettings() *ContractManagerSettings {
 	// NETWORK EVENT: at the enable contracts date, all clients will require contracts
 	// up to that time, contracts are optional for the sender and match for the receiver
@@ -921,7 +952,9 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq
 					case <-self.client.Done():
 						// no need to log warnings when the client closes
 					default:
-						glog.Infof("[contract]oob err = %s\n", err)
+						if shouldLogOobErr() {
+							glog.Infof("[contract]oob err = %s\n", err)
+						}
 					}
 				}
 			},
diff --git a/transport.go b/transport.go
index e19828f..e99ce6d 100644
--- a/transport.go
+++ b/transport.go
@@ -79,6 +79,21 @@ const (
 	TransportModeNone TransportMode = ""
 )
 
+// Backend degradation and auth error rate-limiting constants
+const (
+	AuthErrorRateLimitWindow = 1 * time.Minute
+	BackendDegradationWindow = 2 * time.Minute
+)
+
+// Package-level atomics for bandwidth leak prevention
+var (
+	lastAuthErrLogNano    atomic.Int64 // Last time we logged an auth error
+	suppressedAuthErrs    atomic.Int64 // Count of auth errors suppressed since last log
+	lastBackendCheckNano  atomic.Int64 // Last time we checked backend degradation
+	authErrsSinceCheck    atomic.Int64 // Count of auth errors since the last degradation check
+	isBackendDegradedFlag atomic.Bool  // Set when backend auth errors are flooding
+)
+
 type ClientAuth struct {
 	ByJwt string
 	// ClientId Id
@@ -94,6 +109,68 @@ func (self *ClientAuth) ClientId() (Id, error) {
 	return byJwt.ClientId, nil
 }
 
+// shouldLogAuthErr implements package-level rate-limiting for auth errors.
+// Returns true if enough time has passed to log another error, false if suppressed.
+// Uses atomic CAS to avoid duplicate log lines when multiple goroutines race.
+func shouldLogAuthErr() bool {
+	now := time.Now().UnixNano()
+	lastLog := lastAuthErrLogNano.Load()
+
+	if now-lastLog >= int64(AuthErrorRateLimitWindow) {
+		// Attempt to claim the log slot with CAS
+		if lastAuthErrLogNano.CompareAndSwap(lastLog, now) {
+			// We won the race; suppress count is our responsibility to report
+			count := suppressedAuthErrs.Swap(0)
+			if count > 0 {
+				glog.Infof("[t]auth error ... (%d suppressed)", count)
+			}
+			return true
+		}
+	}
+
+	// Either rate-limited or lost CAS race; increment suppression counter
+	suppressedAuthErrs.Add(1)
+	return false
+}
+
+// isBackendDegraded reports whether the backend is currently considered degraded.
+// The flag is only re-evaluated on the auth failure path, so a set flag is treated
+// as stale once no check has run for two degradation windows. Without this the
+// flag would stay set forever after the backend recovers.
+func isBackendDegraded() bool {
+	if !isBackendDegradedFlag.Load() {
+		return false
+	}
+	sinceCheck := time.Now().UnixNano() - lastBackendCheckNano.Load()
+	return sinceCheck < 2*int64(BackendDegradationWindow)
+}
+
+// updateBackendDegradation records one auth failure and, at most once per
+// BackendDegradationWindow, re-evaluates the degradation flag.
+// Should be called after auth failures; the flag is used to gate contract creation.
+func updateBackendDegradation() {
+	// Use a dedicated counter: suppressedAuthErrs is reset by shouldLogAuthErr
+	// every time it logs, so it does not reflect the whole degradation window
+	authErrsSinceCheck.Add(1)
+
+	now := time.Now().UnixNano()
+	lastCheck := lastBackendCheckNano.Load()
+
+	// Check at most once per degradation window to avoid overhead
+	if now-lastCheck < int64(BackendDegradationWindow) {
+		return
+	}
+
+	// Attempt to claim the check slot with CAS
+	if !lastBackendCheckNano.CompareAndSwap(lastCheck, now) {
+		return // Another goroutine is checking; skip
+	}
+
+	// Threshold: more than 100 auth errors since the previous check
+	errCount := authErrsSinceCheck.Swap(0)
+	isBackendDegradedFlag.Store(errCount > 100)
+}
+
 // (ctx, network, address)
 // type DialContextFunc func(ctx context.Context, network string, address string) (net.Conn, error)
 
@@ -482,7 +559,10 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
 				ws, err = connect()
 			}
 			if err != nil {
-				glog.Infof("[t]auth error %s = %s\n", clientId, err)
+				if shouldLogAuthErr() {
+					glog.Infof("[t]auth error %s = %s\n", clientId, err)
+				}
+				updateBackendDegradation()
 				select {
 				case <-self.ctx.Done():
 					return
@@ -1068,7 +1148,10 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D
 				connStream, err = connect()
 			}
 			if err != nil {
-				glog.Infof("[t]auth error %s = %s\n", clientId, err)
+				if shouldLogAuthErr() {
+					glog.Infof("[t]auth error %s = %s\n", clientId, err)
+				}
+				updateBackendDegradation()
 				select {
 				case <-self.ctx.Done():
 					return