1 module hunt.util.queue.SimpleQueue;
2
3 import hunt.logging;
4 import hunt.util.queue.Queue;
5
6 import core.atomic;
7 import core.sync.condition;
8 import core.sync.mutex;
9 import core.time;
10 import core.thread;
11
12 import std.container.dlist;
13
14
/**
 * A thread-safe FIFO queue backed by a doubly-linked list.
 *
 * Every access to the underlying list is serialized through a single mutex.
 * A consumer calling pop() on an empty queue blocks on a condition variable
 * until it is notified (by push() or clear()) or the configured timeout
 * elapses.
 */
class SimpleQueue(T) : Queue!(T) {
    private DList!T _list;
    private Mutex _headLock;
    private Duration _timeout;

    /// Number of threads currently blocked inside pop(); guarded by _headLock.
    /// A counter (rather than a single flag) is required so that one consumer
    /// leaving the wait does not hide the other consumers that are still
    /// waiting from push().
    private int _waiters = 0;

    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Wait queue for waiting takes */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = maximum time a pop() call waits for an item when the
     *             queue is empty.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: true when the queue currently holds no items.
     */
    override bool isEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty();
    }

    /**
     * Removes and returns the item at the front of the queue.
     *
     * When the queue is empty the caller waits once, for at most the
     * configured timeout, to be notified.
     *
     * Returns: the front item, or T.init when the wait timed out or the
     *          queue is still empty after being woken (for example after
     *          clear()).
     */
    override T pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        // Check the list directly: the lock is already held here, so going
        // through isEmpty() would only re-enter the same mutex.
        if (_list.empty()) {
            bool signaled;
            {
                // Register as a waiter for exactly the duration of the wait,
                // even if wait() throws.
                _waiters++;
                scope (exit)
                    _waiters--;

                signaled = _notEmpty.wait(_timeout);
            }

            if (!signaled) {
                version (HUNT_IO_DEBUG) {
                    infof("Timeout in %s.", _timeout);
                }
                return T.init;
            }
        }

        // A wake-up does not guarantee an item is available (clear() also
        // notifies, and another consumer may have taken the item first).
        if (_list.empty())
            return T.init;

        T item = _list.front();
        _list.removeFront();

        return item;
    }

    /**
     * Appends an item to the back of the queue and wakes one waiting
     * consumer, if any.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        // Only signal when at least one consumer is actually blocked.
        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }

    /**
     * Removes all items from the queue and notifies one waiter, which will
     * then observe the empty queue and return T.init from pop().
     */
    override void clear() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.clear();
        _notEmpty.notify();
    }
}