-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.go
More file actions
132 lines (112 loc) · 3.41 KB
/
executor.go
File metadata and controls
132 lines (112 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package async
import "sync"
// An Executor is a coroutine spawner, and a coroutine runner.
//
// When a coroutine is spawned or resumed, it is added into an internal queue.
// The Run method then pops and runs each of them from the queue until
// the queue is emptied.
// It is done in a single-threaded manner.
// If one coroutine blocks, no other coroutines can run.
// The best practice is not to block.
//
// The internal queue is a priority queue.
// Coroutines added in the queue are sorted by their weights.
// Coroutines with the same weight are sorted by their levels
// (child coroutines have one level higher than their parent ones).
// Coroutines with the same weight and level are sorted by their arrival
// order (FIFO).
// Popping the queue removes the coroutine that comes first in that order:
// highest weight, then least level, then earliest arrival.
//
// Manually calling the Run method is usually not desired.
// One would instead use the Autorun method to set up an autorun function
// that calls the Run method automatically whenever a coroutine is spawned or
// resumed.
// An Executor never calls the autorun function twice at the same time.
//
// An Executor embeds a sync.Mutex by value and therefore must not be copied
// after first use.
type Executor struct {
// mu is locked by Autorun, Run and spawn before they touch the fields below.
mu sync.Mutex
// pq holds the queued coroutines: spawn pushes onto it, Run pops from it.
pq priorityqueue[*Coroutine]
// ps is taken and reset to nil at the end of Run, which then calls Repanic
// on the taken value after unlocking.
// NOTE(review): presumably filled with panics recovered while running
// coroutines; the code that fills it is not in this part of the file.
ps panicstack
// running is set by Run for the duration of its loop, and by spawn just
// before it calls the autorun function; Run clears it when the queue is
// empty. While it is set, spawn does not call the autorun function.
running bool
// autorun is the function installed by Autorun; nil means none is set.
autorun func()
}
// Autorun installs f as the autorun function of e, replacing whatever was
// installed before.
//
// The autorun function is what gets the queue drained automatically: it is
// called whenever a coroutine is spawned or resumed, and it is expected to
// call the Run method.
// Passing a nil f clears it; spawning then only queues the coroutine.
//
// f is called synchronously by the spawning code, so if f blocks, the Spawn
// method may block too.
// The best practice is not to block.
func (e *Executor) Autorun(f func()) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.autorun = f
}
// Run drains the queue: it keeps popping the next coroutine and running it
// until no coroutine is left.
//
// While Run is draining, e is marked as running, which keeps a concurrent
// spawn from calling the autorun function.
// Once the queue is empty, Run takes the accumulated panic stack, resets it,
// releases the lock and only then calls Repanic on what it took.
// NOTE(review): Repanic presumably re-raises panics collected while running
// coroutines; its definition is not in this part of the file.
//
// Run must not be called twice at the same time.
func (e *Executor) Run() {
	e.mu.Lock()
	e.running = true

	// NOTE(review): the mutex is held across the loop as written here;
	// whether runCoroutine releases it while the coroutine executes cannot be
	// told from this part of the file.
	for {
		if e.pq.Empty() {
			break
		}
		e.runCoroutine(e.pq.Pop())
	}

	e.running = false
	pending := e.ps
	e.ps = nil
	e.mu.Unlock()

	// Deliberately after Unlock, so the lock is not held while repanicking.
	pending.Repanic()
}
// Spawn creates a coroutine that works on t, using the default weight (zero).
//
// Spawning only puts the coroutine in the queue. For it to actually run,
// either call the Run method, or set up an autorun function with the Autorun
// method beforehand.
//
// Spawn is safe for concurrent use.
func (e *Executor) Spawn(t Task) {
	const defaultWeight Weight = 0
	e.SpawnWeighted(defaultWeight, t)
}
// SpawnBlocking behaves like [Executor.Spawn], using the default weight
// (zero), and in addition blocks the calling goroutine until t completes.
func (e *Executor) SpawnBlocking(t Task) {
	const defaultWeight Weight = 0
	e.SpawnWeightedBlocking(defaultWeight, t)
}
// SpawnWeighted creates a coroutine of weight w that works on t.
//
// Spawning only puts the coroutine in the queue. For it to actually run,
// either call the Run method, or set up an autorun function with the Autorun
// method beforehand.
//
// SpawnWeighted is safe for concurrent use.
func (e *Executor) SpawnWeighted(w Weight, t Task) {
	// A nil WaitGroup tells spawn that nobody waits for completion.
	var noWait *sync.WaitGroup
	e.spawn(noWait, w, t)
}
// SpawnWeightedBlocking behaves like [Executor.SpawnWeighted] and in addition
// blocks the calling goroutine until t completes.
//
// The wait is implemented with a WaitGroup borrowed from waitGroupPool, which
// is handed to the new coroutine and returned to the pool after Wait returns.
// NOTE(review): when no autorun function is set, some other goroutine
// presumably has to call Run for the wait to end; confirm with callers.
func (e *Executor) SpawnWeightedBlocking(w Weight, t Task) {
	done := waitGroupPool.Get().(*sync.WaitGroup)
	e.spawn(done, w, t)
	done.Wait()
	// Not deferred on purpose: the WaitGroup goes back to the pool only after
	// Wait has returned normally.
	waitGroupPool.Put(done)
}
// spawn creates a coroutine of weight w for t, pushes it onto the queue and,
// if an autorun function is set and e is not already running, calls that
// function.
//
// A non-nil wg is incremented by one and attached to the coroutine, so that
// the caller can wait on it.
// NOTE(review): the matching Done is presumably issued when the coroutine
// ends; that code is not in this part of the file.
func (e *Executor) spawn(wg *sync.WaitGroup, w Weight, t Task) {
	co := newCoroutine().init(0, w, e, t)
	if wg != nil {
		wg.Add(1)
		co.wg = wg
	}

	e.mu.Lock()
	kick := e.autorun
	if e.running {
		// Already running: do not call the autorun function again.
		kick = nil
	} else if kick != nil {
		// Claim the running state now, under the lock, so that a concurrent
		// spawn does not call the autorun function as well.
		e.running = true
	}
	e.pq.Push(co)
	e.mu.Unlock()

	// Call the autorun function only after the lock has been released.
	if kick != nil {
		kick()
	}
}
// waitGroupPool recycles the WaitGroups used by SpawnWeightedBlocking.
// Its New function returns a pointer to a zero-valued sync.WaitGroup.
var waitGroupPool = sync.Pool{
	New: func() any {
		var wg sync.WaitGroup
		return &wg
	},
}