-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.go
More file actions
347 lines (296 loc) · 9.14 KB
/
engine.go
File metadata and controls
347 lines (296 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Package taskengine concurrently executes a set of tasks assigned to multiple different workers.
//
// Each worker can work on all or a subset of the tasks.
//
// When a worker is ready, the next task to execute is dynamically chosen
// considering the current status of the tasks,
// so as to maximize the throughput of the tasks successfully executed.
//
// After the first success result is found, the remaining jobs for the same task are cancelled.
package taskengine
import (
"context"
"fmt"
"time"
)
// Mode of execution for each task.
// It selects which results of a task are delivered on the channel
// returned by the Execute method of Engine.
type Mode int
// Values of the mode of execution for each task.
const (
// AllResults returns, for each task, the result of all the workers:
// success, error or canceled.
// Multiple success results can be returned.
AllResults Mode = iota
// SuccessOrErrorResults returns, for each task, the success or error results.
// The canceled results are not returned.
// Multiple success results can be returned.
SuccessOrErrorResults
// ResultsUntilFirstSuccess returns, for each task, the results preceding
// the first success (included).
// At most one success is returned.
ResultsUntilFirstSuccess
// FirstSuccessOrLastResult returns, for each task, only one result:
// the first success or the last result.
FirstSuccessOrLastResult
)
// Engine is the main struct used to execute the tasks.
// It internally saves the information about the workers and the tasks of each worker.
type Engine struct {
workers map[WorkerID]*Worker // workers indexed by WorkerID
widtasks WorkerTasks // non-empty task list of each worker, keyed by WorkerID
workersList []*Worker // original workers list, in the order given to NewEngine
}
// jobInput is the internal struct passed to a worker to execute a task.
type jobInput struct {
ctx context.Context // task context, passed to the Work function of the worker
cancel context.CancelFunc // func to cancel task execution (cancels ctx)
task Task // task to be executed
outc chan *jobOutput // output channel the worker sends its jobOutput to
stat TaskStat // task status snapshot, copied into the Start event
}
// jobOutput contains the result returned by a worker for a given task,
// together with the WorkerID and the instance number that executed it.
// A nil result indicates that the worker instance is ready to perform a task.
type jobOutput struct {
res Result // can be nil
wid WorkerID // worker that produced the output
instance int // instance number of the worker
task Task // not used if res is nil
timeStart time.Time // taken when the worker instance received the task
timeEnd time.Time // taken after the Work function returned
}
// NewEngine initializes a new engine object from the list of workers and the
// tasks of each worker.
//
// It performs some sanity checks and returns an error if:
//   - a worker in ws is nil;
//   - two workers share the same WorkerID;
//   - the Instances of a worker is not in the 1..maxInstances range;
//   - the Work function of a worker is nil;
//   - wts contains a non-empty task list for a WorkerID not found in ws.
//
// Entries of wts with an empty task list are ignored.
func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error) {
	// check the workers and build a map from WorkerID to Worker
	workers := make(map[WorkerID]*Worker, len(ws))
	for _, w := range ws {
		// a nil worker would otherwise panic on the field accesses below
		if w == nil {
			return nil, fmt.Errorf("nil worker")
		}
		if _, ok := workers[w.WorkerID]; ok {
			return nil, fmt.Errorf("duplicate worker: WorkerID=%q", w.WorkerID)
		}
		if w.Instances <= 0 || w.Instances > maxInstances {
			return nil, fmt.Errorf("instances must be in 1..%d range: WorkerID=%q", maxInstances, w.WorkerID)
		}
		if w.Work == nil {
			return nil, fmt.Errorf("work function cannot be nil: WorkerID=%q", w.WorkerID)
		}
		workers[w.WorkerID] = w
	}

	// check the task list of each worker and keep only the non-empty ones
	widtasks := WorkerTasks{}
	for wid, ts := range wts {
		// for empty task lists, continue
		if len(ts) == 0 {
			continue
		}
		// check the worker exists
		if _, ok := workers[wid]; !ok {
			return nil, fmt.Errorf("tasks for undefined worker: WorkerID=%q", wid)
		}
		// save the task list of the worker in the engine
		widtasks[wid] = ts
	}

	return &Engine{
		workers:     workers,
		widtasks:    widtasks,
		workersList: ws,
	}, nil
}
// FilterEventFunc maps a Mode to the predicate used to select events:
// the returned function reports whether a given *Event satisfies
// the criteria of the mode argument.
// Any mode without a dedicated predicate falls back to IsResult.
func FilterEventFunc(mode Mode) func(*Event) bool {
	var filter func(*Event) bool

	switch mode {
	case SuccessOrErrorResults:
		filter = IsSuccessOrError
	case ResultsUntilFirstSuccess:
		filter = IsResultUntilFirstSuccess
	case FirstSuccessOrLastResult:
		filter = IsFirstSuccessOrLastResult
	default:
		filter = IsResult
	}

	return filter
}
// Execute starts the execution of the tasks and returns a chan that
// receives the results they generate.
// It is built on top of ExecuteEvents: the events are filtered with the
// function returned by FilterEventFunc for the given mode, and the Result
// of each selected event is forwarded to the returned chan.
// The returned chan is closed when the event chan is closed.
// Any error of ExecuteEvents is returned as is.
func (eng *Engine) Execute(ctx context.Context, mode Mode) (chan Result, error) {
	// start the execution and get the event chan
	events, err := eng.ExecuteEvents(ctx)
	if err != nil {
		return nil, err
	}

	// predicate selecting the events whose result must be exported
	keep := FilterEventFunc(mode)

	results := make(chan Result)

	// forward the result of each selected event,
	// until the event chan is drained and closed
	go func() {
		defer close(results)
		for ev := range events {
			if !keep(ev) {
				continue
			}
			results <- ev.Result
		}
	}()

	return results, nil
}
// ExecuteEvents returns a chan that receives all the Events
// generated by each task execution.
// For each (worker, task) pair, a Start event is emitted,
// followed by a final event that can be Success, Canceled or Error.
//
// The method is useful to track the execution of the tasks:
// while the Execute method can only return the result on completion
// of execution, the ExecuteEvents method returns also the Start event
// at the beginning of execution (with a nil result).
//
// It returns an error if the engine or the context is nil.
//
// The returned chan is unbuffered and is closed when the execution is
// completed: the caller has to keep receiving from it until it is closed,
// otherwise the internal goroutines stay blocked on their sends.
//
// Each task has its own context derived from ctx. It is canceled as soon as
// a worker returns a success for the task, and in any case when the
// execution is completed.
func (eng *Engine) ExecuteEvents(ctx context.Context) (chan *Event, error) {
	if eng == nil {
		return nil, fmt.Errorf("nil engine")
	}
	if ctx == nil {
		return nil, fmt.Errorf("nil context")
	}

	// creates the Event channel
	eventc := make(chan *Event)

	// creates the *jobOutput channel
	outputc := make(chan *jobOutput)

	// creates the *jobInput chan of each worker
	inputc := map[WorkerID](chan *jobInput){}
	for wid := range eng.workers {
		inputc[wid] = make(chan *jobInput)
	}

	// creates one cancelable context for each distinct task, derived from ctx
	taskctx := map[TaskID]context.Context{}
	taskcancel := map[TaskID]context.CancelFunc{}
	for _, ts := range eng.widtasks {
		for _, t := range ts {
			tid := t.TaskID()
			if _, ok := taskctx[tid]; !ok {
				tctx, cancel := context.WithCancel(ctx)
				taskctx[tid] = tctx
				taskcancel[tid] = cancel
			}
		}
	}

	// Starts the goroutines that execute the real work.
	// For each worker it starts N goroutines, with N = Instances.
	// Each goroutine gets the input from the worker request channel,
	// and puts the output to the task result channel (contained in the request).
	for _, worker := range eng.workersList {
		// for each worker instance
		for i := 0; i < worker.Instances; i++ {
			go func(w *Worker, inst int, reqc <-chan *jobInput) {
				// the goroutine ends when the request chan of the worker is closed
				for req := range reqc {
					timeStart := time.Now()

					// start event
					event := &Event{
						Task:       req.task,
						WorkerID:   w.WorkerID,
						WorkerInst: inst,
						Result:     nil,
						TaskStat:   req.stat,
						TimeStart:  timeStart,
						TimeEnd:    timeStart,
					}
					eventc <- event

					// get the worker result of the task
					res := w.Work(req.ctx, w, inst, req.task)

					// send the result to the output chan
					jout := jobOutput{
						wid:       w.WorkerID,
						instance:  inst,
						res:       res,
						task:      req.task,
						timeStart: timeStart,
						timeEnd:   time.Now(),
					}
					req.outc <- &jout
				}
			}(worker, i, inputc[worker.WorkerID])
		}
	}

	// start a goroutine that, for each worker instance,
	// sends a void output to signal it is ready to work.
	go func() {
		for _, w := range eng.workersList {
			wid := w.WorkerID
			for i := 0; i < w.Instances; i++ {
				jout := jobOutput{
					wid:      wid,
					instance: i,
					res:      nil,
				}
				outputc <- &jout
			}
		}
	}()

	// main goroutine that handles the input and output from the workers
	// and sends the events to the event chan.
	go func() {
		// clone eng.widtasks, so that the engine task lists are not
		// modified when the tasks are removed from the working copy
		widtasks := eng.widtasks.Clone()

		// init the status map from the WorkerTasks object
		statMap := newTaskStatusMap(eng.widtasks)

		for !statMap.completed() {
			// get the next output
			o := <-outputc

			// handle result (a nil result is only a "ready" signal)
			if o.res != nil {
				success := (o.res.Error() == nil)
				tid := o.task.TaskID()

				// updates task info map
				statMap.done(tid, success)

				if success {
					// call cancel func for the task context
					taskcancel[tid]()
				}

				// end event (success, error or canceled)
				event := &Event{
					Task:       o.task,
					WorkerID:   o.wid,
					WorkerInst: o.instance,
					Result:     o.res,
					TaskStat:   *statMap[tid],
					TimeStart:  o.timeStart,
					TimeEnd:    o.timeEnd,
				}
				eventc <- event
			}

			// select the next task of the worker
			var nexttask Task
			{
				ts := widtasks[o.wid]
				n := statMap.pick(ts)
				if n >= 0 {
					nexttask = ts.remove(n)
					widtasks[o.wid] = ts
				}
			}

			if nexttask == nil {
				// close the worker chan
				// NOTE: in case of a worker with two or more instances,
				// the close of the channel must be called only once.
				// Else get the error:
				//   panic: close of closed channel
				if ch, ok := inputc[o.wid]; ok {
					close(ch)
					delete(inputc, o.wid)
				}
			} else {
				tid := nexttask.TaskID()

				// updates task info map
				statMap.doing(tid)

				// send the job to the worker
				i := &jobInput{
					ctx:    taskctx[tid],
					cancel: taskcancel[tid],
					task:   nexttask,
					outc:   outputc,
					stat:   *statMap[tid],
				}
				inputc[o.wid] <- i
			}
		}

		// Release the resources of every task context: in the loop above
		// only the contexts of the tasks with a success are canceled, the
		// others would stay registered in the parent ctx until it is done.
		// Calling a CancelFunc more than once is allowed.
		for _, cancel := range taskcancel {
			cancel()
		}

		// NOTE(review): a sender still blocked on outputc at this point
		// (e.g. a "ready" signal not yet received) would panic on the close;
		// presumably statMap.completed() holds only after every output has
		// been received - TODO confirm.
		close(outputc)
		close(eventc)
	}()

	return eventc, nil
}