1 module hunt.util.queue.SimpleQueue;
2 
3 import hunt.logging.ConsoleLogger;
4 import hunt.util.queue.Queue;
5 
6 import core.atomic;
7 import core.sync.condition;
8 import core.sync.mutex;
9 import core.time;
10 import core.thread;
11 
12 import std.container.dlist;
13 
14 
/**
 * A thread-safe FIFO queue backed by a doubly-linked list.
 *
 * Every access to the underlying list is serialized by a single mutex.
 * `pop` blocks on a condition variable until an item becomes available or
 * the timeout supplied at construction has elapsed.
 */
class SimpleQueue(T) : Queue!(T) {
    private DList!T _list;
    private Mutex _headLock;
    private Duration _timeout;

    /// Number of threads currently blocked inside pop(); guarded by _headLock.
    private int _waiters = 0;

    // Public counters; nothing in this class updates them.
    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Wait queue for waiting takes */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = maximum total time a single pop() call may block while the
     *             queue is empty.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: true if the queue currently holds no items.
     */
    override bool isEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty;
    }

    /**
     * Removes and returns the item at the front of the queue.
     *
     * If the queue is empty the caller blocks until an item is pushed or the
     * configured timeout has elapsed.
     *
     * Returns: the front item, or `T.init` if the timeout elapsed while the
     *          queue was still empty.
     */
    override T pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty) {
            // Measure elapsed time instead of computing an absolute deadline
            // so that very large timeouts cannot overflow.
            auto started = MonoTime.currTime;

            _waiters++;
            scope (exit)
                _waiters--;

            // Re-check the predicate after every wakeup: the wait may return
            // spuriously, and another consumer may already have taken the
            // item that triggered the notification.
            while (_list.empty) {
                auto elapsed = MonoTime.currTime - started;
                if (elapsed >= _timeout) {
                    version (HUNT_IO_DEBUG) {
                        tracef("Timeout in %s.", _timeout);
                    }
                    return T.init;
                }

                // The return value is deliberately ignored; the loop
                // condition decides whether an item is available, even when
                // the wait reports a timeout.
                _notEmpty.wait(_timeout - elapsed);
            }
        }

        T item = _list.front;
        _list.removeFront();

        return item;
    }

    /**
     * Appends an item to the back of the queue and wakes one waiting
     * consumer, if any.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        // One new item can satisfy one consumer, so wake a single waiter.
        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }
}