-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSynchronousQueueMS.kt
More file actions
93 lines (80 loc) · 3.01 KB
/
SynchronousQueueMS.kt
File metadata and controls
93 lines (80 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.*
/**
 * Unbuffered hand-off queue for coroutines, built on a singly linked list with a dummy head node.
 *
 * A call to [send] or [receive] either parks the caller by appending a node that carries its
 * continuation, or pairs up with the oldest parked node of the opposite role: it advances
 * `head` past that node, transfers the element through the node's `value` cell and resumes
 * the parked continuation.
 *
 * All shared state is held in [AtomicReference]s and updated with compare-and-set; no locks
 * are taken. `null` is used as the "no value" marker inside nodes, so `null` elements cannot
 * be transferred.
 */
class SynchronousQueueMS<E> : SynchronousQueue<E> {
    /** Result handed to a coroutine that tried to park itself: parked-and-served, or try again. */
    enum class Status { OK, RETRY }

    /** Role of the operation that created a node. */
    enum class NodeType { SENDER, RECEIVER }

    /**
     * Queue cell.
     *
     * @property value element slot; filled by the sender, cleared once a receiver has taken it.
     * @property next successor link; only ever moves from `null` to a node.
     * @property type role of the operation that owns this node.
     * @property continuation the parked coroutine; assigned before the node is linked into the queue.
     */
    inner class Node<T>(
        val value: AtomicReference<T?> = AtomicReference(null),
        val next: AtomicReference<Node<T>?> = AtomicReference(null),
        val type: NodeType,
        var continuation: Continuation<Status>? = null
    ) {
        fun isSender() = this.type == NodeType.SENDER
        fun isReceiver() = this.type == NodeType.RECEIVER
    }

    // `head` always points at a dummy node; the first real waiter is `head.next`.
    private val head: AtomicReference<Node<E>>
    private val tail: AtomicReference<Node<E>>

    init {
        val dummy: Node<E> = Node(type = NodeType.SENDER)
        head = AtomicReference(dummy)
        tail = AtomicReference(dummy)
    }

    /**
     * Hands [element] to a receiver, suspending until one has taken it.
     *
     * If the queue is empty or already holds senders, the caller is parked behind the current
     * tail. Otherwise the oldest parked receiver is unlinked, given [element] and resumed.
     */
    override suspend fun send(element: E) {
        val node: Node<E> = Node(value = AtomicReference(element), type = NodeType.SENDER)
        while (true) {
            val h: Node<E> = head.get()
            val t: Node<E> = tail.get()
            if (h === t || t.isSender()) {
                // Nobody to pair with: park. RETRY means the link CAS lost a race.
                if (coroutineResult(node, t) == Status.OK) return
            } else {
                val hNext: Node<E> = h.next.get() ?: continue
                // Re-validate the snapshot before committing to it.
                if (t !== tail.get() || h !== head.get()) continue
                val waiter = hNext.continuation
                if (waiter != null && head.compareAndSet(h, hNext)) {
                    // Winning the head CAS gives exclusive ownership of this receiver.
                    hNext.value.compareAndSet(null, element)
                    waiter.resume(Status.OK)
                    return
                }
            }
        }
    }

    /**
     * Takes one element from a sender, suspending until one is available.
     *
     * If the queue is empty or already holds receivers, the caller is parked behind the current
     * tail. Otherwise the oldest parked sender is unlinked, its element taken and it is resumed.
     *
     * @return the element supplied by the paired [send] call.
     * @throws IllegalStateException if the caller was resumed without an element in its node.
     */
    override suspend fun receive(): E {
        val node: Node<E> = Node(type = NodeType.RECEIVER)
        while (true) {
            val h: Node<E> = head.get()
            val t: Node<E> = tail.get()
            if (h === t || t.isReceiver()) {
                if (coroutineResult(node, t) == Status.OK) {
                    return checkNotNull(node.value.get()) {
                        "receiver resumed without an element (null elements are not supported)"
                    }
                }
            } else {
                val hNext: Node<E> = h.next.get() ?: continue
                // Re-validate the snapshot before committing to it.
                if (t !== tail.get() || h !== head.get()) continue
                val value = hNext.value.get() ?: continue
                val waiter = hNext.continuation
                if (waiter != null && head.compareAndSet(h, hNext)) {
                    // The node becomes the new dummy; drop its reference to the element.
                    hNext.value.compareAndSet(value, null)
                    waiter.resume(Status.OK)
                    return value
                }
            }
        }
    }

    /**
     * Tries to append [node] directly after [t] and suspends the caller inside it.
     *
     * On a successful link the coroutine stays suspended until the opposite operation resumes it
     * with [Status.OK]. If [t] already has a successor, the caller is resumed immediately with
     * [Status.RETRY] so that it can re-read the queue state.
     *
     * @param node the caller's node; its `continuation` is overwritten on every attempt.
     * @param t the node the caller observed as the tail.
     */
    suspend fun coroutineResult(node: Node<E>, t: Node<E>): Status {
        return suspendCoroutine { cont ->
            // Must be set before the link CAS publishes the node to other threads.
            node.continuation = cont
            if (!t.next.compareAndSet(null, node)) {
                // `t` already has a successor, so `tail` may be lagging. Help swing it forward
                // instead of spinning until the thread that linked the successor gets to do so.
                t.next.get()?.let { successor -> tail.compareAndSet(t, successor) }
                cont.resume(Status.RETRY)
                return@suspendCoroutine
            }
            // Linked; a failed CAS here only means another thread already advanced the tail.
            tail.compareAndSet(t, node)
        }
    }
}