1 module hunt.io.SimpleQueue;
2 
3 import hunt.Exceptions;
4 import hunt.logging.ConsoleLogger;
5 
6 import core.thread;
7 import core.sync.semaphore : Semaphore;
8 import core.sync.condition;
9 import core.sync.mutex : Mutex;
10 import core.atomic;
11 
12 import std.datetime;
13 
14 // ported from https://github.com/qznc/d-queues
15 
/** Implementations based on the paper
    "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
    by Maged M. Michael and Michael L. Scott. */
19 
/** Basic interface for all queues implemented here.

    A queue exposes exactly three operations: `enqueue`, `dequeue` and
    `tryDequeue`. Note that the interface itself declares no range
    primitives (`front`/`popFront`/`put`).

    Params:
        T = type of the elements stored in the queue
*/
interface Queue(T) {
    /** Put one element into the queue.

        Params:
            t = the element to append
    */
    void enqueue(T t);

    /** Take one element from the queue.

        Implementations may wait (blocking or spinning) until an element
        is available.

        Returns: the element that was removed
    */
    T dequeue();

    /** Non-waiting variant of `dequeue`.

        If at least one element is in the queue, take one element from the
        queue, store it into `e`, and return `true`. Otherwise return `false`.

        Params:
            e = receives the removed element; being an `out` parameter it is
                reset to `T.init` on entry

        Returns: `true` if an element was removed, `false` otherwise
    */
    bool tryDequeue(out T e);
}
38 
/** Singly linked list cell shared by the queue implementations in this module.

    Params:
        T = type of the payload carried by the node
*/
private class QueueNode(T) {
    /// Successor in the list; `null` while this node is the last one.
    QueueNode!T nxt;

    /// Payload carried by this node.
    T value;

    /// Creates a node whose payload is `T.init` and which has no successor.
    this() {
    }

    /// Creates a node that carries `value` and has no successor.
    this(T value) {
        this.value = value;
    }
}
49 
/** Blocking multi-producer multi-consumer queue.

    Two-lock linked queue: producers serialize on `tail_lock`, consumers on
    `head_lock`. The list always contains one trailing placeholder node; a
    node holds a valid element once its `nxt` link has been set.

    Lock ordering: `head_lock` and `tail_lock` are only ever held together
    in `clear()`, which takes `head_lock` first. `enqueue` releases
    `tail_lock` before it touches `head_lock`, so no lock cycle can form.
*/
class BlockingQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;
    private Mutex head_lock;
    private Mutex tail_lock;

    /// Set once by `wakeup()`; never reset by any method of this class.
    private shared bool isWaking = false;

    /** Wait queue for waiting takes (bound to `head_lock`). */
    private Condition notEmpty;

    /// Maximum time a single wait inside `dequeue()` may last.
    private Duration _timeout;

    /** Creates an empty queue.

        Params:
            timeout = how long `dequeue()` waits for an element before
                      giving up and returning `T.init`
    */
    this(Duration timeout = 5.seconds) {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
        this.head_lock = new Mutex();
        this.tail_lock = new Mutex();
        notEmpty = new Condition(head_lock);
        _timeout = timeout;
    }

    /** Appends `t` to the queue and wakes one waiting consumer.

        Params:
            t = the element to append
    */
    void enqueue(T t) {
        auto end = new QueueNode!T();
        {
            this.tail_lock.lock();
            scope (exit)
                this.tail_lock.unlock();
            auto tl = this.tail;
            this.tail = end;
            tl.value = t;
            atomicFence(); // the value must be written before the link is published
            tl.nxt = end; // accessible to dequeue
        }

        // Notify while holding head_lock. A consumer checks for emptiness and
        // enters wait() atomically with respect to head_lock, so the signal
        // cannot slip in between its check and its wait (lost wakeup).
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        notEmpty.notify();
    }

    /** Removes and returns the oldest element, waiting if the queue is empty.

        Returns:
            the removed element, or `T.init` when the queue is still empty
            after a wait timed out, or when the queue is empty and
            `wakeup()` has been called. A `T.init` result is therefore
            indistinguishable from a stored `T.init` element.
    */
    T dequeue() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        bool timedOut = false;
        while (true) {
            auto hd = this.head;
            auto scnd = hd.nxt;
            if (scnd !is null) {
                this.head = scnd;
                return hd.value;
            }
            // Give up only after re-checking the queue above: an element may
            // have been published right as the wait expired.
            if (timedOut || atomicLoad(isWaking))
                return T.init;
            timedOut = !notEmpty.wait(_timeout);
        }
        assert(0);
    }

    /** Removes the oldest element without waiting.

        Params:
            e = receives the removed element; left at `T.init` when the
                queue is empty

        Returns: `true` if an element was removed, `false` if the queue was empty
    */
    bool tryDequeue(out T e) {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        auto hd = this.head;
        auto scnd = hd.nxt;
        if (scnd !is null) {
            this.head = scnd;
            e = hd.value;
            return true;
        }
        return false;
    }

    /** Reports whether the queue currently holds no element.

        The check takes no lock, so with concurrent producers or consumers
        the result is only a momentary snapshot.
    */
    bool isEmpty() {
        return this.head.nxt is null;
    }

    /** Discards all queued elements.

        Both locks are held while `head` and `tail` are replaced, so the
        reset cannot interleave with a concurrent `enqueue` or `dequeue`.
    */
    void clear() {
        auto n = new QueueNode!T();

        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        this.tail_lock.lock();
        scope (exit)
            this.tail_lock.unlock();

        this.head = this.tail = n;
    }

    /** Releases consumers blocked in `dequeue()`.

        The first call sets a sticky flag and wakes every waiter; from then
        on `dequeue()` returns `T.init` immediately whenever the queue is
        empty. Later calls have no effect.
    */
    void wakeup() {
        if (cas(&isWaking, false, true)) {
            // Taken for the same reason as in enqueue(): a consumer that has
            // already seen isWaking == false must be inside wait() before the
            // signal is sent. All waiters are woken because the flag applies
            // to every one of them.
            this.head_lock.lock();
            scope (exit)
                this.head_lock.unlock();
            notEmpty.notifyAll();
        }
    }
}
136 
/** Non-blocking multi-producer multi-consumer queue.

    Lock-free linked queue driven by compare-and-swap. `head` always refers
    to a placeholder node; the first real element lives in `head.nxt`.
    `tail` may lag behind the true last node and is advanced cooperatively.
*/
class NonBlockingQueue(T) : Queue!T {
    private shared(QueueNode!T) head;
    private shared(QueueNode!T) tail;

    /// Not referenced by any method of this class.
    private shared bool isWaking = false;

    /// Creates an empty queue consisting of a single placeholder node.
    this() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /** Appends `t` to the queue.

        Params:
            t = the element to append
    */
    void enqueue(T t) {
        shared end = new QueueNode!T();
        end.value = cast(shared)t;
        while (true) {
            auto tl = tail;
            auto cur = tl.nxt;
            if (cur !is null) {
                // obsolete tail, try update
                cas(&this.tail, tl, cur);
                continue;
            }

            shared(QueueNode!T) dummy = null;
            if (cas(&tl.nxt, dummy, end)) {
                // Successfully linked the new end node. Try to swing tail to
                // it; if this CAS fails another thread already advanced tail.
                cas(&this.tail, tl, end);
                break;
            }
        }
    }

    /** Removes and returns the oldest element, yielding the thread
        repeatedly until one becomes available.

        Returns: the removed element
    */
    T dequeue() nothrow {
        T e = void; // tryDequeue's `out` parameter initializes it
        while (!tryDequeue(e)) {
            Thread.yield();
        }
        return e;
    }

    /** Removes the oldest element without waiting.

        Retries in a loop when another consumer wins the race for the head
        node, so stack usage stays constant under contention.

        Params:
            e = receives the removed element; left at `T.init` when the
                queue is empty

        Returns: `true` if an element was removed, `false` if the queue was empty
    */
    bool tryDequeue(out T e) nothrow {
        while (true) {
            auto dummy = this.head;
            auto nxt = dummy.nxt;

            if (nxt is null)
                return false;

            if (cas(&this.head, dummy, nxt)) {
                // nxt is now the placeholder; only the thread that won the
                // CAS reads its value.
                e = cast(T)nxt.value;
                return true;
            }
            // Lost the race against another consumer: re-read head and retry.
        }
    }

    /** Reports whether the queue currently holds no element.

        With concurrent producers or consumers the result is only a
        momentary snapshot.
    */
    bool isEmpty() nothrow {
        return this.head.nxt is null;
    }

    /** Discards all queued elements by installing a fresh placeholder node.

        `tail` and `head` are replaced by two separate plain assignments,
        so the reset is not atomic with respect to concurrent `enqueue` /
        `dequeue` calls.
    */
    void clear() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }
}
202 
203 
/**
 * Plain linked queue without any locking or atomic operations.
 *
 * `head` always refers to a placeholder node; the first real element lives
 * in `head.nxt`. None of the methods synchronize, so the class by itself
 * gives no guarantee for concurrent callers.
 */
class SimpleQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;

    /// Creates an empty queue consisting of a single placeholder node.
    this() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /** Appends `t` to the queue.

        Params:
            t = the element to append
    */
    void enqueue(T t) {
        auto end = new QueueNode!T(t);

        auto tl = this.tail;
        this.tail = end;
        tl.nxt = end; // accessible to dequeue from here on
    }

    /** Removes and returns the oldest element, yielding the thread
        repeatedly until one becomes available.

        Returns: the removed element
    */
    T dequeue() {
        T e = void; // tryDequeue's `out` parameter initializes it
        while (!tryDequeue(e)) {
            Thread.yield();
            version(HUNT_DANGER_DEBUG) warning("Running here");
        }
        return e;
    }

    /** Removes and returns the oldest element, giving up after `timeout`.

        The elapsed time is measured with the monotonic clock, so changes to
        the system wall clock neither shorten nor prolong the wait.

        Params:
            timeout = maximum time to keep polling for an element

        Returns: the removed element

        Throws: `TimeoutException` if no element became available in time
    */
    T dequeue(Duration timeout) {
        import core.time : MonoTime;

        T e = void; // tryDequeue's `out` parameter initializes it
        immutable start = MonoTime.currTime;
        bool r = tryDequeue(e);
        // Compare elapsed time rather than an absolute deadline so that a
        // very large timeout cannot overflow the time arithmetic.
        while (!r && MonoTime.currTime - start < timeout) {
            Thread.yield();
            r = tryDequeue(e);
        }

        if (!r) {
            debug {
                Duration dur = MonoTime.currTime - start;
                warningf("There is no element available in %s", dur);
            }
            throw new TimeoutException("Timeout in " ~ timeout.toString());
        }
        return e;
    }


    /** Removes the oldest element without waiting.

        Params:
            e = receives the removed element; left at `T.init` when the
                queue is empty

        Returns: `true` if an element was removed, `false` if the queue was empty
    */
    bool tryDequeue(out T e) {
        auto nxt = this.head.nxt;
        if(nxt is null)
            return false;

        // The node that carried the element becomes the new placeholder.
        this.head = nxt;
        e = nxt.value;
        return true;
    }


    /// Reports whether the queue currently holds no element.
    bool isEmpty() {
        return this.head.nxt is null;
    }

    /// Discards all queued elements by installing a fresh placeholder node.
    void clear() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }
}