-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathasync.go
More file actions
66 lines (56 loc) · 2.04 KB
/
async.go
File metadata and controls
66 lines (56 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package gorums
// Async is a generic future type for asynchronous quorum calls.
// It encapsulates the state of an asynchronous call and provides methods
// for checking the status or waiting for completion.
//
// Type parameter Resp is the response type from nodes.
type Async[Resp any] struct {
reply Resp
err error
c chan struct{}
}
// Get returns the reply and any error associated with the called method.
// The method blocks until a reply or error is available.
func (f *Async[Resp]) Get() (Resp, error) {
<-f.c
return f.reply, f.err
}
// Done reports if a reply and/or error is available for the called method.
func (f *Async[Resp]) Done() bool {
select {
case <-f.c:
return true
default:
return false
}
}
// AsyncFirst returns an Async future that resolves when the first response is received.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncFirst() *Async[Resp] {
return r.AsyncThreshold(1)
}
// AsyncMajority returns an Async future that resolves when a majority quorum is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncMajority() *Async[Resp] {
quorumSize := r.size/2 + 1
return r.AsyncThreshold(quorumSize)
}
// AsyncAll returns an Async future that resolves when all nodes have responded.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncAll() *Async[Resp] {
return r.AsyncThreshold(r.size)
}
// AsyncThreshold returns an Async future that resolves when the threshold is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncThreshold(threshold int) *Async[Resp] {
// Send messages synchronously before spawning the goroutine to preserve ordering
r.sendNow()
fut := &Async[Resp]{c: make(chan struct{}, 1)}
go func() {
defer close(fut.c)
fut.reply, fut.err = r.Threshold(threshold)
}()
return fut
}