Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,13 @@ func (self *SendSequence) Run() {

item.sendCount += 1
itemResendTimeout := self.rttWindow.ScaledRtt()

// Apply exponential backoff when backend is degraded to prevent bandwidth leak
if isBackendDegraded() {
backoffFactor := time.Duration(1 << uint(min(item.sendCount-1, 10))) // 2^n backoff, cap at 2^10
itemResendTimeout = itemResendTimeout * backoffFactor
}

if itemAckTimeout <= itemResendTimeout {
item.resendTime = sendTime.Add(itemAckTimeout)
} else {
Expand Down Expand Up @@ -1775,11 +1782,14 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {
if contract := self.client.ContractManager().TakeContract(self.ctx, contractKey, timeout); contract != nil && setNextContract(contract) {
self.contractSeqIndex += 1
// async queue up the next contract
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)
// Gate contract creation when backend is degraded to prevent bandwidth leak
if !isBackendDegraded() {
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)
}
return true
} else {
return false
Expand Down Expand Up @@ -1826,11 +1836,15 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {
CompanionContract: self.companionContract,
ForceStream: self.forceStream,
}
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)

// Gate contract creation when backend is degraded to prevent bandwidth leak
if !isBackendDegraded() {
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)
}

if traceNextContract(min(timeout, self.sendBufferSettings.CreateContractRetryInterval)) {
return true
Expand Down
35 changes: 34 additions & 1 deletion transfer_contract_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connect
import (
"context"
"sync"
"sync/atomic"
"time"

// "errors"
Expand All @@ -24,6 +25,12 @@ import (
"github.com/urnetwork/connect/protocol"
)

// Package-level atomics for OOB error rate-limiting.
// Both values are read and written only through shouldLogOobErr.
var (
// lastOobErrLogNano is the time.Now().UnixNano() value recorded when
// shouldLogOobErr last returned true; it is 0 until the first log slot
// has been granted.
lastOobErrLogNano atomic.Int64 // Last time we logged an OOB error
// suppressedOobErrs is incremented each time shouldLogOobErr returns false
// and swapped back to 0 when the next log slot is granted.
suppressedOobErrs atomic.Int64 // Count of OOB errors suppressed since last log
)

// manage contracts which are embedded into each transfer sequence

type ContractKey struct {
Expand Down Expand Up @@ -111,6 +118,30 @@ func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey []
return hmac.Equal(storedContractHmac, standardMac.Sum(nil))
}

// shouldLogOobErr is the package-wide rate limiter for OOB error logging.
//
// It returns true when at least AuthErrorRateLimitWindow has passed since the
// last granted log slot and the caller wins the compare-and-swap that claims
// the new slot. In that case any suppression count accumulated since the
// previous slot is reported in a summary line and reset to zero.
//
// It returns false, after incrementing the suppression counter, when the
// window has not elapsed yet or another goroutine claimed the slot first.
func shouldLogOobErr() bool {
	nowNano := time.Now().UnixNano()
	prevNano := lastOobErrLogNano.Load()

	windowElapsed := nowNano-prevNano >= int64(AuthErrorRateLimitWindow)
	// The CAS is attempted only once the window has elapsed, so at most one
	// goroutine per window gets past this guard.
	if !windowElapsed || !lastOobErrLogNano.CompareAndSwap(prevNano, nowNano) {
		suppressedOobErrs.Add(1)
		return false
	}

	// The slot is ours: drain the counter and report what was dropped.
	if dropped := suppressedOobErrs.Swap(0); dropped > 0 {
		glog.Infof("[contract]oob err ... (%d suppressed)", dropped)
	}
	return true
}

func DefaultContractManagerSettings() *ContractManagerSettings {
// NETWORK EVENT: at the enable contracts date, all clients will require contracts
// up to that time, contracts are optional for the sender and match for the receiver
Expand Down Expand Up @@ -921,7 +952,9 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq
case <-self.client.Done():
// no need to log warnings when the client closes
default:
glog.Infof("[contract]oob err = %s\n", err)
if shouldLogOobErr() {
glog.Infof("[contract]oob err = %s\n", err)
}
}
}
},
Expand Down
79 changes: 77 additions & 2 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ const (
TransportModeNone TransportMode = ""
)

// Backend degradation and auth error rate-limiting constants
const (
// AuthErrorRateLimitWindow is the minimum spacing between two log lines
// granted by shouldLogAuthErr; shouldLogOobErr compares against the same
// window.
AuthErrorRateLimitWindow = 1 * time.Minute
// BackendDegradationWindow is the minimum spacing between two evaluations
// of the degradation flag in updateBackendDegradation.
BackendDegradationWindow = 2 * time.Minute
)

// Package-level atomics for bandwidth leak prevention.
// They are shared by every transport in the process rather than being
// per-client state.
var (
// lastAuthErrLogNano is the time.Now().UnixNano() value recorded when
// shouldLogAuthErr last returned true; it is 0 until the first log slot
// has been granted.
lastAuthErrLogNano atomic.Int64 // Last time we logged an auth error
// suppressedAuthErrs is incremented each time shouldLogAuthErr returns
// false and swapped back to 0 when the next log slot is granted.
suppressedAuthErrs atomic.Int64 // Count of auth errors suppressed since last log
// lastBackendCheckNano is the time.Now().UnixNano() value of the most
// recent degradation evaluation claimed by updateBackendDegradation.
lastBackendCheckNano atomic.Int64 // Last time we checked backend degradation
// isBackendDegradedFlag is written by updateBackendDegradation and read
// through isBackendDegraded.
isBackendDegradedFlag atomic.Bool // Set when backend auth errors are flooding
)

type ClientAuth struct {
ByJwt string
// ClientId Id
Expand All @@ -94,6 +108,61 @@ func (self *ClientAuth) ClientId() (Id, error) {
return byJwt.ClientId, nil
}

// shouldLogAuthErr is the package-wide rate limiter for auth error logging.
//
// A caller is granted a log slot (return value true) only when
// AuthErrorRateLimitWindow has elapsed since the previous slot and its
// compare-and-swap on the slot timestamp succeeds, so concurrent callers
// cannot both log for the same window. The winner also emits a summary of how
// many errors were suppressed since the previous slot and resets that count.
//
// Every other call bumps the suppression counter and returns false.
func shouldLogAuthErr() bool {
	current := time.Now().UnixNano()
	previous := lastAuthErrLogNano.Load()

	switch {
	case current-previous < int64(AuthErrorRateLimitWindow):
		// Still inside the current window: suppress.
	case !lastAuthErrLogNano.CompareAndSwap(previous, current):
		// Another goroutine claimed this window's slot first: suppress.
	default:
		// This caller owns the slot and reports what was skipped.
		if skipped := suppressedAuthErrs.Swap(0); skipped > 0 {
			glog.Infof("[t]auth error ... (%d suppressed)", skipped)
		}
		return true
	}

	suppressedAuthErrs.Add(1)
	return false
}

// isBackendDegraded reports whether the backend is currently considered
// degraded based on recent auth errors.
//
// The degradation flag is only re-evaluated by updateBackendDegradation, and
// that function runs only when an auth attempt fails. If the backend recovers
// and auth errors simply stop, nothing would ever clear the flag. To avoid
// gating contract creation and inflating resend timeouts indefinitely, the
// flag is honored only while the last evaluation is recent: once two full
// BackendDegradationWindow periods have passed without a new evaluation, the
// flag is treated as stale and this function returns false.
func isBackendDegraded() bool {
	// Fast path: callers sit on the send path, so skip the clock read in the
	// common healthy case.
	if !isBackendDegradedFlag.Load() {
		return false
	}

	// While errors keep arriving, the evaluation timestamp is refreshed about
	// once per window, so a gap of two windows means the errors have stopped.
	sinceCheck := time.Now().UnixNano() - lastBackendCheckNano.Load()
	return sinceCheck < 2*int64(BackendDegradationWindow)
}

// authErrsSinceBackendCheck counts calls to updateBackendDegradation since the
// degradation flag was last evaluated. It is kept separate from the log
// suppression counter because that counter is reset to zero whenever a log
// line is granted, which happens right before the evaluation runs.
var authErrsSinceBackendCheck atomic.Int64

// backendDegradedAuthErrThreshold is the number of auth failures between two
// evaluations above which the backend is considered degraded.
const backendDegradedAuthErrThreshold = 100

// updateBackendDegradation records one auth failure and, at most once per
// BackendDegradationWindow, re-evaluates the degradation flag.
// Call it once after every auth failure; the flag is read through
// isBackendDegraded.
//
// The backend is marked degraded when more than
// backendDegradedAuthErrThreshold failures were recorded since the previous
// evaluation and that evaluation is recent (less than two windows ago). A
// count gathered over a longer span says nothing about the current failure
// rate, so it clears the flag instead.
func updateBackendDegradation() {
	authErrsSinceBackendCheck.Add(1)

	now := time.Now().UnixNano()
	lastCheck := lastBackendCheckNano.Load()

	// Evaluate at most once per degradation window to avoid overhead.
	elapsed := now - lastCheck
	if elapsed < int64(BackendDegradationWindow) {
		return
	}

	// Claim the evaluation slot; if another goroutine got there first it will
	// do the evaluation.
	if !lastBackendCheckNano.CompareAndSwap(lastCheck, now) {
		return
	}

	failures := authErrsSinceBackendCheck.Swap(0)
	recent := elapsed < 2*int64(BackendDegradationWindow)
	isBackendDegradedFlag.Store(recent && failures > backendDegradedAuthErrThreshold)
}

// (ctx, network, address)
// type DialContextFunc func(ctx context.Context, network string, address string) (net.Conn, error)

Expand Down Expand Up @@ -482,7 +551,10 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
ws, err = connect()
}
if err != nil {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
if shouldLogAuthErr() {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
}
updateBackendDegradation()
select {
case <-self.ctx.Done():
return
Expand Down Expand Up @@ -1068,7 +1140,10 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D
connStream, err = connect()
}
if err != nil {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
if shouldLogAuthErr() {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
}
updateBackendDegradation()
select {
case <-self.ctx.Done():
return
Expand Down