module hunt.util.queue.SimpleQueue;

import hunt.logging;
import hunt.util.queue.Queue;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.time;
import core.thread;

import std.container.dlist;


/**
 * A thread-safe FIFO queue backed by a doubly-linked list.
 *
 * Every operation is serialized by a single mutex. `pop` blocks for at most
 * the timeout given to the constructor while the queue is empty, and hands
 * back `T.init` when nothing could be taken. Callers therefore cannot tell a
 * stored `T.init` value apart from "no item".
 */
class SimpleQueue(T) : Queue!(T) {
    private DList!T _list;
    private Mutex _headLock;
    private Duration _timeout;

    /// Number of threads currently blocked inside `pop`; guarded by `_headLock`.
    private int _waiters = 0;

    /// Incremented by every `clear` so that a blocked `pop` can tell it was
    /// woken by a clear rather than by a push; guarded by `_headLock`.
    private size_t _clearCount = 0;

    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Wait queue for waiting takes */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = the longest time a single `pop` call may block while the
     *             queue is empty.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: `true` when the queue holds no items at the moment of the call.
     */
    override bool isEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty();
    }

    /**
     * Removes and returns the oldest item.
     *
     * While the queue is empty the call blocks until an item is pushed, the
     * queue is cleared, or the configured timeout has elapsed.
     *
     * Returns: the item at the front of the queue, or `T.init` if the wait
     *          timed out or was cut short by `clear` with nothing to take.
     */
    override T pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty) {
            const MonoTime start = MonoTime.currTime;
            const size_t clearsSeen = _clearCount;
            bool timedOut = false;

            // Re-check the list after every wakeup: the wakeup may be spurious,
            // or another consumer may already have taken the pushed item.
            while (_list.empty) {
                // Measure elapsed time instead of computing an absolute
                // deadline so that very large timeouts cannot overflow.
                const Duration elapsed = MonoTime.currTime - start;
                if (elapsed >= _timeout) {
                    timedOut = true;
                    break;
                }

                ++_waiters;
                scope (exit)
                    --_waiters;

                if (!_notEmpty.wait(_timeout - elapsed)) {
                    timedOut = true;
                    break;
                }

                // clear() ran while we were blocked: stop waiting early.
                if (_clearCount != clearsSeen)
                    break;
            }

            // An item may have arrived between the timeout firing and the
            // lock being reacquired, so only give up if the list is still empty.
            if (_list.empty) {
                version (HUNT_IO_DEBUG) {
                    if (timedOut)
                        infof("Timeout in %s.", _timeout);
                }
                return T.init;
            }
        }

        T item = _list.front();
        _list.removeFront();

        return item;
    }

    /**
     * Appends an item to the back of the queue and wakes one blocked `pop`,
     * if any thread is waiting.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        // A counter (rather than a single flag) keeps notifications flowing
        // when several consumers are blocked at the same time.
        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }

    /**
     * Discards every queued item and wakes one blocked `pop`, which then
     * returns `T.init`.
     */
    override void clear() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.clear();
        ++_clearCount;
        _notEmpty.notify();
    }
}