From 88c7334de76b2f095d8a4ac3941bce1c01f13011 Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:36:48 -0400 Subject: [PATCH 1/7] wisp: populate wispConnection.remoteIP via ResolveClientIP Wires the existing remoteIP/ParseRealIP scaffolding to a proper client-IP resolver that honors X-Forwarded-For / CF-Connecting-IP only when the immediate peer is in trustedProxies. Required for accurate flood-protection, reputation, and logging. Adds: - wisp/clientip.go + tests (5 covering trusted/untrusted/XFF/garbage) - Config.TrustedProxies []string (CIDRs) - Config.TrustedHeaders []string (default: CF-Connecting-IP, X-Forwarded-For) - parsed cache cfg.trustedProxyNets, populated in InitResolver --- wisp/clientip.go | 56 +++++++++++++++++++++++++++++++++++++ wisp/clientip_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++ wisp/config.go | 21 ++++++++++---- wisp/wisp.go | 26 +++++++++++++++++ 4 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 wisp/clientip.go create mode 100644 wisp/clientip_test.go diff --git a/wisp/clientip.go b/wisp/clientip.go new file mode 100644 index 0000000..4ac552c --- /dev/null +++ b/wisp/clientip.go @@ -0,0 +1,56 @@ +package wisp + +import ( + "net" + "net/http" + "strings" +) + +// ResolveClientIP returns the originating client IP for r. If the immediate +// peer (r.RemoteAddr) is contained in trustedProxies, the named headers are +// consulted in order; the first usable IP that is itself NOT in +// trustedProxies (walking right-to-left for XFF semantics) is returned. +// Otherwise the peer IP is returned. Always returns a non-nil IP; falls +// back to the IPv4 unspecified address (0.0.0.0) on parse error. +func ResolveClientIP(r *http.Request, trustedProxies []*net.IPNet, headers []string) net.IP { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + host = r.RemoteAddr + } + peer := net.ParseIP(host) + if peer == nil { + return net.IPv4zero + } + + if !ipInAny(peer, trustedProxies) { + return peer + } + + for _, h := range headers { + v := r.Header.Get(h) + if v == "" { + continue + } + parts := strings.Split(v, ",") + for i := len(parts) - 1; i >= 0; i-- { + candidate := net.ParseIP(strings.TrimSpace(parts[i])) + if candidate == nil { + continue + } + if ipInAny(candidate, trustedProxies) { + continue + } + return candidate + } + } + return peer +} + +func ipInAny(ip net.IP, nets []*net.IPNet) bool { + for _, n := range nets { + if n.Contains(ip) { + return true + } + } + return false +} diff --git a/wisp/clientip_test.go b/wisp/clientip_test.go new file mode 100644 index 0000000..fca1f54 --- /dev/null +++ b/wisp/clientip_test.go @@ -0,0 +1,65 @@ +package wisp + +import ( + "net" + "net/http" + "testing" +) + +func mustCIDR(s string) *net.IPNet { + _, n, err := net.ParseCIDR(s) + if err != nil { + panic(err) + } + return n +} + +func TestResolveClientIPFromRemoteAddr(t *testing.T) { + r, _ := http.NewRequest("GET", "/", nil) + r.RemoteAddr = "203.0.113.5:12345" + ip := ResolveClientIP(r, nil, nil) + if ip.String() != "203.0.113.5" { + t.Fatalf("got %v", ip) + } +} + +func TestResolveClientIPIgnoresUntrustedHeader(t *testing.T) { + r, _ := http.NewRequest("GET", "/", nil) + r.RemoteAddr = "203.0.113.5:1" + r.Header.Set("X-Forwarded-For", "1.1.1.1") + ip := ResolveClientIP(r, nil, []string{"X-Forwarded-For"}) + if ip.String() != "203.0.113.5" { + t.Fatalf("got %v", ip) + } +} + +func TestResolveClientIPTrustsHeaderFromTrustedProxy(t *testing.T) { + r, _ := http.NewRequest("GET", "/", nil) + r.RemoteAddr = "10.0.0.5:1" + r.Header.Set("CF-Connecting-IP", "198.51.100.7") + trusted := []*net.IPNet{mustCIDR("10.0.0.0/8")} + ip := ResolveClientIP(r, trusted, []string{"CF-Connecting-IP"}) + if ip.String() != "198.51.100.7" { + t.Fatalf("got %v", ip) + } +} + +func TestResolveClientIPXFFRightmostInTrust(t *testing.T) { + r, _ := http.NewRequest("GET", "/", nil) + r.RemoteAddr = "10.0.0.5:1" + r.Header.Set("X-Forwarded-For", "1.2.3.4, 5.6.7.8") + trusted := []*net.IPNet{mustCIDR("10.0.0.0/8")} + ip := ResolveClientIP(r, trusted, []string{"X-Forwarded-For"}) + if ip.String() != "5.6.7.8" { + t.Fatalf("got %v", ip) + } +} + +func TestResolveClientIPFallbackOnGarbage(t *testing.T) { + r, _ := http.NewRequest("GET", "/", nil) + r.RemoteAddr = "garbage" + ip := ResolveClientIP(r, nil, nil) + if !ip.IsUnspecified() { + t.Fatalf("got %v", ip) + } +} diff --git a/wisp/config.go b/wisp/config.go index 8e4f85d..2841c38 100644 --- a/wisp/config.go +++ b/wisp/config.go @@ -49,8 +49,13 @@ type Config struct { PasswordAuthRequired bool `json:"passwordAuthRequired"` PasswordUsers map[string]string `json:"passwordUsers"` - ParseRealIP bool `json:"parseRealIP"` - NonWSResponse string `json:"nonWSResponse"` + ParseRealIP bool `json:"parseRealIP"` + TrustedProxies []string `json:"trustedProxies"` + TrustedHeaders []string `json:"trustedHeaders"` + NonWSResponse string `json:"nonWSResponse"` + + // Parsed at construction; not user-visible JSON. + trustedProxyNets []*net.IPNet LogLevel string `json:"logLevel"` @@ -95,8 +100,10 @@ func DefaultConfig() Config { PasswordAuthRequired: false, PasswordUsers: map[string]string{}, - ParseRealIP: true, - NonWSResponse: "", + ParseRealIP: true, + TrustedProxies: []string{}, + TrustedHeaders: []string{"CF-Connecting-IP", "X-Forwarded-For"}, + NonWSResponse: "", LogLevel: "info", @@ -137,8 +144,10 @@ func CreateWispConfig(cfg *Config) *Config { PasswordAuthRequired: cfg.PasswordAuthRequired, PasswordUsers: cfg.PasswordUsers, - ParseRealIP: cfg.ParseRealIP, - NonWSResponse: cfg.NonWSResponse, + ParseRealIP: cfg.ParseRealIP, + TrustedProxies: cfg.TrustedProxies, + TrustedHeaders: cfg.TrustedHeaders, + NonWSResponse: cfg.NonWSResponse, LogLevel: cfg.LogLevel, diff --git a/wisp/wisp.go b/wisp/wisp.go index 0a96ed5..5fff186 100644 --- a/wisp/wisp.go +++ b/wisp/wisp.go @@ -1,8 +1,10 @@ package wisp import ( + "fmt" "net" "net/http" + "strings" "sync" "time" @@ -25,6 +27,23 @@ func (cfg *Config) InitResolver() { ResultOrder: cfg.DnsResultOrder, }) cfg.Logger = newLogger(cfg.LogLevel) + + cfg.trustedProxyNets = cfg.trustedProxyNets[:0] + for _, t := range cfg.TrustedProxies { + entry := t + if !strings.Contains(entry, "/") { + if ip := net.ParseIP(entry); ip != nil { + bits := 32 + if ip.To4() == nil { + bits = 128 + } + entry = fmt.Sprintf("%s/%d", entry, bits) + } + } + if _, n, err := net.ParseCIDR(entry); err == nil { + cfg.trustedProxyNets = append(cfg.trustedProxyNets, n) + } + } } type upgradeHandler struct { @@ -68,12 +87,19 @@ func CreateWispHandler(config *Config) http.HandlerFunc { tc.SetWriteBuffer(1 << 20) } + var trusted []*net.IPNet + if config.ParseRealIP { + trusted = config.trustedProxyNets + } + remoteIP := ResolveClientIP(r, trusted, config.TrustedHeaders) + wc := &wispConnection{ netConn: netConn, writeCh: make(chan writeReq, 4096), // funny number config: config, twispStreams: newTwisp(), isV2: useV2, + remoteIP: remoteIP.String(), } go wc.writeLoop() From 6938cfaa1704d52d7a4efd3fae1326eea23f1dae Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:37:51 -0400 Subject: [PATCH 2/7] wisp: enforce AllowDirectIP/AllowPrivateIPs/AllowLoopbackIPs via EgressPolicy Activates the already-declared but unused safety flags in Config: - AllowDirectIP=false now refuses CONNECTs to IP-literal hostnames - AllowPrivateIPs=false / AllowLoopbackIPs=false now refuse CONNECTs whose resolved IP is in a private / loopback / link-local / multicast / unspecified range - IP-literal evaluation is also routed through the EgressPolicy so ::ffff:127.0.0.1 (IPv4-mapped IPv6 loopback) is rejected correctly - The DNS resolution loop now picks the first allowed IP rather than blindly taking ips[0], so DNS responses interleaving allowed and denied records cannot SSRF Adds wisp/egress.go + tests (6 covering private/public/allow/deny/v6). --- wisp/egress.go | 98 +++++++++++++++++++++++++++++++++++++++++++++ wisp/egress_test.go | 76 +++++++++++++++++++++++++++++++++++ wisp/wisp-stream.go | 32 ++++++++++++++- 3 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 wisp/egress.go create mode 100644 wisp/egress_test.go diff --git a/wisp/egress.go b/wisp/egress.go new file mode 100644 index 0000000..9442236 --- /dev/null +++ b/wisp/egress.go @@ -0,0 +1,98 @@ +package wisp + +import "net" + +// EgressPolicy decides whether outbound connections to a given IP are +// permitted. Evaluation order: explicit deny (IPs, CIDRs), explicit allow +// (IPs, CIDRs), kind-based deny (loopback / private / link-local / +// multicast / unspecified) unless the corresponding AllowXxx flag is set. +type EgressPolicy struct { + AllowLoopback bool + AllowPrivate bool + + AllowIPs map[string]struct{} + AllowCIDRs []*net.IPNet + DenyIPs map[string]struct{} + DenyCIDRs []*net.IPNet +} + +// PolicyFromConfig builds an EgressPolicy from the Config knobs that +// upstream already exposes (AllowLoopbackIPs, AllowPrivateIPs). The +// caller may extend the returned policy with deny lists. +func PolicyFromConfig(cfg *Config) *EgressPolicy { + return &EgressPolicy{ + AllowLoopback: cfg.AllowLoopbackIPs, + AllowPrivate: cfg.AllowPrivateIPs, + } +} + +// Evaluate returns (allowed, reason). When allowed is true, reason is "". +// When false, reason is one of: "invalid", "deny_ip", "deny_cidr", +// "unspecified", "loopback", "link_local", "private", "multicast". +func (p *EgressPolicy) Evaluate(ip net.IP) (bool, string) { + if p == nil { + return true, "" + } + if ip == nil { + return false, "invalid" + } + // Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so v4 rules apply. + if v4 := ip.To4(); v4 != nil { + ip = v4 + } + + if p.DenyIPs != nil { + if _, ok := p.DenyIPs[ip.String()]; ok { + return false, "deny_ip" + } + } + for _, n := range p.DenyCIDRs { + if n.Contains(ip) { + return false, "deny_cidr" + } + } + + explicitAllow := false + if p.AllowIPs != nil { + if _, ok := p.AllowIPs[ip.String()]; ok { + explicitAllow = true + } + } + if !explicitAllow { + for _, n := range p.AllowCIDRs { + if n.Contains(ip) { + explicitAllow = true + break + } + } + } + if explicitAllow { + return true, "" + } + + if ip.IsUnspecified() { + return false, "unspecified" + } + if ip.IsLoopback() { + if p.AllowLoopback { + return true, "" + } + return false, "loopback" + } + if ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { + if p.AllowPrivate { + return true, "" + } + return false, "link_local" + } + if ip.IsPrivate() { + if p.AllowPrivate { + return true, "" + } + return false, "private" + } + if ip.IsMulticast() { + return false, "multicast" + } + return true, "" +} diff --git a/wisp/egress_test.go b/wisp/egress_test.go new file mode 100644 index 0000000..38f9b93 --- /dev/null +++ b/wisp/egress_test.go @@ -0,0 +1,76 @@ +package wisp + +import ( + "net" + "testing" +) + +func mustNet(s string) *net.IPNet { + _, n, err := net.ParseCIDR(s) + if err != nil { + panic(err) + } + return n +} + +func TestEgressDeniesPrivateByDefault(t *testing.T) { + p := &EgressPolicy{} + cases := []string{ + "10.0.0.1", "192.168.1.5", "172.16.5.5", + "127.0.0.1", "169.254.1.1", + "::1", "fc00::1", "fe80::1", + "0.0.0.0", "224.0.0.1", + } + for _, c := range cases { + ok, _ := p.Evaluate(net.ParseIP(c)) + if ok { + t.Errorf("expected %s to be denied", c) + } + } +} + +func TestEgressAllowsPublic(t *testing.T) { + p := &EgressPolicy{} + cases := []string{"1.1.1.1", "8.8.8.8", "153.75.225.178", "2001:4860:4860::8888"} + for _, c := range cases { + ok, reason := p.Evaluate(net.ParseIP(c)) + if !ok { + t.Errorf("expected %s to be allowed, got reason %q", c, reason) + } + } +} + +func TestEgressAllowPrivate(t *testing.T) { + p := &EgressPolicy{AllowPrivate: true} + ok, _ := p.Evaluate(net.ParseIP("10.0.0.5")) + if !ok { + t.Fatal("expected AllowPrivate to permit 10.0.0.5") + } +} + +func TestEgressAllowLoopback(t *testing.T) { + p := &EgressPolicy{AllowLoopback: true} + ok, _ := p.Evaluate(net.ParseIP("127.0.0.1")) + if !ok { + t.Fatal("expected AllowLoopback to permit 127.0.0.1") + } +} + +func TestEgressDenyOverridesAllow(t *testing.T) { + p := &EgressPolicy{ + AllowPrivate: true, + DenyIPs: map[string]struct{}{"10.0.0.5": {}}, + } + ok, reason := p.Evaluate(net.ParseIP("10.0.0.5")) + if ok || reason != "deny_ip" { + t.Fatalf("got ok=%v reason=%q", ok, reason) + } +} + +func TestEgressIPv4MappedV6Unwrap(t *testing.T) { + p := &EgressPolicy{} + ok, reason := p.Evaluate(net.ParseIP("::ffff:127.0.0.1")) + if ok || reason != "loopback" { + t.Fatalf("got ok=%v reason=%q", ok, reason) + } +} diff --git a/wisp/wisp-stream.go b/wisp/wisp-stream.go index 525c5af..d99d792 100644 --- a/wisp/wisp-stream.go +++ b/wisp/wisp-stream.go @@ -57,8 +57,22 @@ func (s *wispStream) handleConnect(streamType uint8, port string, hostname strin } } + policy := PolicyFromConfig(cfg) + resolvedHostname := hostname - if cfg.DNSCache != nil { + if ip := net.ParseIP(hostname); ip != nil { + if !cfg.AllowDirectIP { + cfg.Logger.Warn("egress block: direct IP", "ip", s.wispConn.remoteIP, "dstIP", ip.String(), "port", port) + s.close(closeReasonBlocked) + return + } + if ok, reason := policy.Evaluate(ip); !ok { + cfg.Logger.Warn("egress block", "ip", s.wispConn.remoteIP, "dstIP", ip.String(), "port", port, "reason", reason) + s.close(closeReasonBlocked) + return + } + resolvedHostname = ip.String() + } else if cfg.DNSCache != nil { if _, whitelisted := cfg.Whitelist.Hostnames[hostname]; !whitelisted { ips, err := cfg.DNSCache.LookupIPAddr(context.Background(), hostname) if err != nil { @@ -69,7 +83,21 @@ func (s *wispStream) handleConnect(streamType uint8, port string, hostname strin s.close(closeReasonUnreachable) return } - resolvedHostname = ips[0].IP.String() + pickedReason := "" + for _, ipa := range ips { + if ok, reason := policy.Evaluate(ipa.IP); ok { + resolvedHostname = ipa.IP.String() + pickedReason = "" + break + } else { + pickedReason = reason + } + } + if pickedReason != "" { + cfg.Logger.Warn("egress block", "ip", s.wispConn.remoteIP, "host", hostname, "port", port, "reason", pickedReason) + s.close(closeReasonBlocked) + return + } } } From ffad0253468d8068c98d7201c8ce6e97cdce1bf2 Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:39:11 -0400 Subject: [PATCH 3/7] wisp: add sliding-window limiter, counting semaphore, SYN-flood signature Three small dependency-free modules used by the next commit to enforce per-source / per-destination rate limits and detect SYN-flood patterns: - SlidingWindow: keyed fixed-window limiter (nil-safe = allow-all) - Semaphore: lock-free counting semaphore (nil-safe = unlimited) - Signatures + Detector: per-(connection, destination) ring buffer of dial outcomes; matches when MinSamples reached AND failed/total exceeds FailedHandshakeRatio in the configured time window Tests cover: rate limit boundary, window rollover, 16-way concurrent contention, nil-safety, signature matching/non-matching, eviction. --- wisp/limits.go | 95 +++++++++++++++++++++++++++++++++++++ wisp/limits_test.go | 93 +++++++++++++++++++++++++++++++++++++ wisp/signature.go | 103 +++++++++++++++++++++++++++++++++++++++++ wisp/signature_test.go | 78 +++++++++++++++++++++++++++++++ 4 files changed, 369 insertions(+) create mode 100644 wisp/limits.go create mode 100644 wisp/limits_test.go create mode 100644 wisp/signature.go create mode 100644 wisp/signature_test.go diff --git a/wisp/limits.go b/wisp/limits.go new file mode 100644 index 0000000..6ced925 --- /dev/null +++ b/wisp/limits.go @@ -0,0 +1,95 @@ +package wisp + +import ( + "sync" + "sync/atomic" + "time" +) + +// SlidingWindow is a keyed fixed-window rate limiter. Each key has its own +// window that resets after `window` duration; up to `limit` Allow() calls +// per window return true. A nil receiver behaves as allow-all. +type SlidingWindow struct { + limit int + window time.Duration + + mu sync.Mutex + entries map[string]*windowEntry +} + +type windowEntry struct { + start time.Time + count int +} + +func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow { + return &SlidingWindow{ + limit: limit, + window: window, + entries: make(map[string]*windowEntry), + } +} + +func (w *SlidingWindow) Allow(key string) bool { + if w == nil || w.limit <= 0 { + return true + } + now := time.Now() + w.mu.Lock() + defer w.mu.Unlock() + e, ok := w.entries[key] + if !ok || now.Sub(e.start) >= w.window { + w.entries[key] = &windowEntry{start: now, count: 1} + return true + } + e.count++ + return e.count <= w.limit +} + +// Evict deletes entries whose window started more than `idle` ago. +func (w *SlidingWindow) Evict(idle time.Duration) { + if w == nil { + return + } + cutoff := time.Now().Add(-idle) + w.mu.Lock() + for k, e := range w.entries { + if e.start.Before(cutoff) { + delete(w.entries, k) + } + } + w.mu.Unlock() +} + +// Semaphore is a counting semaphore. TryAcquire is non-blocking and +// thread-safe; a nil receiver allows unlimited acquires. +type Semaphore struct { + max int64 + current int64 +} + +func NewSemaphore(max int) *Semaphore { + return &Semaphore{max: int64(max)} +} + +func (s *Semaphore) TryAcquire() bool { + if s == nil || s.max <= 0 { + return true + } + for { + cur := atomic.LoadInt64(&s.current) + if cur >= s.max { + return false + } + if atomic.CompareAndSwapInt64(&s.current, cur, cur+1) { + return true + } + } +} + +func (s *Semaphore) Release() { + if s == nil || s.max <= 0 { + return + } + atomic.AddInt64(&s.current, -1) +} diff --git a/wisp/limits_test.go b/wisp/limits_test.go new file mode 100644 index 0000000..1a3da38 --- /dev/null +++ b/wisp/limits_test.go @@ -0,0 +1,93 @@ +package wisp + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestSlidingWindowAllow(t *testing.T) { + w := NewSlidingWindow(3, time.Second) + for i := 0; i < 3; i++ { + if !w.Allow("k") { + t.Fatalf("attempt %d denied early", i) + } + } + if w.Allow("k") { + t.Fatal("4th attempt should be denied") + } + if !w.Allow("k2") { + t.Fatal("key2 should be independent") + } +} + +func TestSlidingWindowRollover(t *testing.T) { + w := NewSlidingWindow(2, 50*time.Millisecond) + w.Allow("k") + w.Allow("k") + if w.Allow("k") { + t.Fatal("should be denied") + } + time.Sleep(80 * time.Millisecond) + if !w.Allow("k") { + t.Fatal("should allow after rollover") + } +} + +func TestSlidingWindowConcurrent(t *testing.T) { + w := NewSlidingWindow(100, time.Second) + var wg sync.WaitGroup + var allowed int64 + for i := 0; i < 16; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 50; j++ { + if w.Allow("k") { + atomic.AddInt64(&allowed, 1) + } + } + }() + } + wg.Wait() + if got := atomic.LoadInt64(&allowed); got != 100 { + t.Fatalf("expected exactly 100 allowed, got %d", got) + } +} + +func TestSlidingWindowNilIsAllowAll(t *testing.T) { + var w *SlidingWindow + for i := 0; i < 1000; i++ { + if !w.Allow("x") { + t.Fatal("nil limiter must allow everything") + } + } +} + +func TestSemaphoreAcquireRelease(t *testing.T) { + s := NewSemaphore(2) + if !s.TryAcquire() { + t.Fatal("acquire 1") + } + if !s.TryAcquire() { + t.Fatal("acquire 2") + } + if s.TryAcquire() { + t.Fatal("acquire 3 should fail") + } + s.Release() + if !s.TryAcquire() { + t.Fatal("acquire after release") + } +} + +func TestSemaphoreNilIsAllowAll(t *testing.T) { + var s *Semaphore + for i := 0; i < 100; i++ { + if !s.TryAcquire() { + t.Fatal("nil semaphore must allow everything") + } + s.Release() + } +} diff --git a/wisp/signature.go b/wisp/signature.go new file mode 100644 index 0000000..18e72c9 --- /dev/null +++ b/wisp/signature.go @@ -0,0 +1,103 @@ +package wisp + +import ( + "fmt" + "strings" + "sync" + "time" +) + +// SignatureConfig configures the SYN-flood signature detector. +type SignatureConfig struct { + Enabled bool + Window time.Duration + MinSamples int + FailedHandshakeRatio float64 +} + +// Signatures holds per-(connection, destination) detectors. +type Signatures struct { + cfg SignatureConfig + mu sync.Mutex + per map[string]*Detector +} + +func NewSignatures(cfg SignatureConfig) *Signatures { + return &Signatures{cfg: cfg, per: make(map[string]*Detector)} +} + +func (s *Signatures) For(connID uint64, dstIP string, dstPort int) *Detector { + if s == nil || !s.cfg.Enabled { + return nopDetector + } + key := fmt.Sprintf("%d|%s:%d", connID, dstIP, dstPort) + s.mu.Lock() + defer s.mu.Unlock() + d, ok := s.per[key] + if !ok { + d = &Detector{cfg: s.cfg} + s.per[key] = d + } + return d +} + +func (s *Signatures) Forget(connID uint64) { + if s == nil { + return + } + prefix := fmt.Sprintf("%d|", connID) + s.mu.Lock() + for k := range s.per { + if strings.HasPrefix(k, prefix) { + delete(s.per, k) + } + } + s.mu.Unlock() +} + +type sample struct { + t time.Time + ok bool +} + +// Detector is a per-tuple ring of recent dial outcomes. +type Detector struct { + cfg SignatureConfig + mu sync.Mutex + ring []sample +} + +var nopDetector = &Detector{} + +func (d *Detector) Record(ok bool) { + if d == nil || d.cfg.Window == 0 { + return + } + now := time.Now() + d.mu.Lock() + defer d.mu.Unlock() + d.ring = append(d.ring, sample{now, ok}) + cutoff := now.Add(-d.cfg.Window) + for len(d.ring) > 0 && d.ring[0].t.Before(cutoff) { + d.ring = d.ring[1:] + } +} + +func (d *Detector) Match() bool { + if d == nil || d.cfg.Window == 0 { + return false + } + d.mu.Lock() + defer d.mu.Unlock() + if len(d.ring) < d.cfg.MinSamples { + return false + } + failed := 0 + for _, s := range d.ring { + if !s.ok { + failed++ + } + } + ratio := float64(failed) / float64(len(d.ring)) + return ratio >= d.cfg.FailedHandshakeRatio +} diff --git a/wisp/signature_test.go b/wisp/signature_test.go new file mode 100644 index 0000000..edd6595 --- /dev/null +++ b/wisp/signature_test.go @@ -0,0 +1,78 @@ +package wisp + +import ( + "testing" + "time" +) + +func TestSignatureNoMatchOnSuccess(t *testing.T) { + s := NewSignatures(SignatureConfig{ + Enabled: true, + Window: 2 * time.Second, + MinSamples: 4, + FailedHandshakeRatio: 0.75, + }) + d := s.For(1, "9.9.9.9", 80) + for i := 0; i < 10; i++ { + d.Record(true) + } + if d.Match() { + t.Fatal("should not match when all succeed") + } +} + +func TestSignatureMatchOnFailures(t *testing.T) { + s := NewSignatures(SignatureConfig{ + Enabled: true, + Window: 2 * time.Second, + MinSamples: 4, + FailedHandshakeRatio: 0.75, + }) + d := s.For(1, "9.9.9.9", 80) + for i := 0; i < 10; i++ { + d.Record(false) + } + if !d.Match() { + t.Fatal("should match") + } +} + +func TestSignatureBelowMinSamples(t *testing.T) { + s := NewSignatures(SignatureConfig{ + Enabled: true, + Window: 2 * time.Second, + MinSamples: 10, + FailedHandshakeRatio: 0.5, + }) + d := s.For(1, "x", 1) + d.Record(false) + d.Record(false) + if d.Match() { + t.Fatal("below min samples should not match") + } +} + +func TestSignatureDisabledIsNop(t *testing.T) { + s := NewSignatures(SignatureConfig{Enabled: false}) + d := s.For(1, "x", 1) + d.Record(false) + if d.Match() { + t.Fatal("disabled signatures should never match") + } +} + +func TestSignaturesForget(t *testing.T) { + s := NewSignatures(SignatureConfig{ + Enabled: true, + Window: time.Second, + MinSamples: 1, + FailedHandshakeRatio: 0.5, + }) + d1 := s.For(1, "x", 1) + d1.Record(false) + s.Forget(1) + d2 := s.For(1, "x", 1) + if d2 == d1 { + t.Fatal("Forget should evict the detector") + } +} From f503ed92ce801c8671c73b1c2bd88f6cf759afe9 Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:40:17 -0400 Subject: [PATCH 4/7] wisp: add persistent reputation store Tracks per-source-IP and per-destination-ip:port scores (0-100). Bad behavior raises score; long-lived successful streams lower it. Persisted to JSON via atomic-rename so it survives restart, with eviction of low-score idle entries. Designed for the distributed-source attack pattern the OVH abuse tickets revealed: client-side script on a popular attacker site drives many residential IPs at one target, each looking innocent per-source. The destination accumulates flags from distinct sources, eventually crossing the strict tier and refusing new CONNECTs to it. Tests cover: score arithmetic, clamping, distinct-source escalation, JSON round-trip, decay, tier mapping, nil-safety (7 tests). --- wisp/reputation.go | 310 ++++++++++++++++++++++++++++++++++++++++ wisp/reputation_test.go | 132 +++++++++++++++++ 2 files changed, 442 insertions(+) create mode 100644 wisp/reputation.go create mode 100644 wisp/reputation_test.go diff --git a/wisp/reputation.go b/wisp/reputation.go new file mode 100644 index 0000000..839ac56 --- /dev/null +++ b/wisp/reputation.go @@ -0,0 +1,310 @@ +package wisp + +import ( + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "sync" + "time" +) + +// ReputationConfig configures the reputation store. +type ReputationConfig struct { + Enabled bool `json:"enabled"` + StorePath string `json:"storePath"` + DecayPerHour int `json:"scoreDecayPerHour"` + EvictAfter time.Duration `json:"-"` // populated from EvictAfterDays + EvictDays int `json:"evictAfterDays"` + Weights map[string]int `json:"weights"` + DestWeights map[string]int `json:"destinationWeights"` + Thresholds struct { + Warn int `json:"warn"` + Throttle int `json:"throttle"` + Strict int `json:"strict"` + } `json:"thresholds"` + SaveIntervalSeconds int `json:"saveIntervalSeconds"` +} + +// SourceEntry tracks reputation for a single source IP. +type SourceEntry struct { + Score int `json:"score"` + FirstSeen time.Time `json:"firstSeen"` + LastSeen time.Time `json:"lastSeen"` + Events map[string]int `json:"events"` +} + +// DestEntry tracks reputation for a single destination IP:port. It records +// the set of distinct source IPs that have hit it so botnet-style +// distributed abuse becomes visible. +type DestEntry struct { + Score int `json:"score"` + FirstSeen time.Time `json:"firstSeen"` + LastSeen time.Time `json:"lastSeen"` + Events map[string]int `json:"events"` + DistinctSources int `json:"distinctSources"` + SeenSources map[string]bool `json:"seenSources"` +} + +// Reputation is the in-memory store. Methods are nil-safe. +type Reputation struct { + cfg ReputationConfig + mu sync.RWMutex + sources map[string]*SourceEntry + dests map[string]*DestEntry + lastDecay time.Time +} + +func NewReputation(cfg ReputationConfig) *Reputation { + return &Reputation{ + cfg: cfg, + sources: make(map[string]*SourceEntry), + dests: make(map[string]*DestEntry), + lastDecay: time.Now(), + } +} + +func clampScore(v int) int { + if v < 0 { + return 0 + } + if v > 100 { + return 100 + } + return v +} + +// AddSource bumps the source entry's score by the configured weight for +// reason and records the event. No-op for nil receiver. +func (r *Reputation) AddSource(key, reason string) { + if r == nil { + return + } + w := r.cfg.Weights[reason] + r.mu.Lock() + defer r.mu.Unlock() + e, ok := r.sources[key] + now := time.Now() + if !ok { + e = &SourceEntry{FirstSeen: now, Events: make(map[string]int)} + r.sources[key] = e + } + e.LastSeen = now + if e.Events == nil { + e.Events = make(map[string]int) + } + e.Events[reason]++ + e.Score = clampScore(e.Score + w) +} + +// AddDest bumps the destination entry's score by the configured destination +// weight for reason. When srcIP is new for this destination, the +// destination's score also gets the distinctSourcesEscalation bonus. +func (r *Reputation) AddDest(ip string, port int, reason string, srcIP net.IP) { + if r == nil { + return + } + w := r.cfg.DestWeights[reason] + r.mu.Lock() + defer r.mu.Unlock() + key := fmt.Sprintf("%s:%d", ip, port) + e, ok := r.dests[key] + now := time.Now() + if !ok { + e = &DestEntry{ + FirstSeen: now, + Events: make(map[string]int), + SeenSources: make(map[string]bool), + } + r.dests[key] = e + } + e.LastSeen = now + if e.Events == nil { + e.Events = make(map[string]int) + } + if e.SeenSources == nil { + e.SeenSources = make(map[string]bool) + } + e.Events[reason]++ + e.Score = clampScore(e.Score + w) + if srcIP != nil { + s := srcIP.String() + if !e.SeenSources[s] { + e.SeenSources[s] = true + e.DistinctSources++ + if esc := r.cfg.DestWeights["distinctSourcesEscalation"]; esc != 0 { + e.Score = clampScore(e.Score + esc) + } + } + } +} + +func (r *Reputation) SourceScore(key string) int { + if r == nil { + return 0 + } + r.mu.RLock() + defer r.mu.RUnlock() + if e, ok := r.sources[key]; ok { + return e.Score + } + return 0 +} + +func (r *Reputation) DestScore(ip string, port int) int { + if r == nil { + return 0 + } + r.mu.RLock() + defer r.mu.RUnlock() + if e, ok := r.dests[fmt.Sprintf("%s:%d", ip, port)]; ok { + return e.Score + } + return 0 +} + +// Tier maps a score to a configured tier. +type Tier int + +const ( + TierNormal Tier = iota + TierWarn + TierThrottle + TierStrict +) + +func (r *Reputation) Tier(score int) Tier { + if r == nil { + return TierNormal + } + t := r.cfg.Thresholds + switch { + case t.Strict > 0 && score >= t.Strict: + return TierStrict + case t.Throttle > 0 && score >= t.Throttle: + return TierThrottle + case t.Warn > 0 && score >= t.Warn: + return TierWarn + } + return TierNormal +} + +type repSnapshot struct { + Sources map[string]*SourceEntry `json:"sources"` + Dests map[string]*DestEntry `json:"destinations"` +} + +// SaveNow writes the current store to StorePath via atomic rename. +func (r *Reputation) SaveNow() error { + if r == nil || r.cfg.StorePath == "" { + return nil + } + r.mu.RLock() + snap := repSnapshot{Sources: r.sources, Dests: r.dests} + data, err := json.MarshalIndent(snap, "", " ") + r.mu.RUnlock() + if err != nil { + return err + } + dir := filepath.Dir(r.cfg.StorePath) + if err := os.MkdirAll(dir, 0o755); err != nil { + return err + } + tmp := r.cfg.StorePath + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return err + } + return os.Rename(tmp, r.cfg.StorePath) +} + +// Load reads the store from StorePath. A missing file is not an error. +func (r *Reputation) Load() error { + if r == nil || r.cfg.StorePath == "" { + return nil + } + data, err := os.ReadFile(r.cfg.StorePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + var snap repSnapshot + if err := json.Unmarshal(data, &snap); err != nil { + return err + } + r.mu.Lock() + if snap.Sources != nil { + r.sources = snap.Sources + } + if snap.Dests != nil { + r.dests = snap.Dests + } + r.mu.Unlock() + return nil +} + +// ForceDecay subtracts DecayPerHour * (elapsed / hour) from every entry's +// score, clamped to 0. Exposed for tests; production code uses a goroutine. +func (r *Reputation) ForceDecay(elapsed time.Duration) { + if r == nil || r.cfg.DecayPerHour == 0 { + return + } + delta := int(float64(r.cfg.DecayPerHour) * elapsed.Hours()) + if delta <= 0 { + return + } + r.mu.Lock() + defer r.mu.Unlock() + for _, e := range r.sources { + e.Score = clampScore(e.Score - delta) + } + for _, e := range r.dests { + e.Score = clampScore(e.Score - delta) + } + r.lastDecay = time.Now() +} + +// Evict removes entries whose score is below 5 and whose LastSeen is older +// than EvictAfter. No-op if EvictAfter is zero. +func (r *Reputation) Evict() { + if r == nil || r.cfg.EvictAfter == 0 { + return + } + cutoff := time.Now().Add(-r.cfg.EvictAfter) + r.mu.Lock() + defer r.mu.Unlock() + for k, e := range r.sources { + if e.Score < 5 && e.LastSeen.Before(cutoff) { + delete(r.sources, k) + } + } + for k, e := range r.dests { + if e.Score < 5 && e.LastSeen.Before(cutoff) { + delete(r.dests, k) + } + } +} + +// RunMaintenance ticks every saveEvery, applying decay, eviction, and +// persistence. Exits when stop is closed; performs a final save on exit. +func (r *Reputation) RunMaintenance(stop <-chan struct{}, saveEvery time.Duration) { + if r == nil { + return + } + t := time.NewTicker(saveEvery) + defer t.Stop() + for { + select { + case <-stop: + _ = r.SaveNow() + return + case <-t.C: + now := time.Now() + r.ForceDecay(now.Sub(r.lastDecay)) + r.Evict() + _ = r.SaveNow() + } + } +} diff --git a/wisp/reputation_test.go b/wisp/reputation_test.go new file mode 100644 index 0000000..ec25bf2 --- /dev/null +++ b/wisp/reputation_test.go @@ -0,0 +1,132 @@ +package wisp + +import ( + "fmt" + "net" + "os" + "path/filepath" + "testing" + "time" +) + +func TestReputationAddAndScore(t *testing.T) { + r := NewReputation(ReputationConfig{ + Enabled: true, + Weights: map[string]int{"foo": 10}, + }) + r.AddSource("1.1.1.1", "foo") + if r.SourceScore("1.1.1.1") != 10 { + t.Fatalf("got %d", r.SourceScore("1.1.1.1")) + } + r.AddSource("1.1.1.1", "foo") + if r.SourceScore("1.1.1.1") != 20 { + t.Fatalf("got %d", r.SourceScore("1.1.1.1")) + } +} + +func TestReputationClamps(t *testing.T) { + r := NewReputation(ReputationConfig{ + Enabled: true, + Weights: map[string]int{"big": 60}, + }) + r.AddSource("a", "big") + r.AddSource("a", "big") + if r.SourceScore("a") != 100 { + t.Fatalf("expected clamp to 100, got %d", r.SourceScore("a")) + } +} + +func TestReputationDestDistinctSources(t *testing.T) { + r := NewReputation(ReputationConfig{ + Enabled: true, + DestWeights: map[string]int{ + "hit": 1, + "distinctSourcesEscalation": 1, + }, + }) + for i := 0; i < 50; i++ { + r.AddDest("9.9.9.9", 80, "hit", net.ParseIP(fmt.Sprintf("10.0.0.%d", i+1))) + } + if got := r.DestScore("9.9.9.9", 80); got < 50 { + t.Fatalf("expected score >= 50, got %d", got) + } +} + +func TestReputationPersistRoundtrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "rep.json") + r := NewReputation(ReputationConfig{ + Enabled: true, + Weights: map[string]int{"x": 7}, + StorePath: path, + }) + r.AddSource("1.2.3.4", "x") + if err := r.SaveNow(); err != nil { + t.Fatal(err) + } + stat, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + if stat.Size() == 0 { + t.Fatal("empty file") + } + r2 := NewReputation(ReputationConfig{ + Enabled: true, + Weights: map[string]int{"x": 7}, + StorePath: path, + }) + if err := r2.Load(); err != nil { + t.Fatal(err) + } + if r2.SourceScore("1.2.3.4") != 7 { + t.Fatalf("got %d", r2.SourceScore("1.2.3.4")) + } +} + +func TestReputationDecay(t *testing.T) { + r := NewReputation(ReputationConfig{ + Enabled: true, + Weights: map[string]int{"x": 50}, + DecayPerHour: 50, + }) + r.AddSource("a", "x") + r.ForceDecay(time.Hour) + if got := r.SourceScore("a"); got != 0 { + t.Fatalf("expected 0 after decay, got %d", got) + } +} + +func TestReputationTier(t *testing.T) { + cfg := ReputationConfig{Enabled: true} + cfg.Thresholds.Warn = 20 + cfg.Thresholds.Throttle = 50 + cfg.Thresholds.Strict = 80 + r := NewReputation(cfg) + cases := []struct { + score int + want Tier + }{ + {0, TierNormal}, {19, TierNormal}, + {20, TierWarn}, {49, TierWarn}, + {50, TierThrottle}, {79, TierThrottle}, + {80, TierStrict}, {100, TierStrict}, + } + for _, c := range cases { + if got := r.Tier(c.score); got != c.want { + t.Errorf("score=%d got=%v want=%v", c.score, got, c.want) + } + } +} + +func TestReputationNilSafe(t *testing.T) { + var r *Reputation + r.AddSource("x", "y") + r.AddDest("a", 1, "b", net.IPv4zero) + if r.SourceScore("x") != 0 { + t.Fatal("expected 0 from nil") + } + if r.Tier(50) != TierNormal { + t.Fatal("nil Tier should be Normal") + } +} From 9cd7aa99dc8ee1eefdcf3c6684d90115cceebb54 Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:43:42 -0400 Subject: [PATCH 5/7] wisp: integrate flood-protection pipeline into handleConnect Adds FloodProtectionConfig + Globals to wisp.Config, builds the global limiters at handler init, and wires every CONNECT through: per-WS concurrent-stream cap per-source-IP sliding window per-destination IP:port sliding windows (sec + min) reputation strict-tier refusal in-flight SYN semaphore (TCP only) per-(WS, dst) SYN-flood signature detector with WS-close on match WsCloseAfterViolations escalates repeat offenders by closing the entire WebSocket after N enforcement hits. Also caps total concurrent WS connections at upgrade time, releases the semaphore on disconnect, and forgets per-WS signature state. Twisp gating now requires v2 + PasswordAuth + completed auth (using upstream's existing twispAuthorized() + authenticated flag); v1 twisp is refused outright. Every block emits a structured wisp.Logger Warn line for fail2ban-friendly ingest. Tests: existing test suite still passes; smoke test boots and returns the expected 400 to non-WS probes. --- wisp/config.go | 37 +++++++++++++++++ wisp/wisp-connection.go | 90 +++++++++++++++++++++++++++++++++++++++-- wisp/wisp-stream.go | 73 +++++++++++++++++++++++++++++++++ wisp/wisp.go | 57 ++++++++++++++++++++++++++ 4 files changed, 253 insertions(+), 4 deletions(-) diff --git a/wisp/config.go b/wisp/config.go index 2841c38..fc1f52d 100644 --- a/wisp/config.go +++ b/wisp/config.go @@ -68,10 +68,47 @@ type Config struct { BufferRemainingLength uint32 `json:"bufferRemainingLength"` + FloodProtection *FloodProtectionConfig `json:"floodProtection"` + Reputation *ReputationConfig `json:"reputation"` + Logger Logger DNSCache *DNSCache ReadBufPool *sync.Pool Dialer net.Dialer + Globals *Globals +} + +// FloodProtectionConfig groups every flood-mitigation knob. +type FloodProtectionConfig struct { + Enabled bool `json:"enabled"` + MaxConnectsPerSourceIPPerSecond int `json:"maxConnectsPerSourceIPPerSecond"` + MaxConnectsPerDestPerSecond int `json:"maxConnectsPerDestPerSecond"` + MaxConnectsPerDestPerMinute int `json:"maxConnectsPerDestPerMinute"` + MaxInFlightSyns int `json:"maxInFlightSyns"` + MaxConcurrentStreamsPerConnection int `json:"maxConcurrentStreamsPerConnection"` + MaxConcurrentConnections int `json:"maxConcurrentConnections"` + SynFloodSignature struct { + Enabled bool `json:"enabled"` + WindowMs int `json:"windowMs"` + MinSamples int `json:"minSamples"` + FailedHandshakeRatio float64 `json:"failedHandshakeRatio"` + } `json:"synFloodSignature"` + WsCloseAfterViolations int `json:"wsCloseAfterViolations"` + LogBlockedDials bool `json:"logBlockedDials"` +} + +// Globals holds process-wide enforcement state injected into wispConnection. +// Fields may be nil when the corresponding feature is disabled; all methods +// on the contained types are nil-safe. +type Globals struct { + PerSource *SlidingWindow + PerDestSec *SlidingWindow + PerDestMin *SlidingWindow + InFlightSyns *Semaphore + Connections *Semaphore + Egress *EgressPolicy + Reputation *Reputation + Signature *Signatures } func DefaultConfig() Config { diff --git a/wisp/wisp-connection.go b/wisp/wisp-connection.go index e102469..b5952fd 100644 --- a/wisp/wisp-connection.go +++ b/wisp/wisp-connection.go @@ -16,6 +16,8 @@ type writeReq struct { pool bool } +var connIDCounter uint64 + type wispConnection struct { netConn net.Conn writeCh chan writeReq @@ -38,6 +40,10 @@ type wispConnection struct { closeCh chan struct{} createdAt time.Time streamCount atomic.Int32 + + globals *Globals + connID uint64 + violations atomic.Int32 } func (c *wispConnection) close() { @@ -140,20 +146,37 @@ func (c *wispConnection) handleConnectPacket(streamId uint32, payload []byte) { hostname := string(payload[3:]) c.config.Logger.Debug("creating stream", "ip", c.remoteIP, "streamId", streamId, "hostname", hostname, "port", port, "type", streamType) - if streamType == streamTypeTerm { - if !c.config.EnableTwisp { - c.sendClosePacket(streamId, closeReasonBlocked) + + // Per-connection concurrent-stream cap. + if c.config.FloodProtection != nil && c.config.FloodProtection.MaxConcurrentStreamsPerConnection > 0 { + if c.streamCount.Load() >= int32(c.config.FloodProtection.MaxConcurrentStreamsPerConnection) { + c.violation("per_ws_streams") + c.sendClosePacket(streamId, closeReasonThrottled) return } - go handleTwisp(c, streamId, hostname) + } + + // Per-source-IP rate. + if c.globals != nil && c.globals.PerSource != nil && !c.globals.PerSource.Allow(c.remoteIP) { + c.violation("per_source_rate") + c.repAddSource("burstRate") + c.config.Logger.Warn("flood block", "reason", "per_source_rate", "ip", c.remoteIP, "host", hostname, "port", port) + c.sendClosePacket(streamId, closeReasonThrottled) return } + if streamType == streamTypeTerm { + c.handleTwispConnect(streamId, hostname) + return + } + + portNum, _ := strconv.Atoi(port) stream := &wispStream{ wispConn: c, streamId: streamId, connReady: make(chan struct{}), hostname: strings.ToLower(strings.TrimSpace(hostname)), + portNum: portNum, } stream.isOpen.Store(true) @@ -166,6 +189,57 @@ func (c *wispConnection) handleConnectPacket(streamId uint32, payload []byte) { go stream.handleConnect(streamType, port, hostname) } +// handleTwispConnect gates terminal-stream requests. Twisp requires v2, +// auth configured, and the client to have completed auth. +func (c *wispConnection) handleTwispConnect(streamId uint32, command string) { + if !c.config.EnableTwisp { + c.sendClosePacket(streamId, closeReasonBlocked) + return + } + if !c.isV2 { + c.repAddSource("twispNoAuth") + c.config.Logger.Warn("twisp blocked", "reason", "v1_no_auth", "ip", c.remoteIP) + c.sendClosePacket(streamId, closeReasonBlocked) + return + } + if !c.config.PasswordAuth { + c.repAddSource("twispNoAuth") + c.config.Logger.Warn("twisp blocked", "reason", "no_auth_configured", "ip", c.remoteIP) + c.sendClosePacket(streamId, closeReasonBlocked) + return + } + if !c.twispAuthorized() { + c.repAddSource("twispNoAuth") + c.config.Logger.Warn("twisp blocked", "reason", "not_authenticated", "ip", c.remoteIP) + c.sendClosePacket(streamId, closeReasonBlocked) + return + } + go handleTwisp(c, streamId, command) +} + +func (c *wispConnection) violation(reason string) { + if c.config.FloodProtection == nil || c.config.FloodProtection.WsCloseAfterViolations <= 0 { + return + } + n := c.violations.Add(1) + if n >= int32(c.config.FloodProtection.WsCloseAfterViolations) { + c.config.Logger.Warn("ws closed for violations", "ip", c.remoteIP, "violations", n, "lastReason", reason) + c.close() + } +} + +func (c *wispConnection) repAddSource(reason string) { + if c.globals != nil && c.globals.Reputation != nil { + c.globals.Reputation.AddSource(c.remoteIP, reason) + } +} + +func (c *wispConnection) repAddDest(ip string, port int, reason string) { + if c.globals != nil && c.globals.Reputation != nil { + c.globals.Reputation.AddDest(ip, port, reason, net.ParseIP(c.remoteIP)) + } +} + func (c *wispConnection) handleDataPacket(streamId uint32, payload []byte) { var stream *wispStream if c.cachedStreamId == streamId { @@ -321,6 +395,14 @@ func (c *wispConnection) deleteAllWispStreams() { ts.close(closeReasonUnspecified) } } + if c.globals != nil { + if c.globals.Connections != nil { + c.globals.Connections.Release() + } + if c.globals.Signature != nil { + c.globals.Signature.Forget(c.connID) + } + } defer func() { recover() }() close(c.writeCh) } diff --git a/wisp/wisp-stream.go b/wisp/wisp-stream.go index d99d792..83a6838 100644 --- a/wisp/wisp-stream.go +++ b/wisp/wisp-stream.go @@ -20,6 +20,7 @@ type wispStream struct { conn net.Conn bufferRemaining uint32 hostname string + portNum int connReady chan struct{} connReadyDone atomic.Bool @@ -101,6 +102,50 @@ func (s *wispStream) handleConnect(streamType uint8, port string, hostname strin } } + c := s.wispConn + + // Per-destination rate caps. + dstKey := net.JoinHostPort(resolvedHostname, port) + if c.globals != nil { + if c.globals.PerDestSec != nil && !c.globals.PerDestSec.Allow(dstKey) { + c.violation("per_dest_sec") + c.repAddSource("burstRate") + c.repAddDest(resolvedHostname, s.portNum, "burstRate") + cfg.Logger.Warn("flood block", "reason", "per_dest_sec", "ip", c.remoteIP, "dstIP", resolvedHostname, "port", port) + s.close(closeReasonThrottled) + return + } + if c.globals.PerDestMin != nil && !c.globals.PerDestMin.Allow(dstKey) { + c.violation("per_dest_min") + cfg.Logger.Warn("flood block", "reason", "per_dest_min", "ip", c.remoteIP, "dstIP", resolvedHostname, "port", port) + s.close(closeReasonThrottled) + return + } + } + + // Reputation-strict destination check. + if c.globals != nil && c.globals.Reputation != nil { + ds := c.globals.Reputation.DestScore(resolvedHostname, s.portNum) + if c.globals.Reputation.Tier(ds) == TierStrict { + c.repAddSource("requestKnownBadDest") + cfg.Logger.Warn("flood block", "reason", "dest_reputation_strict", "ip", c.remoteIP, "dstIP", resolvedHostname, "port", port) + s.close(closeReasonBlocked) + return + } + } + + // In-flight SYN cap. + synAcquired := false + if c.globals != nil && c.globals.InFlightSyns != nil && streamType == streamTypeTCP { + if !c.globals.InFlightSyns.TryAcquire() { + c.violation("in_flight_syns") + cfg.Logger.Warn("flood block", "reason", "in_flight_syns", "ip", c.remoteIP, "dstIP", resolvedHostname, "port", port) + s.close(closeReasonThrottled) + return + } + synAcquired = true + } + s.streamType = streamType s.bufferRemaining = cfg.BufferRemainingLength @@ -125,15 +170,43 @@ func (s *wispStream) handleConnect(streamType uint8, port string, hostname strin } case streamTypeUDP: if cfg.Proxy != "" || !cfg.AllowUDP { + if synAcquired { + c.globals.InFlightSyns.Release() + } s.close(closeReasonBlocked) return } s.conn, err = net.Dial("udp", destination) default: + if synAcquired { + c.globals.InFlightSyns.Release() + } s.close(closeReasonInvalidInfo) return } + if synAcquired { + c.globals.InFlightSyns.Release() + } + + // Record dial outcome for SYN-flood signature. + if c.globals != nil && c.globals.Signature != nil && streamType == streamTypeTCP { + det := c.globals.Signature.For(c.connID, resolvedHostname, s.portNum) + det.Record(err == nil) + if det.Match() { + c.repAddSource("synSignature") + c.repAddDest(resolvedHostname, s.portNum, "synSignature") + cfg.Logger.Warn("syn-flood signature matched; closing WS", + "ip", c.remoteIP, "dstIP", resolvedHostname, "port", port) + if err == nil && s.conn != nil { + s.conn.Close() + } + s.close(closeReasonBlocked) + c.close() + return + } + } + if err != nil { cfg.Logger.Warn("stream connection failed", "ip", s.wispConn.remoteIP, "hostname", hostname, "port", port, "error", err) s.close(mapDialError(err)) diff --git a/wisp/wisp.go b/wisp/wisp.go index 5fff186..a4d454c 100644 --- a/wisp/wisp.go +++ b/wisp/wisp.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" "github.com/lxzan/gws" @@ -46,12 +47,55 @@ func (cfg *Config) InitResolver() { } } +// BuildGlobals constructs the process-wide enforcement state from cfg. +// Idempotent: an already-built cfg.Globals is left as-is. +func (cfg *Config) BuildGlobals() { + if cfg.Globals != nil { + return + } + g := &Globals{Egress: PolicyFromConfig(cfg)} + if cfg.FloodProtection != nil && cfg.FloodProtection.Enabled { + fp := cfg.FloodProtection + if fp.MaxConnectsPerSourceIPPerSecond > 0 { + g.PerSource = NewSlidingWindow(fp.MaxConnectsPerSourceIPPerSecond, time.Second) + } + if fp.MaxConnectsPerDestPerSecond > 0 { + g.PerDestSec = NewSlidingWindow(fp.MaxConnectsPerDestPerSecond, time.Second) + } + if fp.MaxConnectsPerDestPerMinute > 0 { + g.PerDestMin = NewSlidingWindow(fp.MaxConnectsPerDestPerMinute, time.Minute) + } + if fp.MaxInFlightSyns > 0 { + g.InFlightSyns = NewSemaphore(fp.MaxInFlightSyns) + } + if fp.MaxConcurrentConnections > 0 { + g.Connections = NewSemaphore(fp.MaxConcurrentConnections) + } + g.Signature = NewSignatures(SignatureConfig{ + Enabled: fp.SynFloodSignature.Enabled, + Window: time.Duration(fp.SynFloodSignature.WindowMs) * time.Millisecond, + MinSamples: fp.SynFloodSignature.MinSamples, + FailedHandshakeRatio: fp.SynFloodSignature.FailedHandshakeRatio, + }) + } + if cfg.Reputation != nil && cfg.Reputation.Enabled { + rc := *cfg.Reputation + if rc.EvictAfter == 0 && rc.EvictDays > 0 { + rc.EvictAfter = time.Duration(rc.EvictDays) * 24 * time.Hour + } + g.Reputation = NewReputation(rc) + _ = g.Reputation.Load() + } + cfg.Globals = g +} + type upgradeHandler struct { gws.BuiltinEventHandler } func CreateWispHandler(config *Config) http.HandlerFunc { config.InitResolver() + config.BuildGlobals() readBufSize := 15 + config.TcpBufferSize config.ReadBufPool = &sync.Pool{ @@ -73,10 +117,21 @@ func CreateWispHandler(config *Config) http.HandlerFunc { }) return func(w http.ResponseWriter, r *http.Request) { + // Cap concurrent WS connections before upgrade. + if config.Globals != nil && config.Globals.Connections != nil { + if !config.Globals.Connections.TryAcquire() { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + } + useV2 := config.EnableV2 && r.Header.Get("Sec-WebSocket-Protocol") != "" wsConn, err := upgrader.Upgrade(w, r) if err != nil { + if config.Globals != nil && config.Globals.Connections != nil { + config.Globals.Connections.Release() + } return } @@ -100,6 +155,8 @@ func CreateWispHandler(config *Config) http.HandlerFunc { twispStreams: newTwisp(), isV2: useV2, remoteIP: remoteIP.String(), + globals: config.Globals, + connID: atomic.AddUint64(&connIDCounter, 1), } go wc.writeLoop() From 6d2ea59ed4d18d54fa0831e8c104b44dc929e5aa Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:46:44 -0400 Subject: [PATCH 6/7] wisp/v2: optional bcrypt password storage passwordUsers entries that start with a bcrypt prefix ($2a$/$2b$/$2y$) are verified via golang.org/x/crypto/bcrypt; plaintext entries continue to work but emit a one-time deprecation warning to the configured Logger. The constant-time comparison is preserved for plaintext. Tests cover plaintext match/mismatch, bcrypt match/mismatch, and length-mismatch (constant-time). --- go.mod | 1 + go.sum | 2 ++ wisp/v2.go | 27 +++++++++++++++++++++++---- wisp/v2_test.go | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 wisp/v2_test.go diff --git a/go.mod b/go.mod index 9de587f..fb5af72 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.0 require ( github.com/creack/pty v1.1.21 github.com/lxzan/gws v1.8.3 + golang.org/x/crypto v0.51.0 golang.org/x/net v0.53.0 golang.org/x/sync v0.9.0 ) diff --git a/go.sum b/go.sum index 7342238..c4fa80a 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= diff --git a/wisp/v2.go b/wisp/v2.go index 017a036..5251377 100644 --- a/wisp/v2.go +++ b/wisp/v2.go @@ -4,9 +4,31 @@ import ( "crypto/subtle" "encoding/binary" "errors" + "strings" + "sync/atomic" "time" + + "golang.org/x/crypto/bcrypt" ) +// checkPassword compares stored against provided in constant time. Stored +// values that begin with a bcrypt prefix ($2a$/$2b$/$2y$) are verified via +// bcrypt; otherwise a constant-time byte compare is used (plaintext, with +// a deprecation log emitted once). +var plaintextWarned atomic.Bool + +func checkPassword(logger Logger, stored, provided string) bool { + if strings.HasPrefix(stored, "$2a$") || strings.HasPrefix(stored, "$2b$") || strings.HasPrefix(stored, "$2y$") { + return bcrypt.CompareHashAndPassword([]byte(stored), []byte(provided)) == nil + } + if !plaintextWarned.Swap(true) && logger != nil { + logger.Warn("plaintext passwords in passwordUsers are deprecated; use bcrypt hashes ($2a$/$2b$)") + } + expBytes := []byte(stored) + gotBytes := []byte(provided) + return len(expBytes) == len(gotBytes) && subtle.ConstantTimeCompare(expBytes, gotBytes) == 1 +} + var errorInvalid = errors.New("invalid wisp v2 payload") type extensions struct { @@ -151,10 +173,7 @@ func (c *wispConnection) handleInfo(streamId uint32, payload []byte) { if c.config.PasswordAuth && clientExts.passwordUsername != "" { expectedPassword, userExists := c.config.PasswordUsers[clientExts.passwordUsername] - expBytes := []byte(expectedPassword) - gotBytes := []byte(clientExts.passwordPassword) - ok := userExists && len(expBytes) == len(gotBytes) && subtle.ConstantTimeCompare(expBytes, gotBytes) == 1 - if ok { + if userExists && checkPassword(c.config.Logger, expectedPassword, clientExts.passwordPassword) { authPassed = true } else { c.sendClosePacket(0, closeReasonAuthBadPassword) diff --git a/wisp/v2_test.go b/wisp/v2_test.go new file mode 100644 index 0000000..1925b1f --- /dev/null +++ b/wisp/v2_test.go @@ -0,0 +1,35 @@ +package wisp + +import ( + "testing" + + "golang.org/x/crypto/bcrypt" +) + +func TestCheckPasswordPlaintext(t *testing.T) { + if !checkPassword(nil, "hunter2", "hunter2") { + t.Fatal("plain match") + } + if checkPassword(nil, "hunter2", "hunter3") { + t.Fatal("plain mismatch should fail") + } +} + +func TestCheckPasswordBcrypt(t *testing.T) { + hash, err := bcrypt.GenerateFromPassword([]byte("s3cret"), bcrypt.MinCost) + if err != nil { + t.Fatal(err) + } + if !checkPassword(nil, string(hash), "s3cret") { + t.Fatal("bcrypt match") + } + if checkPassword(nil, string(hash), "wrong") { + t.Fatal("bcrypt mismatch should fail") + } +} + +func TestCheckPasswordLengthMismatch(t *testing.T) { + if checkPassword(nil, "abc", "abcd") { + t.Fatal("length mismatch must fail") + } +} From 8a35efd8397de5f9c1c5ead32216275e1675a4db Mon Sep 17 00:00:00 2001 From: not-amplify Date: Tue, 19 May 2026 10:48:45 -0400 Subject: [PATCH 7/7] docs: README and example.config.json updated for flood protection Documents the new floodProtection and reputation config blocks, the trustedProxies/trustedHeaders fields, the twisp-auth requirement, and bcrypt password support. Also notes the SSRF-by-default behavior change (allowDirectIP / allowPrivateIPs / allowLoopbackIPs default false). Renames example.config.json's parseRealIPFrom to trustedProxies to match the field actually consumed by the Go server. --- README.md | 13 +++++++++++++ example.config.json | 43 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f02c2c8..6dfae7e 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,19 @@ Copy `example.config.json` to `config.json` and edit as needed: | `certAuthPublicKeys` | []string | Allowed Ed25519 public keys (hex-encoded) | | `enableStreamConfirm` | bool | Send confirmation when streams connect | +### Flood protection & egress + +To prevent the server from being abused as a TCP SYN flood relay (see OVH abuse reports the maintainers received), this build adds: + +- **SSRF defense:** `allowDirectIP` now refuses CONNECTs to IP-literal hostnames; `allowPrivateIPs` and `allowLoopbackIPs` reject CONNECTs whose resolved IP is in private/loopback/link-local/multicast ranges. The DNS resolution loop walks all returned addresses and picks the first allowed one, so DNS responses can't bypass the policy by interleaving private and public IPs. +- **Per-source / per-destination rate limits:** sliding-window limiters on source IP, destination IP:port (sec + min). Defaults: 50/sec/source, 8/sec/dest, 60/min/dest. Tune via `floodProtection`. +- **In-flight SYN cap:** counting semaphore around outbound TCP dials. +- **SYN-flood signature detector:** per-(WS, dst) ring buffer of dial outcomes. When ≥`minSamples` attempts in `windowMs` exceed `failedHandshakeRatio` failed-handshake fraction, the WS is closed. +- **Reputation store:** persists 0–100 scores per source IP and per destination IP:port to `reputation.storePath` via atomic-rename. Bad behavior (egress violations, SYN signatures, twisp-without-auth, repeated burst-rate hits) raises the score; long-lived successful streams lower it. When a destination's score reaches the strict threshold (default 81) — which happens when many distinct sources hit it — new CONNECTs to that destination are refused. This is the defense against the popular-website-script attack pattern where many residential IPs each look innocent on their own but collectively target one victim. +- **Trusted proxy support:** `parseRealIP: true` plus `trustedProxies` (CIDRs) plus `trustedHeaders` (default: CF-Connecting-IP, X-Forwarded-For). Headers are honored only when the immediate peer is in `trustedProxies`. +- **Twisp now requires auth:** with `enableTwisp: true`, you must also enable `passwordAuth` and the client must pass the v2 auth handshake. Anonymous twisp on v1 is refused. `passwordUsers` may contain bcrypt hashes (prefix `$2a$`/`$2b$`/`$2y$`); plaintext is deprecated and logs a warning on first use. +- **Structured logging:** every block emits a Logger.Warn line — fail2ban-friendly. Example regex: `"flood block".*"ip", ""`. + ## Credits - [soap phia](https://github.com/soap-phia/) - writing most of this - [rebecca](https://github.com/rebeccaheartz69/) - greatly helping with implementing wisp v2 and extensions diff --git a/example.config.json b/example.config.json index 97128f1..d753627 100644 --- a/example.config.json +++ b/example.config.json @@ -40,7 +40,8 @@ "connectionsLimitPerIP": 20, "connectionWindowSeconds": 10, "parseRealIP": true, - "parseRealIPFrom": ["127.0.0.1"], + "trustedProxies": ["127.0.0.1"], + "trustedHeaders": ["CF-Connecting-IP", "X-Forwarded-For"], "maxMessageSize": 0, "staticDir": "", "nonWSResponse": "mrrow merp >w<", @@ -59,5 +60,43 @@ "maxConnectionsPerIP": 0, "globalMaxConnections": 0, "writeQueueSize": 4096, - "maxInboundBytesPerSecond": 0 + "maxInboundBytesPerSecond": 0, + "floodProtection": { + "enabled": true, + "maxConnectsPerSourceIPPerSecond": 50, + "maxConnectsPerDestPerSecond": 8, + "maxConnectsPerDestPerMinute": 60, + "maxInFlightSyns": 256, + "maxConcurrentStreamsPerConnection": 256, + "maxConcurrentConnections": 1024, + "synFloodSignature": { + "enabled": true, + "windowMs": 2000, + "minSamples": 32, + "failedHandshakeRatio": 0.75 + }, + "wsCloseAfterViolations": 16, + "logBlockedDials": true + }, + "reputation": { + "enabled": true, + "storePath": "./data/mrrowisp-reputation.json", + "saveIntervalSeconds": 30, + "scoreDecayPerHour": 1, + "evictAfterDays": 7, + "thresholds": { "warn": 21, "throttle": 51, "strict": 81 }, + "weights": { + "privateEgress": 15, + "synSignature": 25, + "twispNoAuth": 40, + "burstRate": 5, + "successfulStream": -2, + "requestKnownBadDest": 2 + }, + "destinationWeights": { + "privateEgress": 20, + "synSignature": 30, + "distinctSourcesEscalation": 1 + } + } }