Skip to content

Commit ae82fc0

Browse files
intel352claude
andcommitted
fix: add eviction to unbounded rate-limit/cache/lock maps
- RateLimitStep.buckets: lazy cleanup goroutine (sync.Once), evicts buckets inactive for 10min, Stop() closes stop channel - gatewayRateLimiter.buckets: same pattern, started/stopped via APIGateway.Start/Stop lifecycle - WebhookSender.deadLetter: 10k cap with oldest-first trim, hourly purge of entries older than 24h; Start/Stop lifecycle - globalOAuthCache: sync.Once starts 5min cleanup goroutine on first getOrCreate; evicts entries whose token has expired - InMemoryLock.locks: lastUsed tracked per entry, 10min cleanup tick evicts idle entries older than 30min; Start/Stop methods added All cleanup goroutines include panic recovery and stop channels. Tests added for each map verifying shrinkage after backdated entries. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9870f1c commit ae82fc0

10 files changed

Lines changed: 620 additions & 22 deletions

module/api_gateway.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ type gatewayRateLimiter struct {
7171
buckets map[string]*tokenBucket
7272
rpm int
7373
burst int
74+
stopCh chan struct{}
7475
}
7576

7677
func newGatewayRateLimiter(rpm, burst int) *gatewayRateLimiter {
7778
return &gatewayRateLimiter{
7879
buckets: make(map[string]*tokenBucket),
7980
rpm: rpm,
8081
burst: burst,
82+
stopCh: make(chan struct{}),
8183
}
8284
}
8385

@@ -94,6 +96,44 @@ func (rl *gatewayRateLimiter) allow(clientIP string) bool {
9496
return bucket.allow()
9597
}
9698

99+
// startCleanup launches a background goroutine that evicts stale buckets every 5 minutes.
100+
func (rl *gatewayRateLimiter) startCleanup() {
101+
go func() {
102+
defer func() { recover() }() //nolint:errcheck
103+
ticker := time.NewTicker(5 * time.Minute)
104+
defer ticker.Stop()
105+
for {
106+
select {
107+
case <-ticker.C:
108+
rl.evictStaleBuckets(10 * time.Minute)
109+
case <-rl.stopCh:
110+
return
111+
}
112+
}
113+
}()
114+
}
115+
116+
// stop shuts down the background cleanup goroutine.
117+
func (rl *gatewayRateLimiter) stop() {
118+
select {
119+
case <-rl.stopCh:
120+
default:
121+
close(rl.stopCh)
122+
}
123+
}
124+
125+
// evictStaleBuckets removes buckets not accessed within ttl.
126+
func (rl *gatewayRateLimiter) evictStaleBuckets(ttl time.Duration) {
127+
cutoff := time.Now().Add(-ttl)
128+
rl.mu.Lock()
129+
defer rl.mu.Unlock()
130+
for k, b := range rl.buckets {
131+
if b.lastRefill.Before(cutoff) {
132+
delete(rl.buckets, k)
133+
}
134+
}
135+
}
136+
97137
// APIGatewayOption is a functional option for configuring an APIGateway at construction time.
98138
type APIGatewayOption func(*APIGateway)
99139

@@ -210,11 +250,27 @@ func (g *APIGateway) Name() string { return g.name }
210250
// Init initializes the module.
211251
func (g *APIGateway) Init(_ modular.Application) error { return nil }
212252

213-
// Start is a no-op.
214-
func (g *APIGateway) Start(_ context.Context) error { return nil }
253+
// Start launches background cleanup goroutines for all rate limiters.
254+
func (g *APIGateway) Start(_ context.Context) error {
255+
if g.instanceRateLimiter != nil {
256+
g.instanceRateLimiter.startCleanup()
257+
}
258+
for _, rl := range g.rateLimiters {
259+
rl.startCleanup()
260+
}
261+
return nil
262+
}
215263

216-
// Stop is a no-op.
217-
func (g *APIGateway) Stop(_ context.Context) error { return nil }
264+
// Stop shuts down background cleanup goroutines for all rate limiters.
265+
func (g *APIGateway) Stop(_ context.Context) error {
266+
if g.instanceRateLimiter != nil {
267+
g.instanceRateLimiter.stop()
268+
}
269+
for _, rl := range g.rateLimiters {
270+
rl.stop()
271+
}
272+
return nil
273+
}
218274

219275
// ProvidesServices returns the services provided by this module.
220276
func (g *APIGateway) ProvidesServices() []modular.ServiceProvider {

module/api_gateway_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package module
22

33
import (
4+
"context"
5+
"fmt"
46
"net/http"
57
"net/http/httptest"
68
"testing"
@@ -497,3 +499,60 @@ func TestAWSAPIGateway_SyncRoutesRequiresAPIID(t *testing.T) {
497499
t.Fatal("expected error when api_id is empty")
498500
}
499501
}
502+
503+
func TestGatewayRateLimiter_BucketEviction(t *testing.T) {
504+
rl := newGatewayRateLimiter(60, 5)
505+
506+
// Fill with unique client IPs
507+
for i := range 100 {
508+
rl.allow(fmt.Sprintf("192.168.1.%d", i%256))
509+
}
510+
511+
rl.mu.Lock()
512+
count := len(rl.buckets)
513+
rl.mu.Unlock()
514+
if count == 0 {
515+
t.Fatal("expected buckets to be populated")
516+
}
517+
518+
// Backdate all buckets
519+
rl.mu.Lock()
520+
for _, b := range rl.buckets {
521+
b.lastRefill = time.Now().Add(-20 * time.Minute)
522+
}
523+
rl.mu.Unlock()
524+
525+
rl.evictStaleBuckets(10 * time.Minute)
526+
527+
rl.mu.Lock()
528+
count = len(rl.buckets)
529+
rl.mu.Unlock()
530+
if count != 0 {
531+
t.Fatalf("expected 0 buckets after eviction, got %d", count)
532+
}
533+
}
534+
535+
func TestAPIGateway_StartStopRateLimiterCleanup(t *testing.T) {
536+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
537+
w.WriteHeader(http.StatusOK)
538+
}))
539+
defer backend.Close()
540+
541+
gw := NewAPIGateway("gw-lifecycle", WithRateLimit(&RateLimitConfig{
542+
RequestsPerMinute: 60,
543+
BurstSize: 10,
544+
}))
545+
if err := gw.SetRoutes([]GatewayRoute{
546+
{PathPrefix: "/api", Backend: backend.URL, RateLimit: &RateLimitConfig{RequestsPerMinute: 30, BurstSize: 5}},
547+
}); err != nil {
548+
t.Fatalf("SetRoutes failed: %v", err)
549+
}
550+
551+
ctx := context.Background()
552+
if err := gw.Start(ctx); err != nil {
553+
t.Fatalf("Start failed: %v", err)
554+
}
555+
if err := gw.Stop(ctx); err != nil {
556+
t.Fatalf("Stop failed: %v", err)
557+
}
558+
}

module/pipeline_step_http_call.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@ import (
2323
// isolated entry.
2424
var globalOAuthCache = &oauthTokenCache{ //nolint:gochecknoglobals // intentional process-wide cache
2525
entries: make(map[string]*oauthCacheEntry),
26+
stopCh: make(chan struct{}),
2627
}
2728

2829
// oauthTokenCache is a registry of per-credential token cache entries.
2930
type oauthTokenCache struct {
30-
mu sync.RWMutex
31-
entries map[string]*oauthCacheEntry
31+
mu sync.RWMutex
32+
entries map[string]*oauthCacheEntry
33+
startOnce sync.Once
34+
stopCh chan struct{}
3235
}
3336

3437
// getOrCreate returns the existing cache entry for key, or creates and stores a new one.
38+
// On first call it starts a background goroutine that evicts expired entries every 5 minutes.
3539
func (c *oauthTokenCache) getOrCreate(key string) *oauthCacheEntry {
40+
c.startOnce.Do(func() { go c.cleanupLoop() })
41+
3642
c.mu.RLock()
3743
entry, ok := c.entries[key]
3844
c.mu.RUnlock()
@@ -49,6 +55,36 @@ func (c *oauthTokenCache) getOrCreate(key string) *oauthCacheEntry {
4955
return entry
5056
}
5157

58+
// cleanupLoop evicts expired entries from the cache every 5 minutes.
59+
func (c *oauthTokenCache) cleanupLoop() {
60+
defer func() { recover() }() //nolint:errcheck
61+
ticker := time.NewTicker(5 * time.Minute)
62+
defer ticker.Stop()
63+
for {
64+
select {
65+
case <-ticker.C:
66+
c.evictExpired()
67+
case <-c.stopCh:
68+
return
69+
}
70+
}
71+
}
72+
73+
// evictExpired removes entries whose tokens have expired.
74+
func (c *oauthTokenCache) evictExpired() {
75+
now := time.Now()
76+
c.mu.Lock()
77+
defer c.mu.Unlock()
78+
for key, entry := range c.entries {
79+
entry.mu.Lock()
80+
expired := entry.accessToken == "" || now.After(entry.expiry)
81+
entry.mu.Unlock()
82+
if expired {
83+
delete(c.entries, key)
84+
}
85+
}
86+
}
87+
5288
// oauthCacheEntry holds a cached OAuth2 access token with expiry. A singleflight.Group is
5389
// embedded to ensure at most one concurrent token fetch per credential set.
5490
type oauthCacheEntry struct {

module/pipeline_step_http_call_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,3 +1115,41 @@ func TestHTTPCallStep_ElapsedMS(t *testing.T) {
11151115
t.Errorf("expected elapsed_ms >= 0, got %d", elapsedMS)
11161116
}
11171117
}
1118+
1119+
func TestOAuthTokenCache_ExpiredEntryEviction(t *testing.T) {
1120+
cache := &oauthTokenCache{
1121+
entries: make(map[string]*oauthCacheEntry),
1122+
stopCh: make(chan struct{}),
1123+
}
1124+
1125+
// Add entries: some expired, some valid
1126+
expiredEntry := &oauthCacheEntry{}
1127+
expiredEntry.set("expired-token", "", 1*time.Second)
1128+
expiredEntry.mu.Lock()
1129+
expiredEntry.expiry = time.Now().Add(-10 * time.Minute) // force expired
1130+
expiredEntry.mu.Unlock()
1131+
1132+
validEntry := &oauthCacheEntry{}
1133+
validEntry.set("valid-token", "", 1*time.Hour)
1134+
1135+
cache.mu.Lock()
1136+
cache.entries["expired-key"] = expiredEntry
1137+
cache.entries["valid-key"] = validEntry
1138+
cache.mu.Unlock()
1139+
1140+
cache.evictExpired()
1141+
1142+
cache.mu.RLock()
1143+
_, expiredStillPresent := cache.entries["expired-key"]
1144+
_, validStillPresent := cache.entries["valid-key"]
1145+
cache.mu.RUnlock()
1146+
1147+
if expiredStillPresent {
1148+
t.Error("expired entry should have been evicted")
1149+
}
1150+
if !validStillPresent {
1151+
t.Error("valid entry should not have been evicted")
1152+
}
1153+
1154+
close(cache.stopCh)
1155+
}

module/pipeline_step_rate_limit.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ type RateLimitStep struct {
1919
keyFrom string // template for per-client key
2020
tmpl *TemplateEngine
2121

22-
mu sync.Mutex
23-
buckets map[string]*tokenBucket
22+
mu sync.Mutex
23+
buckets map[string]*tokenBucket
24+
stopCh chan struct{}
25+
startOnce sync.Once
2426
}
2527

2628
// tokenBucket implements a simple token bucket rate limiter.
@@ -97,16 +99,57 @@ func NewRateLimitStepFactory() StepFactory {
9799
keyFrom: keyFrom,
98100
tmpl: NewTemplateEngine(),
99101
buckets: make(map[string]*tokenBucket),
102+
stopCh: make(chan struct{}),
100103
}, nil
101104
}
102105
}
103106

104107
// Name returns the step name.
105108
func (s *RateLimitStep) Name() string { return s.name }
106109

110+
// Stop shuts down the background cleanup goroutine. Safe to call multiple times
111+
// only if the goroutine was started (i.e., Execute was called at least once).
112+
func (s *RateLimitStep) Stop() {
113+
s.startOnce.Do(func() {}) // mark once as done so cleanup won't start if Stop races Execute
114+
select {
115+
case <-s.stopCh:
116+
default:
117+
close(s.stopCh)
118+
}
119+
}
120+
121+
// cleanupLoop removes stale token buckets every 5 minutes.
122+
func (s *RateLimitStep) cleanupLoop() {
123+
defer func() { recover() }() //nolint:errcheck
124+
ticker := time.NewTicker(5 * time.Minute)
125+
defer ticker.Stop()
126+
for {
127+
select {
128+
case <-ticker.C:
129+
s.evictStaleBuckets(10 * time.Minute)
130+
case <-s.stopCh:
131+
return
132+
}
133+
}
134+
}
135+
136+
// evictStaleBuckets removes buckets that have not been accessed within ttl.
137+
func (s *RateLimitStep) evictStaleBuckets(ttl time.Duration) {
138+
cutoff := time.Now().Add(-ttl)
139+
s.mu.Lock()
140+
defer s.mu.Unlock()
141+
for k, b := range s.buckets {
142+
if b.lastRefill.Before(cutoff) {
143+
delete(s.buckets, k)
144+
}
145+
}
146+
}
147+
107148
// Execute checks rate limiting for the resolved key and either allows or
108149
// rejects the request.
109150
func (s *RateLimitStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
151+
s.startOnce.Do(func() { go s.cleanupLoop() })
152+
110153
// Resolve the rate limit key from template
111154
key := s.keyFrom
112155
if s.tmpl != nil && key != "global" {

0 commit comments

Comments
 (0)