1 module hunt.concurrency.SimpleQueue; 2 3 import hunt.Exceptions; 4 import hunt.logging.ConsoleLogger; 5 6 import core.thread; 7 import core.sync.semaphore : Semaphore; 8 import core.sync.condition; 9 import core.sync.mutex : Mutex; 10 import core.atomic; 11 12 import std.datetime; 13 14 // ported from https://github.com/qznc/d-queues 15 16 /** Implementations based on the paper 17 "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms" 18 by Maged and Michael. "*/ 19 20 /** Basic interface for all queues implemented here. 21 Is an input and output range. 22 */ 23 interface Queue(T) { 24 /** Atomically put one element into the queue. */ 25 void enqueue(T t); 26 27 /** Atomically take one element from the queue. 28 Wait blocking or spinning. */ 29 T dequeue(); 30 31 /** 32 If at least one element is in the queue, 33 atomically take one element from the queue 34 store it into e, and return true. 35 Otherwise return false; */ 36 bool tryDequeue(out T e); 37 } 38 39 private class QueueNode(T) { 40 QueueNode!T nxt; 41 T value; 42 43 this() {} 44 45 this(T value) { 46 this.value = value; 47 } 48 } 49 50 /** blocking multi-producer multi-consumer queue */ 51 class BlockingQueue(T) : Queue!T { 52 private QueueNode!T head; 53 private QueueNode!T tail; 54 private Mutex head_lock; 55 private Mutex tail_lock; 56 private shared bool isWaking = false; 57 58 /** Wait queue for waiting takes */ 59 private Condition notEmpty; 60 61 this() { 62 auto n = new QueueNode!T(); 63 this.head = this.tail = n; 64 this.head_lock = new Mutex(); 65 this.tail_lock = new Mutex(); 66 notEmpty = new Condition(head_lock); 67 } 68 69 void enqueue(T t) { 70 auto end = new QueueNode!T(); 71 this.tail_lock.lock(); 72 scope (exit) 73 this.tail_lock.unlock(); 74 auto tl = this.tail; 75 this.tail = end; 76 tl.value = t; 77 atomicFence(); 78 tl.nxt = end; // accessible to dequeue 79 notEmpty.notify(); 80 } 81 82 T dequeue() { 83 this.head_lock.lock(); 84 scope (exit) 85 this.head_lock.unlock(); 86 while (true) { // FIXME non-blocking! 87 auto hd = this.head; 88 auto scnd = hd.nxt; 89 if (scnd !is null) { 90 this.head = scnd; 91 return hd.value; 92 } else { 93 if(isWaking) 94 return T.init; 95 notEmpty.wait(); 96 } 97 } 98 assert(0); 99 } 100 101 bool tryDequeue(out T e) { 102 this.head_lock.lock(); 103 scope (exit) 104 this.head_lock.unlock(); 105 auto hd = this.head; 106 auto scnd = hd.nxt; 107 if (scnd !is null) { 108 this.head = scnd; 109 e = hd.value; 110 return true; 111 } 112 return false; 113 } 114 115 bool isEmpty() { 116 return this.head.nxt is null; 117 } 118 119 void clear() { 120 this.head_lock.lock(); 121 scope (exit) 122 this.head_lock.unlock(); 123 124 auto n = new QueueNode!T(); 125 this.head = this.tail = n; 126 } 127 128 void wakeup() { 129 if(cas(&isWaking, false, true)) 130 notEmpty.notify(); 131 } 132 } 133 134 /** non-blocking multi-producer multi-consumer queue */ 135 class NonBlockingQueue(T) : Queue!T { 136 private shared(QueueNode!T) head; 137 private shared(QueueNode!T) tail; 138 private shared bool isWaking = false; 139 140 this() { 141 shared n = new QueueNode!T(); 142 this.head = this.tail = n; 143 } 144 145 void enqueue(T t) { 146 shared end = new QueueNode!T(); 147 end.value = cast(shared)t; 148 while (true) { 149 auto tl = tail; 150 auto cur = tl.nxt; 151 if (cur !is null) { 152 // obsolete tail, try update 153 cas(&this.tail, tl, cur); 154 continue; 155 } 156 157 shared(QueueNode!T) dummy = null; 158 if (cas(&tl.nxt, dummy, end)) { 159 // successfull enqueued new end node 160 break; 161 } 162 } 163 } 164 165 T dequeue() nothrow { 166 T e = void; 167 while (!tryDequeue(e)) { 168 Thread.yield(); 169 } 170 // tryDequeue(e); 171 return e; 172 } 173 174 bool tryDequeue(out T e) nothrow { 175 auto dummy = this.head; 176 auto tl = this.tail; 177 auto nxt = dummy.nxt; 178 179 if(nxt is null) 180 return false; 181 182 if (cas(&this.head, dummy, nxt)) { 183 e = cast(T)nxt.value; 184 return true; 185 } 186 187 return tryDequeue(e); 188 } 189 190 bool isEmpty() nothrow { 191 return this.head.nxt is null; 192 } 193 194 void clear() { 195 shared n = new QueueNode!T(); 196 this.head = this.tail = n; 197 } 198 } 199 200 201 /** 202 * 203 */ 204 class SimpleQueue(T) : Queue!T { 205 private QueueNode!T head; 206 private QueueNode!T tail; 207 208 this() { 209 auto n = new QueueNode!T(); 210 this.head = this.tail = n; 211 } 212 213 void enqueue(T t) { 214 auto end = new QueueNode!T(t); 215 216 auto tl = this.tail; 217 this.tail = end; 218 tl.nxt = end; // acces 219 } 220 221 T dequeue() { 222 T e = void; 223 while (!tryDequeue(e)) { 224 Thread.yield(); 225 version(HUNT_DANGER_DEBUG) warning("Running here"); 226 } 227 return e; 228 } 229 230 T dequeue(Duration timeout) { 231 T e = void; 232 auto start = Clock.currTime; 233 bool r = tryDequeue(e); 234 while (!r && Clock.currTime < start + timeout) { 235 debug { 236 Duration dur = Clock.currTime - start; 237 if(dur > 15.seconds) { 238 warningf("There is no element available in %s", dur); 239 } 240 } 241 Thread.yield(); 242 r = tryDequeue(e); 243 } 244 245 if (!r) { 246 throw new TimeoutException("Timeout in " ~ timeout.toString()); 247 } 248 return e; 249 } 250 251 252 bool tryDequeue(out T e) { 253 auto nxt = this.head.nxt; 254 if(nxt is null) 255 return false; 256 257 this.head = nxt; 258 e = cast(T)nxt.value; 259 return true; 260 } 261 262 263 bool isEmpty() { 264 return this.head.nxt is null; 265 } 266 267 void clear() { 268 auto n = new QueueNode!T(); 269 this.head = this.tail = n; 270 } 271 }