-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBlockingStackImpl.kt
More file actions
112 lines (95 loc) · 3.42 KB
/
BlockingStackImpl.kt
File metadata and controls
112 lines (95 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import java.util.concurrent.atomic.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
 * Lock-free stack whose [pop] suspends the caller while no element is available.
 *
 * The implementation combines three pieces of state:
 *  - [elements]: a signed counter. A non-negative value is the number of elements
 *    that have been (or are about to be) pushed and not yet claimed by a pop; a
 *    negative value is the number of poppers that arrived with nothing to claim.
 *  - [stackHead]: a Treiber-style linked stack. It holds either real elements or
 *    `SUSPENDED` markers, never a mix: a marker stands for a popper that claimed an
 *    element via the counter before the corresponding push managed to link it.
 *  - [queueHead] / [queueTail]: a Michael-Scott queue of suspended poppers, served
 *    in FIFO order by [resume].
 */
class BlockingStackImpl<E> : BlockingStack<E> {

    // ==========================
    // Segment Queue Synchronizer
    // ==========================

    private val queueHead: AtomicReference<Receiver<E>>
    private val queueTail: AtomicReference<Receiver<E>>

    init {
        // Head and tail must start on the very same dummy node (it carries no continuation).
        val sentinel = Receiver<E>()
        queueHead = AtomicReference(sentinel)
        queueTail = AtomicReference(sentinel)
    }

    /**
     * Suspends the calling coroutine and appends it to the receiver queue.
     *
     * @return the element later handed over by [resume].
     */
    private suspend fun suspend(): E = suspendCoroutine { cont ->
        val node = Receiver(cont)
        while (true) {
            val tail = queueTail.get()
            if (tail.next.compareAndSet(null, node)) {
                // Linked; swinging the tail may fail if another thread already helped.
                queueTail.compareAndSet(tail, node)
                break
            }
            // Someone else linked a node but has not moved the tail yet: help them
            // instead of spinning until they get scheduled again.
            tail.next.get()?.let { queueTail.compareAndSet(tail, it) }
        }
    }

    /**
     * Hands [element] to the oldest suspended receiver.
     *
     * Callers invoke this only after establishing that a receiver exists or is about
     * to enqueue itself, so an empty queue is a transient state and the loop waits
     * for the receiver to appear.
     */
    private fun resume(element: E) {
        while (true) {
            val head = queueHead.get()
            val tail = queueTail.get()
            // Empty queue: the matching receiver has not linked itself yet.
            val first = head.next.get() ?: continue
            if (head === tail) {
                // A node is linked behind the tail but the tail lags; advance it and retry.
                queueTail.compareAndSet(tail, first)
                continue
            }
            // Only the initial dummy has a null continuation and it is never a successor.
            val receiver = first.continuation ?: continue
            if (queueHead.compareAndSet(head, first)) {
                receiver.resume(element)
                return
            }
        }
    }

    /** Queue cell; the dummy node is the only one created without a continuation. */
    private class Receiver<E>(
        val continuation: Continuation<E>? = null,
        val next: AtomicReference<Receiver<E>?> = AtomicReference(null)
    )

    // ==============
    // Blocking Stack
    // ==============

    private val stackHead = AtomicReference<Node<E>?>()
    private val elements = AtomicInteger()

    /**
     * Adds [element] to the stack, or passes it directly to a waiting popper.
     *
     * Never suspends or blocks on a lock; it may briefly spin in [resume] while the
     * popper it is serving finishes enqueuing itself.
     */
    override fun push(element: E) {
        val before = elements.getAndIncrement()
        if (before < 0) {
            // At least one popper is waiting (or about to wait) in the queue.
            resume(element)
            return
        }
        while (true) {
            val head = stackHead.get()
            // Identity comparison: never call a user-defined equals() against the sentinel.
            if (head != null && head.element === SUSPENDED) {
                // A popper claimed an element before it was linked; serve it directly.
                if (stackHead.compareAndSet(head, head.next.get())) {
                    resume(element)
                    return
                }
            } else {
                val pushed = Node<E>(element, AtomicReference<Node<E>?>(head))
                if (stackHead.compareAndSet(head, pushed)) return
            }
        }
    }

    /**
     * Removes and returns the most recently pushed element, suspending until one is
     * available.
     */
    @Suppress("UNCHECKED_CAST")
    override suspend fun pop(): E {
        val before = elements.getAndDecrement()
        if (before <= 0) {
            // Nothing to claim: wait for a future push to resume us.
            return suspend()
        }
        while (true) {
            val head = stackHead.get()
            if (head == null || head.element === SUSPENDED) {
                // The element we claimed via the counter is not linked yet. Leave a
                // marker so a push hands its element over, and wait. A marker on top
                // belongs to another waiting popper and must never be returned as a
                // value, so we stack our own marker above it.
                val marker = Node<E>(SUSPENDED, AtomicReference<Node<E>?>(head))
                if (stackHead.compareAndSet(head, marker)) return suspend()
            } else if (stackHead.compareAndSet(head, head.next.get())) {
                // Non-marker nodes are only ever created by push(element: E).
                return head.element as E
            }
        }
    }
}
/**
 * Singly linked cell of the stack.
 *
 * [element] is typed `Any?` rather than `E` so that the slot can hold either a
 * pushed value or the `SUSPENDED` sentinel; [next] points at the cell below
 * (or holds `null` for the bottom cell).
 */
private class Node<E>(
    val element: Any? = null,
    val next: AtomicReference<Node<E>?> = AtomicReference(null)
)
/**
 * Unique sentinel stored in a [Node]'s element slot in place of a real value.
 * `pop` leaves it on the stack before suspending, and `push` checks for it to
 * decide whether to hand its element to a waiting popper.
 */
private val SUSPENDED: Any = Any()