module hunt.util.queue.SimpleQueue;

import hunt.logging.ConsoleLogger;
import hunt.util.queue.Queue;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.time;
import core.thread;

import std.container.dlist;


/**
 * A thread-safe FIFO queue backed by a doubly-linked list.
 *
 * Every operation is performed while holding a single mutex. `pop` blocks
 * for at most the timeout given to the constructor when the queue is empty.
 */
class SimpleQueue(T) : Queue!(T) {
    /// Queued items; the front of the list is the next item handed out by `pop`.
    private DList!T _list;

    /// Guards `_list` and `_waiters`.
    private Mutex _headLock;

    /// Upper bound on how long a single `pop` call may block.
    private Duration _timeout;

    /// Number of threads currently blocked inside `pop`; guarded by `_headLock`.
    /// A counter (rather than a single flag) keeps `push` notifying while any
    /// consumer is still waiting, even after another consumer has woken up.
    private int _waiters = 0;

    // Not read or written anywhere in this class; kept because they are
    // publicly visible members.
    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Wait queue for waiting takes */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = maximum time a `pop` call waits for an item before giving up.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: `true` if the queue held no items at the moment of the call.
     */
    override bool isEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty();
    }

    /**
     * Removes and returns the item at the front of the queue.
     *
     * If the queue is empty the caller blocks until an item is pushed or the
     * configured timeout has elapsed, whichever comes first.
     *
     * Returns: the oldest queued item, or `T.init` if the timeout elapsed
     *          while the queue was still empty.
     */
    override T pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty()) {
            // A fixed deadline keeps the total wait bounded by `_timeout`
            // no matter how many times the thread is woken up.
            immutable MonoTime deadline = MonoTime.currTime + _timeout;

            _waiters++;
            scope (exit)
                _waiters--;

            // Re-check emptiness after every wakeup: the wakeup may be
            // spurious, or another consumer may already have taken the item.
            while (_list.empty()) {
                immutable Duration remaining = deadline - MonoTime.currTime;
                if (remaining <= Duration.zero) {
                    version (HUNT_IO_DEBUG) {
                        tracef("Timeout in %s.", _timeout);
                    }
                    return T.init;
                }

                // The result is deliberately ignored: the loop condition and
                // the deadline check above decide what happens next.
                _notEmpty.wait(remaining);
            }
        }

        T item = _list.front();
        _list.removeFront();

        return item;
    }

    /**
     * Appends an item to the back of the queue and wakes one blocked
     * consumer, if there is any.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }
}