From fd8b4691eb4caf99954936bbceff95b469ef5b19 Mon Sep 17 00:00:00 2001 From: full-bars <45684698+full-bars@users.noreply.github.com> Date: Thu, 14 May 2026 14:09:24 -0700 Subject: [PATCH 1/4] contract,transport: reduce log spam during backend outages During URnetwork outages or degraded backend conditions, two log lines spam the console on every retry cycle, making it difficult to observe other events: [contract]oob err = Timeout [t]auth error = No successful strategy found. This commit addresses both. contract: OOB error backoff (ryanmello07) Adds SignStoredContract/VerifyStoredContract helpers and a NetworkEventTimeChangeHmac cutover (2026-07-01) for a planned HMAC format migration - verifiers accept both legacy and standard forms at all times so the rollout is asymmetric. Adds CreateContractOobErrorBackoff (default 1 min) to ContractManagerSettings. After the first OOB error, CreateContract returns early for the duration of the backoff window. The log line fires at most once per minute instead of once per in-flight request. transport: auth error logged on state transition only runH1 and runH3 both retry the platform connection in a tight loop. Previously every failed attempt emitted an INFO log line. Now the first failure logs at INFO; subsequent retries in the same failure run are demoted to V(1). The flag resets on successful connect so the next disconnection surfaces at INFO again. --- transfer_contract_manager.go | 86 +++++++++++++++++++++++++++++++++++- transport.go | 18 +++++++- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index eefc02f1..f37d7449 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -77,6 +77,40 @@ func (self *ContractManagerStats) ContractOpenByteCount() ByteCount { return netContractOpenByteCount } +// SignStoredContract returns the HMAC signature for a stored contract using the +// format appropriate for the current time relative to +// settings.NetworkEventTimeChangeHmac. Before that time, signers emit the +// legacy form (`mac.Sum(storedContractBytes)`, which appends a key-only HMAC +// to the contract bytes). At or after that time, signers emit the standard +// form (`mac.Write(storedContractBytes); mac.Sum(nil)`). +// +// Both connect and server/connect must use this helper so the cutover is +// consistent across client and server. +func SignStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte) []byte { + mac := hmac.New(sha256.New, provideSecretKey) + if time.Now().Before(settings.NetworkEventTimeChangeHmac) { + // legacy: this leaves HMAC(key, "") in the trailing 32 bytes of the + // returned slice. preserved for backward compatibility. + return mac.Sum(storedContractBytes) + } + mac.Write(storedContractBytes) + return mac.Sum(nil) +} + +// VerifyStoredContract validates a stored-contract HMAC against the provide +// secret key, accepting both the legacy and standard HMAC formats so that +// signers may cross over at settings.NetworkEventTimeChangeHmac without +// breaking compatibility with peers that have not yet cut over. +func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte, storedContractHmac []byte) bool { + legacyMac := hmac.New(sha256.New, provideSecretKey) + if hmac.Equal(storedContractHmac, legacyMac.Sum(storedContractBytes)) { + return true + } + standardMac := hmac.New(sha256.New, provideSecretKey) + standardMac.Write(storedContractBytes) + return hmac.Equal(storedContractHmac, standardMac.Sum(nil)) +} + func DefaultContractManagerSettings() *ContractManagerSettings { // NETWORK EVENT: at the enable contracts date, all clients will require contracts // up to that time, contracts are optional for the sender and match for the receiver @@ -84,15 +118,25 @@ func DefaultContractManagerSettings() *ContractManagerSettings { if err != nil { panic(err) } + // NETWORK EVENT: at the change-hmac date, signers cut over from the legacy + // HMAC format to the standard form. verifiers accept both forms at all + // times so the cutover can be deployed asymmetrically. + networkEventTimeChangeHmac, err := time.Parse(time.RFC3339, "2026-07-01T00:00:00Z") + if err != nil { + panic(err) + } return &ContractManagerSettings{ InitialContractTransferByteCount: kib(16), StandardContractTransferByteCount: mib(128), ContractTransferByteSeqScale: 4, NetworkEventTimeEnableContracts: networkEventTimeEnableContracts, + NetworkEventTimeChangeHmac: networkEventTimeChangeHmac, ProvidePingTimeout: 0, + CreateContractOobErrorBackoff: time.Minute, + ProtocolVersion: DefaultProtocolVersion, // TODO remove @@ -105,6 +149,7 @@ func DefaultContractManagerSettings() *ContractManagerSettings { func DefaultContractManagerSettingsNoNetworkEvents() *ContractManagerSettings { settings := DefaultContractManagerSettings() settings.NetworkEventTimeEnableContracts = time.Time{} + settings.NetworkEventTimeChangeHmac = time.Time{} return settings } @@ -119,9 +164,19 @@ type ContractManagerSettings struct { // this can be removed after wide adoption NetworkEventTimeEnableContracts time.Time + // cut over the stored-contract HMAC signing format. before this time, + // SignStoredContract emits the legacy form (mac.Sum(bytes)); at or after, + // it emits the standard form (mac.Write(bytes); mac.Sum(nil)). verifiers + // accept both forms at all times. + NetworkEventTimeChangeHmac time.Time + // an active ping to the control fast-tracks any timeouts ProvidePingTimeout time.Duration + // back off create-contract OOB API calls after an OOB error to avoid + // repeatedly hitting the API while it is timing out or unavailable. + CreateContractOobErrorBackoff time.Duration + ProtocolVersion int // TODO remove @@ -161,12 +216,36 @@ type ContractManager struct { localStats *ContractManagerStats controlSyncProvide *ControlSync + + createContractOobErrorBackoffUntil time.Time } func NewContractManagerWithDefaults(ctx context.Context, client *Client) *ContractManager { return NewContractManager(ctx, client, DefaultContractManagerSettings()) } +func (self *ContractManager) createContractOobErrorBackoffActive() bool { + if self.settings.CreateContractOobErrorBackoff <= 0 { + return false + } + + self.mutex.Lock() + defer self.mutex.Unlock() + + return time.Now().Before(self.createContractOobErrorBackoffUntil) +} + +func (self *ContractManager) markCreateContractOobError() { + if self.settings.CreateContractOobErrorBackoff <= 0 { + return + } + + self.mutex.Lock() + defer self.mutex.Unlock() + + self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff) +} + func NewContractManager( ctx context.Context, client *Client, @@ -825,6 +904,10 @@ func (self *ContractManager) addContract(contractKey ContractKey, contract *prot } func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeqIndex uint64, minByteCount ByteCount) { + if self.createContractOobErrorBackoffActive() { + return + } + // look at destinationContracts and last contract to get previous contract id contractQueue := self.openContractQueue(contractKey) defer self.closeContractQueue(contractKey) @@ -862,7 +945,8 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - glog.Infof("[contract]oob err = %s\n", err) + self.markCreateContractOobError() + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) } } }, diff --git a/transport.go b/transport.go index 243d5742..fccad126 100644 --- a/transport.go +++ b/transport.go @@ -392,6 +392,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { } } + authErrLogged := false for { // wait until we are back in h1 or worse func() { @@ -482,7 +483,12 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if !authErrLogged { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } select { case <-self.ctx.Done(): return @@ -490,6 +496,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { continue } } + authErrLogged = false c := func() { defer ws.Close() @@ -855,6 +862,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D } } + authErrLogged := false for { // wait until we are back in the specific pt mode or auto mode func() { @@ -1020,7 +1028,12 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if !authErrLogged { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } select { case <-self.ctx.Done(): return @@ -1028,6 +1041,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D continue } } + authErrLogged = false conn := connStream.conn stream := connStream.stream From 8da239c209dedbd65c96700238bf944057828157 Mon Sep 17 00:00:00 2001 From: full-bars <45684698+full-bars@users.noreply.github.com> Date: Fri, 15 May 2026 13:44:19 -0700 Subject: [PATCH 2/4] fix race condition in oob backoff and add global auth error rate limit markCreateContractOobError now atomically checks and sets the backoff inside a single mutex lock, preventing duplicate log lines when two goroutines race through the check simultaneously. PlatformTransport gains shouldLogAuthErr() which rate-limits [t]auth error logging to once per minute across all transport goroutines on the same instance, preventing log floods when many transports time out simultaneously. Per-transport authErrLogged is preserved for within-session suppression. --- transfer_contract_manager.go | 13 +++++++++---- transport.go | 17 +++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index f37d7449..4dcac701 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -235,15 +235,19 @@ func (self *ContractManager) createContractOobErrorBackoffActive() bool { return time.Now().Before(self.createContractOobErrorBackoffUntil) } -func (self *ContractManager) markCreateContractOobError() { +func (self *ContractManager) markCreateContractOobError() bool { if self.settings.CreateContractOobErrorBackoff <= 0 { - return + return true } self.mutex.Lock() defer self.mutex.Unlock() + if time.Now().Before(self.createContractOobErrorBackoffUntil) { + return false + } self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff) + return true } func NewContractManager( @@ -945,8 +949,9 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - self.markCreateContractOobError() - glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) + if self.markCreateContractOobError() { + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) + } } } }, diff --git a/transport.go b/transport.go index fccad126..ac735bf0 100644 --- a/transport.go +++ b/transport.go @@ -180,6 +180,9 @@ type PlatformTransport struct { availableModes map[TransportMode]bool targetMode TransportMode mode TransportMode + + authErrMu sync.Mutex + lastAuthErrLog time.Time } func NewPlatformTransportWithDefaults( @@ -378,6 +381,16 @@ func isBetterMode(current TransportMode, other TransportMode) bool { return transportModePreferences[current] < transportModePreferences[other] } +func (self *PlatformTransport) shouldLogAuthErr() bool { + self.authErrMu.Lock() + defer self.authErrMu.Unlock() + if time.Since(self.lastAuthErrLog) < time.Minute { + return false + } + self.lastAuthErrLog = time.Now() + return true +} + func (self *PlatformTransport) runH1(initialTimeout time.Duration) { // connect and update route manager for this transport defer self.cancel() @@ -483,7 +496,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - if !authErrLogged { + if !authErrLogged && self.shouldLogAuthErr() { glog.Infof("[t]auth error %s = %s\n", clientId, err) authErrLogged = true } else { @@ -1028,7 +1041,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - if !authErrLogged { + if !authErrLogged && self.shouldLogAuthErr() { glog.Infof("[t]auth error %s = %s\n", clientId, err) authErrLogged = true } else { From 36b1a9ecfdf4eb4f8f83a01613c296de86e4dd1c Mon Sep 17 00:00:00 2001 From: full-bars <45684698+full-bars@users.noreply.github.com> Date: Sun, 17 May 2026 17:15:56 -0700 Subject: [PATCH 3/4] fix: use package-level atomics for oob and auth error rate limiting Both log lines were rate-limited at the wrong scope. Each proxy creates its own ContractManager and PlatformTransport, so per-instance limiters allow 1000 instances to all log simultaneously during an outage. Switches both to package-level atomic.Int64 CAS rate limiters (max one log line per minute globally across all instances). Suppressed count is reported on each emitted line so operators know how many errors were hidden. Per-instance OOB backoff still gates contract creation. Verified under live stress testing with 1000 proxies + tc netem: - [t]auth error: exactly 1 line/min, suppressed counts up to 3,980 - [contract]oob err: 0-1 lines/min, suppressed count reported correctly --- transfer_contract_manager.go | 38 +++++++++++++++++++----- transport.go | 57 ++++++++++++++++++++++++++---------- 2 files changed, 72 insertions(+), 23 deletions(-) diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index 4dcac701..3027dfe9 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -3,6 +3,7 @@ package connect import ( "context" "sync" + "sync/atomic" "time" // "errors" @@ -24,6 +25,24 @@ import ( "github.com/urnetwork/connect/protocol" ) +var lastOobErrLogNano atomic.Int64 +var suppressedOobErrCount atomic.Int64 + +func shouldLogOobErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastOobErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedOobErrCount.Add(1) + return false, 0 + } + if !lastOobErrLogNano.CompareAndSwap(last, now) { + suppressedOobErrCount.Add(1) + return false, 0 + } + suppressed := suppressedOobErrCount.Swap(0) + return true, suppressed +} + // manage contracts which are embedded into each transfer sequence type ContractKey struct { @@ -235,19 +254,17 @@ func (self *ContractManager) createContractOobErrorBackoffActive() bool { return time.Now().Before(self.createContractOobErrorBackoffUntil) } -func (self *ContractManager) markCreateContractOobError() bool { +func (self *ContractManager) markCreateContractOobError() { if self.settings.CreateContractOobErrorBackoff <= 0 { - return true + return } self.mutex.Lock() defer self.mutex.Unlock() - if time.Now().Before(self.createContractOobErrorBackoffUntil) { - return false + if !time.Now().Before(self.createContractOobErrorBackoffUntil) { + self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff) } - self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff) - return true } func NewContractManager( @@ -949,8 +966,13 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - if self.markCreateContractOobError() { - glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) + self.markCreateContractOobError() + if ok, suppressed := shouldLogOobErr(); ok { + if suppressed > 0 { + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s (%d suppressed)\n", err, self.settings.CreateContractOobErrorBackoff, suppressed) + } else { + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) + } } } } diff --git a/transport.go b/transport.go index ac735bf0..fc0a5971 100644 --- a/transport.go +++ b/transport.go @@ -181,8 +181,6 @@ type PlatformTransport struct { targetMode TransportMode mode TransportMode - authErrMu sync.Mutex - lastAuthErrLog time.Time } func NewPlatformTransportWithDefaults( @@ -381,14 +379,27 @@ func isBetterMode(current TransportMode, other TransportMode) bool { return transportModePreferences[current] < transportModePreferences[other] } -func (self *PlatformTransport) shouldLogAuthErr() bool { - self.authErrMu.Lock() - defer self.authErrMu.Unlock() - if time.Since(self.lastAuthErrLog) < time.Minute { - return false +// lastAuthErrLogNano and suppressedAuthErrCount are package-level atomics shared +// across all PlatformTransport instances, rate-limiting [t]auth error log lines to +// at most once per minute and tracking how many were suppressed in the interval. +var lastAuthErrLogNano atomic.Int64 +var suppressedAuthErrCount atomic.Int64 + +// shouldLogAuthErr returns (true, suppressedCount) if a log line should be emitted, +// resetting the suppressed counter. Returns (false, 0) if the error is suppressed. +func shouldLogAuthErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastAuthErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedAuthErrCount.Add(1) + return false, 0 } - self.lastAuthErrLog = time.Now() - return true + if !lastAuthErrLogNano.CompareAndSwap(last, now) { + suppressedAuthErrCount.Add(1) + return false, 0 + } + suppressed := suppressedAuthErrCount.Swap(0) + return true, suppressed } func (self *PlatformTransport) runH1(initialTimeout time.Duration) { @@ -496,9 +507,17 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - if !authErrLogged && self.shouldLogAuthErr() { - glog.Infof("[t]auth error %s = %s\n", clientId, err) - authErrLogged = true + if !authErrLogged { + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } } else { glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) } @@ -1041,9 +1060,17 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - if !authErrLogged && self.shouldLogAuthErr() { - glog.Infof("[t]auth error %s = %s\n", clientId, err) - authErrLogged = true + if !authErrLogged { + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } } else { glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) } From ce25893783319161260e51bd44dd2131cf4f7a83 Mon Sep 17 00:00:00 2001 From: full-bars <45684698+full-bars@users.noreply.github.com> Date: Fri, 29 May 2026 20:32:33 -0700 Subject: [PATCH 4/4] transport: rate-limit [r]drop errors to 1 per minute Connection drop errors flood logs during backend outages without suppression, unlike [t]auth error and [contract]oob err. Implement identical package-level atomic rate-limiting pattern: max one message per minute globally, suppression count reported. Validated in production incident 2026-05-30. --- transfer.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/transfer.go b/transfer.go index 730c06a3..e7490b0c 100644 --- a/transfer.go +++ b/transfer.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" // "runtime/debug" @@ -24,6 +25,26 @@ import ( "github.com/urnetwork/connect/protocol" ) +var ( + lastDropErrLogNano atomic.Int64 + suppressedDropErrCount atomic.Int64 +) + +func shouldLogDropErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastDropErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedDropErrCount.Add(1) + return false, 0 + } + if !lastDropErrLogNano.CompareAndSwap(last, now) { + suppressedDropErrCount.Add(1) + return false, 0 + } + suppressed := suppressedDropErrCount.Swap(0) + return true, suppressed +} + /* Sends frames to destinations with properties: - as long the sending client is active, frames are eventually delivered up to timeout @@ -2696,7 +2717,13 @@ func (self *ReceiveSequence) Run() { } else { err := c() if err != nil { - glog.Infof("[r]drop = %s", err) + if ok, suppressed := shouldLogDropErr(); ok { + if suppressed > 0 { + glog.Infof("[r]drop = %s (%d suppressed)", err, suppressed) + } else { + glog.Infof("[r]drop = %s", err) + } + } } } }