forked from fluxcd/pkg
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.go
More file actions
133 lines (110 loc) · 3.33 KB
/
queue.go
File metadata and controls
133 lines (110 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"sync"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)
// QueueEventSource holds enough tracking information about the
// source object that triggered a queue event and implements the
// error interface.
type QueueEventSource struct {
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
UID types.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
}
// Ensure QueueEventSource implements the error interface.
var _ error = &QueueEventSource{}
// Error returns a string representation of the object represented by QueueEventSource.
func (q *QueueEventSource) Error() string {
return fmt.Sprintf("%s/%s/%s", q.Kind, q.Namespace, q.Name)
}
// Is returns true if the target error is a QueueEventSource object.
func (*QueueEventSource) Is(target error) bool {
_, ok := target.(*QueueEventSource)
return ok
}
// queueEventType represents the type of event that occurred in the queue.
type queueEventType int
const (
// queueEventObjectEnqueued indicates that an object was enqueued.
queueEventObjectEnqueued queueEventType = iota
)
// queueEventPayload is the payload delivered to listeners
// when a queue event occurs.
type queueEventPayload struct {
source QueueEventSource
}
// queueHooks implements mechanisms for hooking to queue events.
type queueHooks struct {
lis map[queueEvent][]*queueListener
mu sync.Mutex
}
// queueEvent represents an event related to the queue.
type queueEvent struct {
queueEventType
ctrl.Request
}
// queueListener represents a listener for a queue event.
type queueListener struct {
ctx context.Context
cancel context.CancelFunc
payload chan<- *queueEventPayload
}
func newQueueHooks() *queueHooks {
return &queueHooks{
lis: make(map[queueEvent][]*queueListener),
}
}
func (q *queueHooks) dispatch(event queueEvent, payload queueEventPayload) {
q.mu.Lock()
listeners := q.lis[event]
delete(q.lis, event)
q.collectGarbage()
q.mu.Unlock()
for _, l := range listeners {
l.payload <- &payload
l.cancel()
}
}
func (q *queueHooks) registerListener(ctx context.Context, event queueEvent) (
context.Context, context.CancelFunc, <-chan *queueEventPayload,
) {
ctx, cancel := context.WithCancel(ctx)
payload := make(chan *queueEventPayload, 1)
q.mu.Lock()
q.collectGarbage()
q.lis[event] = append(q.lis[event], &queueListener{ctx, cancel, payload})
q.mu.Unlock()
return ctx, cancel, payload
}
func (q *queueHooks) collectGarbage() {
for key, listeners := range q.lis {
var alive []*queueListener
for _, l := range listeners {
if l.ctx.Err() == nil {
alive = append(alive, l)
}
}
if len(alive) > 0 {
q.lis[key] = alive
} else {
delete(q.lis, key)
}
}
}