module hunt.io.SimpleQueue;

import hunt.Exceptions;
import hunt.logging;

import core.thread;
import core.sync.semaphore : Semaphore;
import core.sync.condition;
import core.sync.mutex : Mutex;
import core.atomic;

import std.datetime;

// ported from https://github.com/qznc/d-queues

/** Implementations based on the paper
    "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms"
    by Maged M. Michael and Michael L. Scott. */

/** Basic interface for all queues implemented here. */
interface Queue(T) {
    /** Atomically put one element into the queue. */
    void enqueue(T t);

    /** Atomically take one element from the queue.
        Wait blocking or spinning. */
    T dequeue();

    /**
       If at least one element is in the queue,
       atomically take one element from the queue,
       store it into e, and return true.
       Otherwise return false. */
    bool tryDequeue(out T e);
}

/** Singly linked list node shared by all queue implementations in this module. */
private class QueueNode(T) {
    QueueNode!T nxt;
    T value;

    this() {
    }

    this(T value) {
        this.value = value;
    }
}

/**
 * Blocking multi-producer multi-consumer queue.
 *
 * Producers serialize on `tail_lock`, consumers on `head_lock`. The list
 * always ends in an empty sentinel node: `enqueue` writes the value into the
 * current sentinel and appends a fresh one, so a node holds a valid element
 * exactly when its `nxt` is non-null.
 */
class BlockingQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;
    private Mutex head_lock;
    private Mutex tail_lock;
    private shared bool isWaking = false;

    /** Wait queue for waiting takes; bound to `head_lock`. */
    private Condition notEmpty;
    private Duration _timeout;

    /**
     * Params:
     *   timeout = maximum time a single wait inside `dequeue` may block.
     */
    this(Duration timeout = 5.seconds) {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
        this.head_lock = new Mutex();
        this.tail_lock = new Mutex();
        notEmpty = new Condition(head_lock);
        _timeout = timeout;
    }

    /** Appends `t` and signals one waiting consumer. */
    void enqueue(T t) {
        auto end = new QueueNode!T();
        {
            this.tail_lock.lock();
            scope (exit)
                this.tail_lock.unlock();
            auto tl = this.tail;
            this.tail = end;
            tl.value = t;
            atomicFence();
            tl.nxt = end; // accessible to dequeue
        }

        // Signal while holding the condition's own mutex. Without it, a
        // consumer that has already seen an empty queue but has not yet parked
        // in wait() would miss the notification and sleep for the full timeout.
        // tail_lock is released first so the two locks are never nested here.
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        notEmpty.notify();
    }

    /**
     * Removes and returns the oldest element, waiting up to the configured
     * timeout for one to arrive.
     *
     * Returns: the element, or `T.init` when the wait timed out with the
     * queue still empty or when `wakeup` has been called on an empty queue.
     */
    T dequeue() {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();

        T result;
        while (true) {
            if (takeLocked(result))
                return result;

            if (atomicLoad(isWaking))
                return T.init;

            if (!notEmpty.wait(_timeout)) {
                // Timed out: take one last look, since a producer may have
                // linked an element but not yet managed to signal.
                if (takeLocked(result))
                    return result;
                return T.init;
            }
        }
    }

    /** Non-waiting variant of `dequeue`; `e` stays `T.init` when it returns false. */
    bool tryDequeue(out T e) {
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        return takeLocked(e);
    }

    /** Unsynchronized snapshot: reads `head.nxt` without taking any lock. */
    bool isEmpty() {
        return this.head.nxt is null;
    }

    /** Drops all queued elements by installing a fresh sentinel node. */
    void clear() {
        // Both ends are replaced, so both locks are required; otherwise a
        // concurrent enqueue could leave `tail` pointing into the detached
        // chain and every later element would be unreachable from `head`.
        // Lock order is head, then tail; no other method nests the two.
        this.head_lock.lock();
        scope (exit)
            this.head_lock.unlock();
        this.tail_lock.lock();
        scope (exit)
            this.tail_lock.unlock();

        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /**
     * Sets the `isWaking` flag (only the first call has an effect) and wakes
     * every consumer currently blocked in `dequeue`. The flag is never reset
     * in this class, so afterwards `dequeue` no longer waits on an empty queue.
     */
    void wakeup() {
        if (cas(&isWaking, false, true)) {
            this.head_lock.lock();
            scope (exit)
                this.head_lock.unlock();
            notEmpty.notifyAll();
        }
    }

    /** Pops the front element into `e`. The caller must hold `head_lock`. */
    private bool takeLocked(ref T e) {
        auto hd = this.head;
        auto scnd = hd.nxt;
        if (scnd is null)
            return false;

        this.head = scnd;
        e = hd.value;
        return true;
    }
}

/**
 * Non-blocking multi-producer multi-consumer queue.
 *
 * `head` always refers to a dummy node; the first real element is stored in
 * `head.nxt`.
 */
class NonBlockingQueue(T) : Queue!T {
    private shared(QueueNode!T) head;
    private shared(QueueNode!T) tail;
    private shared bool isWaking = false;

    this() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /** Links a new node holding `t` after the last node using compare-and-swap. */
    void enqueue(T t) {
        shared end = new QueueNode!T();
        end.value = cast(shared)t;
        while (true) {
            auto tl = tail;
            auto cur = tl.nxt;
            if (cur !is null) {
                // obsolete tail, try update
                cas(&this.tail, tl, cur);
                continue;
            }

            shared(QueueNode!T) dummy = null;
            if (cas(&tl.nxt, dummy, end)) {
                // successfully enqueued new end node
                break;
            }
        }
    }

    /** Spins, yielding the thread between attempts, until an element is available. */
    T dequeue() nothrow {
        T e = void;
        while (!tryDequeue(e)) {
            Thread.yield();
        }
        return e;
    }

    /**
     * Takes the front element if there is one.
     *
     * Returns: true and stores the element in `e`; false if the queue was
     * observed empty.
     */
    bool tryDequeue(out T e) nothrow {
        // Retry in a loop rather than by recursion so that sustained CAS
        // contention cannot grow the call stack.
        while (true) {
            auto dummy = this.head;
            auto nxt = dummy.nxt;

            if (nxt is null)
                return false;

            if (cas(&this.head, dummy, nxt)) {
                e = cast(T)nxt.value;
                return true;
            }
        }
    }

    bool isEmpty() nothrow {
        return this.head.nxt is null;
    }

    /**
     * Replaces the list with a fresh dummy node.
     * NOTE(review): head and tail are assigned with plain stores, not CAS;
     * presumably callers must ensure no concurrent enqueue/dequeue - confirm.
     */
    void clear() {
        shared n = new QueueNode!T();
        this.head = this.tail = n;
    }
}


/**
 * Linked queue without any locking or atomic operations.
 *
 * `head` always refers to a dummy node; the first real element is stored in
 * `head.nxt`.
 */
class SimpleQueue(T) : Queue!T {
    private QueueNode!T head;
    private QueueNode!T tail;

    this() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }

    /** Appends a node holding `t` to the end of the list. */
    void enqueue(T t) {
        auto end = new QueueNode!T(t);

        auto tl = this.tail;
        this.tail = end;
        tl.nxt = end; // accessible to dequeue
    }

    /** Spins, yielding the thread between attempts, until an element is available. */
    T dequeue() {
        T e = void;
        while (!tryDequeue(e)) {
            Thread.yield();
            version(HUNT_DANGER_DEBUG) warning("Running here");
        }
        return e;
    }

    /**
     * Like `dequeue()`, but gives up once `timeout` has elapsed.
     *
     * Throws: TimeoutException if no element became available in time.
     */
    T dequeue(Duration timeout) {
        T e = void;
        // Monotonic clock: the deadline must not move when the wall clock is adjusted.
        auto start = MonoTime.currTime;
        bool r = tryDequeue(e);
        while (!r && MonoTime.currTime - start < timeout) {
            debug {
                Duration dur = MonoTime.currTime - start;
                if (dur > 15.seconds) {
                    warningf("There is no element available in %s", dur);
                }
            }
            Thread.yield();
            r = tryDequeue(e);
        }

        if (!r) {
            throw new TimeoutException("Timeout in " ~ timeout.toString());
        }
        return e;
    }


    /**
     * Takes the front element if there is one.
     *
     * Returns: true and stores the element in `e`; false if the queue is empty.
     */
    bool tryDequeue(out T e) {
        auto nxt = this.head.nxt;
        if (nxt is null)
            return false;

        this.head = nxt;
        e = cast(T)nxt.value;
        return true;
    }


    bool isEmpty() {
        return this.head.nxt is null;
    }

    /** Drops all queued elements by installing a fresh dummy node. */
    void clear() {
        auto n = new QueueNode!T();
        this.head = this.tail = n;
    }
}