1 module hunt.io.SimpleQueue;
2
3 import hunt.Exceptions;
4 import hunt.logging;
5
6 import core.thread;
7 import core.sync.semaphore : Semaphore;
8 import core.sync.condition;
9 import core.sync.mutex : Mutex;
10 import core.atomic;
11
12 import std.datetime;
13
14 // ported from https://github.com/qznc/d-queues
15
/** Implementations based on the paper
"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
by Maged M. Michael and Michael L. Scott. */
19
/** Basic interface for all queues implemented here.
Declares only enqueue, dequeue and tryDequeue; no range primitives
(front/popFront/empty/put) are declared on this interface.
*/
interface Queue(T) {
    /** Atomically put one element into the queue.
    Params:
        t = the element to append
    */
    void enqueue(T t);

    /** Atomically take one element from the queue.
    Wait blocking or spinning (which one depends on the implementation).
    Returns: the element taken from the queue
    */
    T dequeue();

    /**
    If at least one element is in the queue,
    atomically take one element from the queue,
    store it into e, and return true.
    Otherwise return false.
    Params:
        e = receives the dequeued element; being an `out` parameter it is
            reset to T.init on entry
    */
    bool tryDequeue(out T e);
}
38
/** Singly linked list node used by the queue classes in this module.
Holds one value and a reference to the following node.
*/
private class QueueNode(T) {
    /// Next node in the list, or null if this is the last node.
    QueueNode!T nxt;
    /// Payload stored in this node (T.init when default-constructed).
    T value;

    /// Creates an empty node: `nxt` is null and `value` is T.init.
    this() {}

    /// Creates a node carrying `value`; `nxt` stays null.
    this(T value) {
        this.value = value;
    }
}
49
/**
 * Blocking multi-producer multi-consumer queue.
 *
 * Two-lock linked list: `tail_lock` serializes producers, `head_lock`
 * serializes consumers. `head` always points at the node holding the next
 * value to hand out; `tail` always points at an empty sentinel node that the
 * next enqueue fills in.
 *
 * Lock ordering: `head_lock` may be held while acquiring `tail_lock`
 * (see clear()); `tail_lock` is never held while acquiring `head_lock`.
 */
class BlockingQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;
    private Mutex head_lock;
    private Mutex tail_lock;
    /// Sticky flag set by wakeup(); once set, dequeue() no longer blocks.
    private shared bool isWaking = false;

    /** Wait queue for waiting takes (bound to head_lock) */
    private Condition notEmpty;
    /// Maximum time a single dequeue() waits for an element.
    private Duration _timeout;

    /**
     * Params:
     *     timeout = how long dequeue() waits on an empty queue before
     *               giving up and returning T.init
     */
    this(Duration timeout = 5.seconds) {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
        this.head_lock = new Mutex();
        this.tail_lock = new Mutex();
        notEmpty = new Condition(head_lock);
        _timeout = timeout;
    }

    /**
     * Appends `t` and wakes one waiting consumer.
     */
    void enqueue(T t) {
        auto end = new QueueNode!T();
        {
            this.tail_lock.lock();
            scope (exit)
                this.tail_lock.unlock();
            auto tl = this.tail;
            this.tail = end;
            tl.value = t;
            // Make the value visible before the node is linked in.
            atomicFence();
            tl.nxt = end; // accessible to dequeue
        }

        // Notify while holding the mutex the condition is bound to. A consumer
        // that has just observed an empty queue is then either already waiting
        // (and gets woken) or has not yet checked (and will see the element);
        // notifying without the lock could lose the wakeup.
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        notEmpty.notify();
    }

    /**
     * Takes the oldest element, waiting up to the configured timeout while
     * the queue is empty.
     *
     * Returns: the dequeued element, or T.init if the queue is still empty
     *          after the timeout or after wakeup() has been called.
     */
    T dequeue() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();

        while (this.head.nxt is null) {
            if (atomicLoad(isWaking))
                return T.init;
            // On timeout, re-check once so an element that arrived right at
            // the deadline is not dropped in favour of T.init.
            if (!notEmpty.wait(_timeout) && this.head.nxt is null)
                return T.init;
        }

        // Pairs with the fence in enqueue(): read the value only after the
        // link has been observed.
        atomicFence();
        auto hd = this.head;
        this.head = hd.nxt;
        return hd.value;
    }

    /**
     * Non-blocking variant of dequeue().
     *
     * Params:
     *     e = receives the element (reset to T.init on entry)
     * Returns: true if an element was taken, false if the queue was empty.
     */
    bool tryDequeue(out T e) {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        auto hd = this.head;
        auto scnd = hd.nxt;
        if (scnd is null)
            return false;

        atomicFence(); // pairs with the fence in enqueue()
        this.head = scnd;
        e = hd.value;
        return true;
    }

    /// Returns: true if no element is currently available to consumers.
    bool isEmpty() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        return this.head.nxt is null;
    }

    /**
     * Drops all queued elements.
     * Takes both locks (head first, then tail) because it replaces both ends
     * of the list; replacing `tail` without `tail_lock` would race with
     * enqueue().
     */
    void clear() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        this.tail_lock.lock();
        scope (exit)
            this.tail_lock.unlock();

        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Releases consumers blocked in dequeue(): sets the sticky `isWaking`
     * flag (first call only) and wakes every waiter, each of which returns
     * T.init if the queue is still empty. The flag is never reset, so later
     * dequeue() calls on an empty queue return T.init immediately.
     */
    void wakeup() {
        if (cas(&isWaking, false, true)) {
            // Hold head_lock so a consumer between its flag check and its
            // wait cannot miss the notification.
            this.head_lock.lock();
            scope (exit)
                this.head_lock.unlock();
            notEmpty.notifyAll();
        }
    }
}
136
/**
 * Non-blocking multi-producer multi-consumer queue.
 *
 * CAS-based linked list: `head` points at a dummy node whose successor holds
 * the next value to hand out; `tail` points at (or lags slightly behind) the
 * last node and is advanced lazily by enqueue().
 */
class NonBlockingQueue(T) : Queue!T {
    private shared(QueueNode!T) head;
    private shared(QueueNode!T) tail;

    this() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Appends `t`, retrying until the new node has been linked after the
     * current last node.
     */
    void enqueue(T t) {
        shared end = new QueueNode!T();
        end.value = cast(shared)t;
        while (true) {
            auto tl = tail;
            auto cur = tl.nxt;
            if (cur !is null) {
                // obsolete tail, try update
                cas(&this.tail, tl, cur);
                continue;
            }

            shared(QueueNode!T) dummy = null;
            if (cas(&tl.nxt, dummy, end)) {
                // successfully enqueued new end node
                break;
            }
        }
    }

    /**
     * Takes the oldest element, spinning (with Thread.yield) until one is
     * available.
     */
    T dequeue() nothrow {
        T e = void;
        while (!tryDequeue(e)) {
            Thread.yield();
        }
        return e;
    }

    /**
     * Takes the oldest element if there is one.
     *
     * Params:
     *     e = receives the element (reset to T.init on entry)
     * Returns: true if an element was taken, false if the queue was empty.
     */
    bool tryDequeue(out T e) nothrow {
        // Retry in a loop rather than by recursion: under contention the CAS
        // can fail many times, and D does not guarantee tail-call elimination.
        while (true) {
            auto dummy = this.head;
            auto nxt = dummy.nxt;

            if (nxt is null)
                return false;

            if (cas(&this.head, dummy, nxt)) {
                // nxt becomes the new dummy; its value belongs to this caller.
                e = cast(T)nxt.value;
                return true;
            }
            // Another consumer advanced head first; re-read and try again.
        }
    }

    /// Returns: true if no element is currently linked after the head node.
    bool isEmpty() nothrow {
        return this.head.nxt is null;
    }

    /**
     * Drops all queued elements by installing a fresh dummy node.
     * NOTE(review): head and tail are replaced with two plain stores, so this
     * is not atomic with respect to concurrent enqueue/dequeue calls.
     */
    void clear() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }
}
202
203
/**
 * Plain linked-list FIFO queue without any locking or atomic operations.
 *
 * `head` points at a dummy node whose successor holds the next value to hand
 * out; `tail` points at the last node.
 */
class SimpleQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;

    this() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /// Appends `t` at the end of the queue.
    void enqueue(T t) {
        auto end = new QueueNode!T(t);

        auto tl = this.tail;
        this.tail = end;
        tl.nxt = end; // now reachable from head
    }

    /**
     * Takes the oldest element, spinning (with Thread.yield) until one is
     * available.
     */
    T dequeue() {
        T e = void;
        while (!tryDequeue(e)) {
            Thread.yield();
            version(HUNT_DANGER_DEBUG) warning("Running here");
        }
        return e;
    }

    /**
     * Takes the oldest element, spinning (with Thread.yield) for at most
     * `timeout`.
     *
     * Params:
     *     timeout = maximum time to wait for an element
     * Returns: the dequeued element
     * Throws: TimeoutException if no element became available in time
     */
    T dequeue(Duration timeout) {
        T e = void;
        // Monotonic clock: immune to wall-clock adjustments, and comparing the
        // elapsed time avoids overflow on `start + timeout` for huge timeouts.
        const start = MonoTime.currTime;
        // Debug builds warn at most once per 15 s of waiting.
        debug Duration nextWarning = 15.seconds;

        bool r = tryDequeue(e);
        while (!r && MonoTime.currTime - start < timeout) {
            debug {
                Duration dur = MonoTime.currTime - start;
                if(dur > nextWarning) {
                    warningf("There is no element available in %s", dur);
                    nextWarning = dur + 15.seconds;
                }
            }
            Thread.yield();
            r = tryDequeue(e);
        }

        if (!r) {
            throw new TimeoutException("Timeout in " ~ timeout.toString());
        }
        return e;
    }

    /**
     * Takes the oldest element if there is one.
     *
     * Params:
     *     e = receives the element (reset to T.init on entry)
     * Returns: true if an element was taken, false if the queue was empty.
     */
    bool tryDequeue(out T e) {
        auto nxt = this.head.nxt;
        if(nxt is null)
            return false;

        this.head = nxt;
        e = nxt.value;
        // nxt is now the dummy head; reset its slot so the queue does not keep
        // the dequeued value reachable until the next dequeue.
        nxt.value = T.init;
        return true;
    }

    /// Returns: true if no element is linked after the head node.
    bool isEmpty() {
        return this.head.nxt is null;
    }

    /// Drops all queued elements by installing a fresh dummy node.
    void clear() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }
}