diff --git a/transfer.go b/transfer.go index 730c06a..e7490b0 100644 --- a/transfer.go +++ b/transfer.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" // "runtime/debug" @@ -24,6 +25,26 @@ import ( "github.com/urnetwork/connect/protocol" ) +var ( + lastDropErrLogNano atomic.Int64 + suppressedDropErrCount atomic.Int64 +) + +func shouldLogDropErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastDropErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedDropErrCount.Add(1) + return false, 0 + } + if !lastDropErrLogNano.CompareAndSwap(last, now) { + suppressedDropErrCount.Add(1) + return false, 0 + } + suppressed := suppressedDropErrCount.Swap(0) + return true, suppressed +} + /* Sends frames to destinations with properties: - as long the sending client is active, frames are eventually delivered up to timeout @@ -2696,7 +2717,13 @@ func (self *ReceiveSequence) Run() { } else { err := c() if err != nil { - glog.Infof("[r]drop = %s", err) + if ok, suppressed := shouldLogDropErr(); ok { + if suppressed > 0 { + glog.Infof("[r]drop = %s (%d suppressed)", err, suppressed) + } else { + glog.Infof("[r]drop = %s", err) + } + } } } } diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index eefc02f..3027dfe 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -3,6 +3,7 @@ package connect import ( "context" "sync" + "sync/atomic" "time" // "errors" @@ -24,6 +25,24 @@ import ( "github.com/urnetwork/connect/protocol" ) +var lastOobErrLogNano atomic.Int64 +var suppressedOobErrCount atomic.Int64 + +func shouldLogOobErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastOobErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedOobErrCount.Add(1) + return false, 0 + } + if !lastOobErrLogNano.CompareAndSwap(last, now) { + suppressedOobErrCount.Add(1) + return false, 0 + } + suppressed := suppressedOobErrCount.Swap(0) + return true, suppressed +} + // manage contracts which are embedded into each transfer sequence type ContractKey struct { @@ -77,6 +96,40 @@ func (self *ContractManagerStats) ContractOpenByteCount() ByteCount { return netContractOpenByteCount } +// SignStoredContract returns the HMAC signature for a stored contract using the +// format appropriate for the current time relative to +// settings.NetworkEventTimeChangeHmac. Before that time, signers emit the +// legacy form (`mac.Sum(storedContractBytes)`, which appends a key-only HMAC +// to the contract bytes). At or after that time, signers emit the standard +// form (`mac.Write(storedContractBytes); mac.Sum(nil)`). +// +// Both connect and server/connect must use this helper so the cutover is +// consistent across client and server. +func SignStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte) []byte { + mac := hmac.New(sha256.New, provideSecretKey) + if time.Now().Before(settings.NetworkEventTimeChangeHmac) { + // legacy: this leaves HMAC(key, "") in the trailing 32 bytes of the + // returned slice. preserved for backward compatibility. + return mac.Sum(storedContractBytes) + } + mac.Write(storedContractBytes) + return mac.Sum(nil) +} + +// VerifyStoredContract validates a stored-contract HMAC against the provide +// secret key, accepting both the legacy and standard HMAC formats so that +// signers may cross over at settings.NetworkEventTimeChangeHmac without +// breaking compatibility with peers that have not yet cut over. +func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte, storedContractHmac []byte) bool { + legacyMac := hmac.New(sha256.New, provideSecretKey) + if hmac.Equal(storedContractHmac, legacyMac.Sum(storedContractBytes)) { + return true + } + standardMac := hmac.New(sha256.New, provideSecretKey) + standardMac.Write(storedContractBytes) + return hmac.Equal(storedContractHmac, standardMac.Sum(nil)) +} + func DefaultContractManagerSettings() *ContractManagerSettings { // NETWORK EVENT: at the enable contracts date, all clients will require contracts // up to that time, contracts are optional for the sender and match for the receiver @@ -84,15 +137,25 @@ func DefaultContractManagerSettings() *ContractManagerSettings { if err != nil { panic(err) } + // NETWORK EVENT: at the change-hmac date, signers cut over from the legacy + // HMAC format to the standard form. verifiers accept both forms at all + // times so the cutover can be deployed asymmetrically. + networkEventTimeChangeHmac, err := time.Parse(time.RFC3339, "2026-07-01T00:00:00Z") + if err != nil { + panic(err) + } return &ContractManagerSettings{ InitialContractTransferByteCount: kib(16), StandardContractTransferByteCount: mib(128), ContractTransferByteSeqScale: 4, NetworkEventTimeEnableContracts: networkEventTimeEnableContracts, + NetworkEventTimeChangeHmac: networkEventTimeChangeHmac, ProvidePingTimeout: 0, + CreateContractOobErrorBackoff: time.Minute, + ProtocolVersion: DefaultProtocolVersion, // TODO remove @@ -105,6 +168,7 @@ func DefaultContractManagerSettings() *ContractManagerSettings { func DefaultContractManagerSettingsNoNetworkEvents() *ContractManagerSettings { settings := DefaultContractManagerSettings() settings.NetworkEventTimeEnableContracts = time.Time{} + settings.NetworkEventTimeChangeHmac = time.Time{} return settings } @@ -119,9 +183,19 @@ type ContractManagerSettings struct { // this can be removed after wide adoption NetworkEventTimeEnableContracts time.Time + // cut over the stored-contract HMAC signing format. before this time, + // SignStoredContract emits the legacy form (mac.Sum(bytes)); at or after, + // it emits the standard form (mac.Write(bytes); mac.Sum(nil)). verifiers + // accept both forms at all times. + NetworkEventTimeChangeHmac time.Time + // an active ping to the control fast-tracks any timeouts ProvidePingTimeout time.Duration + // back off create-contract OOB API calls after an OOB error to avoid + // repeatedly hitting the API while it is timing out or unavailable. + CreateContractOobErrorBackoff time.Duration + ProtocolVersion int // TODO remove @@ -161,12 +235,38 @@ type ContractManager struct { localStats *ContractManagerStats controlSyncProvide *ControlSync + + createContractOobErrorBackoffUntil time.Time } func NewContractManagerWithDefaults(ctx context.Context, client *Client) *ContractManager { return NewContractManager(ctx, client, DefaultContractManagerSettings()) } +func (self *ContractManager) createContractOobErrorBackoffActive() bool { + if self.settings.CreateContractOobErrorBackoff <= 0 { + return false + } + + self.mutex.Lock() + defer self.mutex.Unlock() + + return time.Now().Before(self.createContractOobErrorBackoffUntil) +} + +func (self *ContractManager) markCreateContractOobError() { + if self.settings.CreateContractOobErrorBackoff <= 0 { + return + } + + self.mutex.Lock() + defer self.mutex.Unlock() + + if !time.Now().Before(self.createContractOobErrorBackoffUntil) { + self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff) + } +} + func NewContractManager( ctx context.Context, client *Client, @@ -825,6 +925,10 @@ func (self *ContractManager) addContract(contractKey ContractKey, contract *prot } func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeqIndex uint64, minByteCount ByteCount) { + if self.createContractOobErrorBackoffActive() { + return + } + // look at destinationContracts and last contract to get previous contract id contractQueue := self.openContractQueue(contractKey) defer self.closeContractQueue(contractKey) @@ -862,7 +966,14 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - glog.Infof("[contract]oob err = %s\n", err) + self.markCreateContractOobError() + if ok, suppressed := shouldLogOobErr(); ok { + if suppressed > 0 { + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s (%d suppressed)\n", err, self.settings.CreateContractOobErrorBackoff, suppressed) + } else { + glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff) + } + } } } }, diff --git a/transport.go b/transport.go index 243d574..fc0a597 100644 --- a/transport.go +++ b/transport.go @@ -180,6 +180,7 @@ type PlatformTransport struct { availableModes map[TransportMode]bool targetMode TransportMode mode TransportMode + } func NewPlatformTransportWithDefaults( @@ -378,6 +379,29 @@ func isBetterMode(current TransportMode, other TransportMode) bool { return transportModePreferences[current] < transportModePreferences[other] } +// lastAuthErrLogNano and suppressedAuthErrCount are package-level atomics shared +// across all PlatformTransport instances, rate-limiting [t]auth error log lines to +// at most once per minute and tracking how many were suppressed in the interval. +var lastAuthErrLogNano atomic.Int64 +var suppressedAuthErrCount atomic.Int64 + +// shouldLogAuthErr returns (true, suppressedCount) if a log line should be emitted, +// resetting the suppressed counter. Returns (false, 0) if the error is suppressed. +func shouldLogAuthErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastAuthErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedAuthErrCount.Add(1) + return false, 0 + } + if !lastAuthErrLogNano.CompareAndSwap(last, now) { + suppressedAuthErrCount.Add(1) + return false, 0 + } + suppressed := suppressedAuthErrCount.Swap(0) + return true, suppressed +} + func (self *PlatformTransport) runH1(initialTimeout time.Duration) { // connect and update route manager for this transport defer self.cancel() @@ -392,6 +416,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { } } + authErrLogged := false for { // wait until we are back in h1 or worse func() { @@ -482,7 +507,20 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if !authErrLogged { + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } select { case <-self.ctx.Done(): return @@ -490,6 +528,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { continue } } + authErrLogged = false c := func() { defer ws.Close() @@ -855,6 +894,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D } } + authErrLogged := false for { // wait until we are back in the specific pt mode or auto mode func() { @@ -1020,7 +1060,20 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - glog.Infof("[t]auth error %s = %s\n", clientId, err) + if !authErrLogged { + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + glog.Infof("[t]auth error %s = %s\n", clientId, err) + } + authErrLogged = true + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } + } else { + glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err) + } select { case <-self.ctx.Done(): return @@ -1028,6 +1081,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D continue } } + authErrLogged = false conn := connStream.conn stream := connStream.stream