-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpriority.go
More file actions
192 lines (169 loc) · 5.52 KB
/
priority.go
File metadata and controls
192 lines (169 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package relay
import (
"container/heap"
"context"
"sync"
)
// Priority represents the urgency level of a request. Higher values indicate
// higher priority and are dequeued first when the bulkhead is at capacity.
// Within the same priority level, requests are dequeued in FIFO order.
//
// The gaps between the predefined levels (0, 50, 100, 200) leave room for
// callers to define intermediate priorities of their own.
type Priority int
// Predefined priority levels, ordered from least to most urgent.
const (
	// PriorityLow is for background or non-critical requests.
	PriorityLow Priority = 0
	// PriorityNormal is the default priority for typical requests.
	PriorityNormal Priority = 50
	// PriorityHigh is for important requests that should execute sooner.
	PriorityHigh Priority = 100
	// PriorityCritical is for time-sensitive requests (health checks, auth, etc.).
	PriorityCritical Priority = 200
)
// priorityItem wraps a Request with its enqueue sequence for FIFO
// ordering within the same priority level.
type priorityItem struct {
	req      *Request      // the queued request payload
	priority Priority      // urgency level used for heap ordering
	sequence uint64        // arrival counter; lower = earlier, breaks priority ties
	notify   chan struct{} // closed when this item is dequeued
}
// priorityQueue implements container/heap.Interface for a max-heap where
// higher priority values are dequeued first. Within the same priority,
// lower sequence numbers (earlier arrivals) are dequeued first (FIFO).
//
// The entry points (EnqueueAndWait, DequeueNext, Size, Close) acquire mu
// themselves; the heap.Interface methods assume mu is already held.
type priorityQueue struct {
	items    []*priorityItem // heap-ordered backing storage
	sequence uint64          // next arrival sequence number to assign
	mu       sync.Mutex      // guards items, sequence, and closed
	closed   bool            // set by Close; makes new EnqueueAndWait calls fail
}
// newPriorityQueue creates a new empty priority queue.
//
// The zero value of every field is already usable: a nil items slice
// works with append and container/heap, and sequence starts at 0, so no
// explicit allocation is needed.
func newPriorityQueue() *priorityQueue {
	return &priorityQueue{}
}
// enqueueDirect adds a request directly to the heap without blocking.
//
// NOTE(review): unlike the old comment claimed, the item's notify channel
// is NOT closed here — it is created unclosed and will be closed by a
// later DequeueNext (closing it here would make DequeueNext double-close
// and panic). Since nothing waits on it, that is harmless. Also note this
// helper does not check pq.closed. Use only in tests that want to
// pre-populate the queue and verify ordering without a running client.
func (pq *priorityQueue) enqueueDirect(req *Request, priority Priority) {
	notify := make(chan struct{})
	item := &priorityItem{
		req:      req,
		priority: priority,
		notify:   notify,
	}
	pq.mu.Lock()
	// Assign the arrival sequence under the lock so FIFO order within a
	// priority level is well-defined across concurrent enqueuers.
	item.sequence = pq.sequence
	pq.sequence++
	heap.Push(pq, item)
	pq.mu.Unlock()
}
// EnqueueAndWait adds a request to the priority queue and blocks until it is
// dequeued, respecting the context deadline. Returns nil once DequeueNext
// has picked this item, ErrClientClosed if the queue was closed before the
// item could be enqueued, or ctx.Err() on cancellation/timeout.
//
// onSlotAbandoned (optional) is called when the item was already dequeued
// (a bulkhead slot was transferred to this waiter) but ctx fired in the race
// window between notify-close and the select pick. The callback must
// re-transfer or free the slot to avoid leaking bulkhead capacity.
func (pq *priorityQueue) EnqueueAndWait(ctx context.Context, req *Request, priority Priority, onSlotAbandoned ...func()) error {
	notify := make(chan struct{})
	item := &priorityItem{
		req:      req,
		priority: priority,
		notify:   notify,
	}
	pq.mu.Lock()
	if pq.closed {
		// Queue shut down before we could enqueue; fail fast.
		pq.mu.Unlock()
		return ErrClientClosed
	}
	// Sequence assignment and heap insertion happen atomically under the
	// lock so FIFO order within a priority level is well-defined.
	item.sequence = pq.sequence
	pq.sequence++
	heap.Push(pq, item)
	pq.mu.Unlock()
	// Wait for either dequeue notification or context cancellation.
	select {
	case <-notify:
		return nil
	case <-ctx.Done():
		// Try to take the item back out of the queue. If it is no longer
		// there, DequeueNext dequeued it concurrently (and closed notify)
		// but the runtime picked ctx.Done() in the select race.
		pq.mu.Lock()
		removed := pq.removeItem(item)
		pq.mu.Unlock()
		if !removed && len(onSlotAbandoned) > 0 && onSlotAbandoned[0] != nil {
			// The item was already dequeued (notify was closed) but the Go
			// runtime picked ctx.Done() in the race. A bulkhead slot was
			// transferred to us — call the callback so it can be recycled.
			onSlotAbandoned[0]()
		}
		return ctx.Err()
	}
}
// removeItem deletes target from the heap if it is still queued, using
// heap.Remove so the heap invariant is preserved. It reports whether the
// item was found; false means it was already dequeued. Caller must hold
// pq.mu.
func (pq *priorityQueue) removeItem(target *priorityItem) bool {
	idx := -1
	for i := range pq.items {
		if pq.items[i] == target {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}
	heap.Remove(pq, idx)
	return true
}
// DequeueNext pops the highest-priority request off the heap and closes
// its notify channel so the goroutine blocked in EnqueueAndWait (if any)
// can proceed. When the queue is empty it returns (nil, PriorityNormal).
func (pq *priorityQueue) DequeueNext() (*Request, Priority) {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	if len(pq.items) == 0 {
		return nil, PriorityNormal
	}
	next := heap.Pop(pq).(*priorityItem)
	// Signal the waiter that its slot is ready.
	close(next.notify)
	return next.req, next.priority
}
// Size reports how many items are currently queued. Safe for concurrent
// use; it takes the lock itself.
func (pq *priorityQueue) Size() int {
	pq.mu.Lock()
	n := len(pq.items)
	pq.mu.Unlock()
	return n
}
// Close marks the queue as closed, causing new EnqueueAndWait calls to
// fail with ErrClientClosed. Items already queued are left in place.
func (pq *priorityQueue) Close() {
	pq.mu.Lock()
	pq.closed = true
	pq.mu.Unlock()
}
// container/heap.Interface implementation.
//
// NOTE: the heap package invokes these while pq.mu is already held by
// the public entry points (EnqueueAndWait / DequeueNext); they must not
// take the mutex themselves or they would deadlock.

// Len reports the number of queued items. Caller must hold pq.mu.
func (pq *priorityQueue) Len() int {
	return len(pq.items)
}
// Less orders the heap so the highest priority value surfaces first
// (max-heap); ties break toward the oldest arrival for FIFO behavior.
// Caller must hold pq.mu.
func (pq *priorityQueue) Less(i, j int) bool {
	a, b := pq.items[i], pq.items[j]
	if a.priority == b.priority {
		// Same urgency: the earlier sequence number wins (FIFO).
		return a.sequence < b.sequence
	}
	return a.priority > b.priority
}
// Swap exchanges two heap slots. Caller must hold pq.mu.
func (pq *priorityQueue) Swap(i, j int) {
	tmp := pq.items[i]
	pq.items[i] = pq.items[j]
	pq.items[j] = tmp
}
// Push appends a new item to the backing slice; called only by
// container/heap, which then sifts it into position. Caller must hold
// pq.mu.
func (pq *priorityQueue) Push(x interface{}) {
	item := x.(*priorityItem)
	pq.items = append(pq.items, item)
}
// Pop removes and returns the last element of the slice; container/heap
// has already moved the heap root there before calling. Caller must hold
// pq.mu.
func (pq *priorityQueue) Pop() interface{} {
	n := len(pq.items)
	item := pq.items[n-1]
	// Clear the vacated slot so the popped item is not kept reachable
	// through the backing array, which would delay garbage collection.
	pq.items[n-1] = nil
	pq.items = pq.items[:n-1]
	return item
}