Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

// "runtime/debug"
Expand All @@ -24,6 +25,26 @@ import (
"github.com/urnetwork/connect/protocol"
)

var (
lastDropErrLogNano atomic.Int64
suppressedDropErrCount atomic.Int64
)

func shouldLogDropErr() (bool, int64) {
now := time.Now().UnixNano()
last := lastDropErrLogNano.Load()
if now-last < int64(time.Minute) {
suppressedDropErrCount.Add(1)
return false, 0
}
if !lastDropErrLogNano.CompareAndSwap(last, now) {
suppressedDropErrCount.Add(1)
return false, 0
}
suppressed := suppressedDropErrCount.Swap(0)
return true, suppressed
}

/*
Sends frames to destinations with properties:
- as long the sending client is active, frames are eventually delivered up to timeout
Expand Down Expand Up @@ -2696,7 +2717,13 @@ func (self *ReceiveSequence) Run() {
} else {
err := c()
if err != nil {
glog.Infof("[r]drop = %s", err)
if ok, suppressed := shouldLogDropErr(); ok {
if suppressed > 0 {
glog.Infof("[r]drop = %s (%d suppressed)", err, suppressed)
} else {
glog.Infof("[r]drop = %s", err)
}
}
}
}
}
Expand Down
113 changes: 112 additions & 1 deletion transfer_contract_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connect
import (
"context"
"sync"
"sync/atomic"
"time"

// "errors"
Expand All @@ -24,6 +25,24 @@ import (
"github.com/urnetwork/connect/protocol"
)

var lastOobErrLogNano atomic.Int64
var suppressedOobErrCount atomic.Int64

func shouldLogOobErr() (bool, int64) {
now := time.Now().UnixNano()
last := lastOobErrLogNano.Load()
if now-last < int64(time.Minute) {
suppressedOobErrCount.Add(1)
return false, 0
}
if !lastOobErrLogNano.CompareAndSwap(last, now) {
suppressedOobErrCount.Add(1)
return false, 0
}
suppressed := suppressedOobErrCount.Swap(0)
return true, suppressed
}

// manage contracts which are embedded into each transfer sequence

type ContractKey struct {
Expand Down Expand Up @@ -77,22 +96,66 @@ func (self *ContractManagerStats) ContractOpenByteCount() ByteCount {
return netContractOpenByteCount
}

// SignStoredContract returns the HMAC signature for a stored contract using the
// format appropriate for the current time relative to
// settings.NetworkEventTimeChangeHmac. Before that time, signers emit the
// legacy form (`mac.Sum(storedContractBytes)`, which appends a key-only HMAC
// to the contract bytes). At or after that time, signers emit the standard
// form (`mac.Write(storedContractBytes); mac.Sum(nil)`).
//
// Both connect and server/connect must use this helper so the cutover is
// consistent across client and server.
func SignStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte) []byte {
mac := hmac.New(sha256.New, provideSecretKey)
if time.Now().Before(settings.NetworkEventTimeChangeHmac) {
// legacy: this leaves HMAC(key, "") in the trailing 32 bytes of the
// returned slice. preserved for backward compatibility.
return mac.Sum(storedContractBytes)
}
mac.Write(storedContractBytes)
return mac.Sum(nil)
}

// VerifyStoredContract validates a stored-contract HMAC against the provide
// secret key, accepting both the legacy and standard HMAC formats so that
// signers may cross over at settings.NetworkEventTimeChangeHmac without
// breaking compatibility with peers that have not yet cut over.
func VerifyStoredContract(settings *ContractManagerSettings, provideSecretKey []byte, storedContractBytes []byte, storedContractHmac []byte) bool {
legacyMac := hmac.New(sha256.New, provideSecretKey)
if hmac.Equal(storedContractHmac, legacyMac.Sum(storedContractBytes)) {
return true
}
standardMac := hmac.New(sha256.New, provideSecretKey)
standardMac.Write(storedContractBytes)
return hmac.Equal(storedContractHmac, standardMac.Sum(nil))
}

func DefaultContractManagerSettings() *ContractManagerSettings {
// NETWORK EVENT: at the enable contracts date, all clients will require contracts
// up to that time, contracts are optional for the sender and match for the receiver
networkEventTimeEnableContracts, err := time.Parse(time.RFC3339, "2024-05-01T00:00:00Z")
if err != nil {
panic(err)
}
// NETWORK EVENT: at the change-hmac date, signers cut over from the legacy
// HMAC format to the standard form. verifiers accept both forms at all
// times so the cutover can be deployed asymmetrically.
networkEventTimeChangeHmac, err := time.Parse(time.RFC3339, "2026-07-01T00:00:00Z")
if err != nil {
panic(err)
}
return &ContractManagerSettings{
InitialContractTransferByteCount: kib(16),
StandardContractTransferByteCount: mib(128),
ContractTransferByteSeqScale: 4,

NetworkEventTimeEnableContracts: networkEventTimeEnableContracts,
NetworkEventTimeChangeHmac: networkEventTimeChangeHmac,

ProvidePingTimeout: 0,

CreateContractOobErrorBackoff: time.Minute,

ProtocolVersion: DefaultProtocolVersion,

// TODO remove
Expand All @@ -105,6 +168,7 @@ func DefaultContractManagerSettings() *ContractManagerSettings {
func DefaultContractManagerSettingsNoNetworkEvents() *ContractManagerSettings {
settings := DefaultContractManagerSettings()
settings.NetworkEventTimeEnableContracts = time.Time{}
settings.NetworkEventTimeChangeHmac = time.Time{}
return settings
}

Expand All @@ -119,9 +183,19 @@ type ContractManagerSettings struct {
// this can be removed after wide adoption
NetworkEventTimeEnableContracts time.Time

// cut over the stored-contract HMAC signing format. before this time,
// SignStoredContract emits the legacy form (mac.Sum(bytes)); at or after,
// it emits the standard form (mac.Write(bytes); mac.Sum(nil)). verifiers
// accept both forms at all times.
NetworkEventTimeChangeHmac time.Time

// an active ping to the control fast-tracks any timeouts
ProvidePingTimeout time.Duration

// back off create-contract OOB API calls after an OOB error to avoid
// repeatedly hitting the API while it is timing out or unavailable.
CreateContractOobErrorBackoff time.Duration

ProtocolVersion int

// TODO remove
Expand Down Expand Up @@ -161,12 +235,38 @@ type ContractManager struct {
localStats *ContractManagerStats

controlSyncProvide *ControlSync

createContractOobErrorBackoffUntil time.Time
}

func NewContractManagerWithDefaults(ctx context.Context, client *Client) *ContractManager {
return NewContractManager(ctx, client, DefaultContractManagerSettings())
}

func (self *ContractManager) createContractOobErrorBackoffActive() bool {
if self.settings.CreateContractOobErrorBackoff <= 0 {
return false
}

self.mutex.Lock()
defer self.mutex.Unlock()

return time.Now().Before(self.createContractOobErrorBackoffUntil)
}

func (self *ContractManager) markCreateContractOobError() {
if self.settings.CreateContractOobErrorBackoff <= 0 {
return
}

self.mutex.Lock()
defer self.mutex.Unlock()

if !time.Now().Before(self.createContractOobErrorBackoffUntil) {
self.createContractOobErrorBackoffUntil = time.Now().Add(self.settings.CreateContractOobErrorBackoff)
}
}

func NewContractManager(
ctx context.Context,
client *Client,
Expand Down Expand Up @@ -825,6 +925,10 @@ func (self *ContractManager) addContract(contractKey ContractKey, contract *prot
}

func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeqIndex uint64, minByteCount ByteCount) {
if self.createContractOobErrorBackoffActive() {
return
}

// look at destinationContracts and last contract to get previous contract id
contractQueue := self.openContractQueue(contractKey)
defer self.closeContractQueue(contractKey)
Expand Down Expand Up @@ -862,7 +966,14 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq
case <-self.client.Done():
// no need to log warnings when the client closes
default:
glog.Infof("[contract]oob err = %s\n", err)
self.markCreateContractOobError()
if ok, suppressed := shouldLogOobErr(); ok {
if suppressed > 0 {
glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s (%d suppressed)\n", err, self.settings.CreateContractOobErrorBackoff, suppressed)
} else {
glog.Infof("[contract]oob err = %s; backing off create contract OOB requests for %s\n", err, self.settings.CreateContractOobErrorBackoff)
}
}
}
}
},
Expand Down
58 changes: 56 additions & 2 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type PlatformTransport struct {
availableModes map[TransportMode]bool
targetMode TransportMode
mode TransportMode

}

func NewPlatformTransportWithDefaults(
Expand Down Expand Up @@ -378,6 +379,29 @@ func isBetterMode(current TransportMode, other TransportMode) bool {
return transportModePreferences[current] < transportModePreferences[other]
}

// lastAuthErrLogNano and suppressedAuthErrCount are package-level atomics shared
// across all PlatformTransport instances, rate-limiting [t]auth error log lines to
// at most once per minute and tracking how many were suppressed in the interval.
var lastAuthErrLogNano atomic.Int64
var suppressedAuthErrCount atomic.Int64

// shouldLogAuthErr returns (true, suppressedCount) if a log line should be emitted,
// resetting the suppressed counter. Returns (false, 0) if the error is suppressed.
func shouldLogAuthErr() (bool, int64) {
now := time.Now().UnixNano()
last := lastAuthErrLogNano.Load()
if now-last < int64(time.Minute) {
suppressedAuthErrCount.Add(1)
return false, 0
}
if !lastAuthErrLogNano.CompareAndSwap(last, now) {
suppressedAuthErrCount.Add(1)
return false, 0
}
suppressed := suppressedAuthErrCount.Swap(0)
return true, suppressed
}

func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
// connect and update route manager for this transport
defer self.cancel()
Expand All @@ -392,6 +416,7 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
}
}

authErrLogged := false
for {
// wait until we are back in h1 or worse
func() {
Expand Down Expand Up @@ -482,14 +507,28 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
ws, err = connect()
}
if err != nil {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
if !authErrLogged {
if ok, suppressed := shouldLogAuthErr(); ok {
if suppressed > 0 {
glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed)
} else {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
}
authErrLogged = true
} else {
glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err)
}
} else {
glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err)
}
select {
case <-self.ctx.Done():
return
case <-reconnect.After():
continue
}
}
authErrLogged = false

c := func() {
defer ws.Close()
Expand Down Expand Up @@ -855,6 +894,7 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D
}
}

authErrLogged := false
for {
// wait until we are back in the specific pt mode or auto mode
func() {
Expand Down Expand Up @@ -1020,14 +1060,28 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D
connStream, err = connect()
}
if err != nil {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
if !authErrLogged {
if ok, suppressed := shouldLogAuthErr(); ok {
if suppressed > 0 {
glog.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed)
} else {
glog.Infof("[t]auth error %s = %s\n", clientId, err)
}
authErrLogged = true
} else {
glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err)
}
} else {
glog.V(1).Infof("[t]auth error %s = %s\n", clientId, err)
}
select {
case <-self.ctx.Done():
return
case <-reconnect.After():
continue
}
}
authErrLogged = false
conn := connStream.conn
stream := connStream.stream

Expand Down