1 module hunt.concurrency.SimpleQueue;
2 
3 import hunt.Exceptions;
4 import hunt.logging.ConsoleLogger;
5 
6 import core.thread;
7 import core.sync.semaphore : Semaphore;
8 import core.sync.condition;
9 import core.sync.mutex : Mutex;
10 import core.atomic;
11 
12 import std.datetime;
13 
14 // ported from https://github.com/qznc/d-queues
15 
/** Implementations based on the paper
    "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
    by Maged M. Michael and Michael L. Scott. */
19 
/** Basic interface for all queues implemented here.

    NOTE(review): an earlier version of this comment said the interface
    "is an input and output range", but no range primitives are declared
    here - confirm before relying on range semantics.
*/
interface Queue(T) {
    /** Put one element into the queue.
        Params:
            t = the element to append
    */
    void enqueue(T t);

    /** Take one element from the queue.
        When no element is available the implementation waits, either by
        blocking or by spinning.
        Returns: the element that was removed
    */
    T dequeue();

    /**
      If at least one element is in the queue, take one element from the
      queue, store it into e, and return true.
      Otherwise return false; because e is an `out` parameter it then
      holds T.init.
      Params:
          e = receives the removed element
      Returns: true if an element was removed, false if the queue was empty
    */
    bool tryDequeue(out T e);
}
38 
/** Singly linked list cell used by the queue implementations in this module. */
private class QueueNode(T) {
    /// Successor in the chain; null until another node is linked behind this one.
    typeof(this) nxt;

    /// Payload carried by this node.
    T value;

    /// Creates a node whose payload is left at T.init.
    this() {
    }

    /// Creates a node carrying the given payload.
    this(T value) {
        this.value = value;
    }
}
49 
/**
 * Blocking multi-producer multi-consumer queue (two-lock variant).
 *
 * The list always contains one trailing placeholder node. `enqueue` stores
 * the value in the current placeholder and appends a fresh one; `dequeue`
 * returns the value of the head node and advances `head`.
 *
 * Locking rules used by this class:
 *   - `tail_lock` guards `tail`,
 *   - `head_lock` guards `head` and is the mutex bound to `notEmpty`,
 *   - when both are needed (`clear`), `head_lock` is taken first; `enqueue`
 *     never holds both at once, so no lock-order cycle exists.
 */
class BlockingQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;
    private Mutex head_lock;
    private Mutex tail_lock;
    /// Set once by wakeup(); from then on dequeue() no longer waits on an empty queue.
    private shared bool isWaking = false;

    /** Wait queue for waiting takes */
    private Condition notEmpty;

    /// Creates an empty queue consisting of the single placeholder node.
    this() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
        this.head_lock = new Mutex();
        this.tail_lock = new Mutex();
        notEmpty = new Condition(head_lock);
    }

    /**
     * Appends one element and signals one waiting consumer.
     *
     * Params:
     *     t = the element to append
     */
    void enqueue(T t) {
        auto end = new QueueNode!T();
        {
            this.tail_lock.lock();
            scope (exit)
                this.tail_lock.unlock();
            auto tl = this.tail;
            this.tail = end;
            tl.value = t;
            atomicFence();
            tl.nxt = end; // accessible to dequeue
        }

        // Signal while holding the mutex the condition is bound to. Without
        // it, the notification can fire between a consumer's emptiness check
        // and its wait(), and that consumer would sleep although an element
        // is available.
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        notEmpty.notify();
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * Returns: the removed element, or T.init if the queue is empty and
     *          wakeup() has been called.
     */
    T dequeue() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        while (true) {
            auto hd = this.head;
            auto scnd = hd.nxt;
            if (scnd !is null) {
                this.head = scnd;
                return hd.value;
            }
            if (atomicLoad(isWaking))
                return T.init;
            // wait() releases head_lock while sleeping and re-acquires it
            // before returning, so the loop re-checks under the lock.
            notEmpty.wait();
        }
    }

    /**
     * Removes the oldest element if there is one; never waits for an element.
     *
     * Params:
     *     e = receives the removed element (T.init when the queue was empty)
     * Returns: true if an element was removed, false otherwise
     */
    bool tryDequeue(out T e) {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        auto hd = this.head;
        auto scnd = hd.nxt;
        if (scnd !is null) {
            this.head = scnd;
            e = hd.value;
            return true;
        }
        return false;
    }

    /// Returns: true if no element was available at the time of the call.
    bool isEmpty() {
        // head is only modified under head_lock, so read it under the lock too.
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        return this.head.nxt is null;
    }

    /// Discards all elements by replacing the list with a fresh placeholder node.
    void clear() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        // tail is owned by tail_lock; take it as well so a concurrent
        // enqueue cannot observe or overwrite a half-reset queue.
        this.tail_lock.lock();
        scope (exit)
            this.tail_lock.unlock();

        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Releases consumers blocked in dequeue(). The flag is sticky: after the
     * first call, dequeue() on an empty queue returns T.init instead of waiting.
     * Only the first call has an effect.
     */
    void wakeup() {
        if (cas(&isWaking, false, true)) {
            // Hold head_lock so the flag change cannot slip in between a
            // consumer's check of isWaking and its wait(). Wake every
            // waiter, since all of them must now return.
            this.head_lock.lock();
            scope (exit)
                this.head_lock.unlock();
            notEmpty.notifyAll();
        }
    }
}
133 
/**
 * Non-blocking multi-producer multi-consumer queue built on compare-and-swap.
 *
 * `head` always refers to a placeholder node; the first real element is
 * `head.nxt`. `tail` may lag behind the real end of the list and is moved
 * forward by enqueuers.
 *
 * NOTE(review): `clear` and `isEmpty` use plain (non-CAS) accesses; treat
 * `clear` as unsafe while other threads are using the queue.
 */
class NonBlockingQueue(T) : Queue!T {
    private shared(QueueNode!T) head;
    private shared(QueueNode!T) tail;
    private shared bool isWaking = false; // not used by any method of this class

    /// Creates an empty queue consisting of the single placeholder node.
    this() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Appends one element.
     *
     * Params:
     *     t = the element to append
     */
    void enqueue(T t) {
        shared end = new QueueNode!T();
        end.value = cast(shared)t;
        while (true) {
            auto tl = tail;
            auto cur = tl.nxt;
            if (cur !is null) {
                // obsolete tail, try update
                cas(&this.tail, tl, cur);
                continue;
            }

            shared(QueueNode!T) dummy = null;
            if (cas(&tl.nxt, dummy, end)) {
                // Successfully linked the new node. Try to swing tail to it;
                // if this CAS fails another thread has already advanced tail,
                // which is just as good.
                cas(&this.tail, tl, end);
                break;
            }
        }
    }

    /**
     * Removes and returns the oldest element, yielding the thread in a loop
     * until one is available.
     *
     * Returns: the removed element
     */
    T dequeue() nothrow {
        T e = void; // overwritten by the `out` parameter of tryDequeue
        while (!tryDequeue(e)) {
            Thread.yield();
        }
        return e;
    }

    /**
     * Removes the oldest element if there is one.
     *
     * Params:
     *     e = receives the removed element (T.init when the queue was empty)
     * Returns: true if an element was removed, false if the queue was empty
     */
    bool tryDequeue(out T e) nothrow {
        // Retry in a loop rather than by recursion so that heavy contention
        // cannot grow the call stack.
        while (true) {
            auto dummy = this.head;
            auto nxt = dummy.nxt;

            if (nxt is null)
                return false;

            if (cas(&this.head, dummy, nxt)) {
                // nxt becomes the new placeholder; its value is the result.
                e = cast(T)nxt.value;
                return true;
            }
        }
    }

    /// Returns: true if no element was linked behind the placeholder at the time of the call.
    bool isEmpty() nothrow {
        return this.head.nxt is null;
    }

    /// Discards all elements by replacing the list with a fresh placeholder node.
    void clear() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }
}
199 
200 
/**
 * Minimal linked-list FIFO queue.
 *
 * This class performs no locking and no atomic operations. `head` always
 * refers to a placeholder node; the first real element is `head.nxt`.
 */
class SimpleQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;

    /// Creates an empty queue consisting of the single placeholder node.
    this() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Appends one element.
     *
     * Params:
     *     t = the element to append
     */
    void enqueue(T t) {
        auto end = new QueueNode!T(t);

        auto tl = this.tail;
        this.tail = end;
        tl.nxt = end; // from here on the element is reachable from head
    }

    /**
     * Removes and returns the oldest element, yielding the thread in a loop
     * until one is available.
     *
     * Returns: the removed element
     */
    T dequeue() {
        T e = void; // overwritten by the `out` parameter of tryDequeue
        while (!tryDequeue(e)) {
            Thread.yield();
            version(HUNT_DANGER_DEBUG) warning("Running here");
        }
        return e;
    }

    /**
     * Removes and returns the oldest element, polling for at most `timeout`.
     *
     * Params:
     *     timeout = maximum time to keep polling after the first failed attempt
     * Returns: the removed element
     * Throws: TimeoutException if no element became available in time
     */
    T dequeue(Duration timeout) {
        T e = void; // overwritten by the `out` parameter of tryDequeue
        // Measure with the monotonic clock: the wall clock can jump when the
        // system time is adjusted, which would stretch or cut the timeout.
        immutable start = MonoTime.currTime;
        bool r = tryDequeue(e);
        while (!r && MonoTime.currTime - start < timeout) {
            debug {
                Duration dur = MonoTime.currTime - start;
                if(dur > 15.seconds) {
                    warningf("There is no element available in %s", dur);
                }
            }
            Thread.yield();
            r = tryDequeue(e);
        }

        if (!r) {
            throw new TimeoutException("Timeout in " ~ timeout.toString());
        }
        return e;
    }

    /**
     * Removes the oldest element if there is one.
     *
     * Params:
     *     e = receives the removed element (T.init when the queue was empty)
     * Returns: true if an element was removed, false if the queue was empty
     */
    bool tryDequeue(out T e) {
        auto nxt = this.head.nxt;
        if(nxt is null)
            return false;

        // nxt becomes the new placeholder; its value is the result.
        this.head = nxt;
        e = nxt.value;
        return true;
    }

    /// Returns: true if no element is linked behind the placeholder node.
    bool isEmpty() {
        return this.head.nxt is null;
    }

    /// Discards all elements by replacing the list with a fresh placeholder node.
    void clear() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }
}