1 module hunt.util.worker.WorkerThread;
2 
3 import hunt.util.Closeable;
4 import hunt.util.ResoureManager;
5 import hunt.util.worker.Task;
6 import hunt.util.worker.Worker;
7 
8 import hunt.logging;
9 
10 import core.atomic;
11 import core.memory;
12 import core.thread;
13 import core.sync.condition;
14 import core.sync.mutex;
15 import std.conv;
16 
17 
18 
/**
 * Lifecycle states of a WorkerThread.
 *
 * The first member is the enum's `.init` value, so a freshly created
 * state variable starts out as Idle.
 */
enum WorkerThreadState {
    /// No task is attached; the thread can accept one.
    Idle,
    /// Occupied: a task has been claimed for the thread.
    Busy,
    /// A stop was requested; the thread's run loop ends once it sees this.
    Stopped
}
24 
/**
 * Tells whether the calling thread is a WorkerThread.
 *
 * Returns: true when the object returned by Thread.getThis() can be
 *          down-cast to WorkerThread, false otherwise.
 */
bool inWorkerThread() {
    // A failed class cast yields null in D, so the null test is the type test.
    return (cast(WorkerThread) Thread.getThis()) !is null;
}
29 
/**
 * A thread that executes one attached Task at a time.
 *
 * The thread waits on a condition variable until attatch() hands it a task
 * or stop() is called. attatch() moves the state Idle -> Busy; the run loop
 * moves it back Busy -> Idle after the task has been executed. Once the
 * state is Stopped the run loop exits.
 */
class WorkerThread : Thread {

    // Accessed from several threads; always go through core.atomic.
    private shared WorkerThreadState _state;
    private size_t _index;
    // Written under _mutex by attatch(); cleared by the run loop.
    private Task _task;
    // Upper bound for a single condition wait while no task is attached.
    private Duration _timeout;

    private Condition _condition;
    private Mutex _mutex;

    /**
     * Creates the worker. The thread is not started here.
     *
     * Params:
     *  index     = number used to build the thread name ("WorkerThread-<index>")
     *  timeout   = maximum duration of one wait for a task before re-checking
     *  stackSize = passed through to the Thread constructor
     */
    this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) {
        // Construct the Thread base first so that no member of `this`
        // is touched before the base constructor has run.
        super(&run, stackSize);

        _index = index;
        _timeout = timeout;
        atomicStore(_state, WorkerThreadState.Idle);
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        this.name = "WorkerThread-" ~ _index.to!string();
    }

    /**
     * Marks the thread as Stopped and wakes it up if it is waiting for a task.
     */
    void stop() {
        version(HUNT_IO_DEBUG) {
            infof("Stopping thread %s", this.name);
        }
        atomicStore(_state, WorkerThreadState.Stopped);

        // Notify under the mutex: the waiter only releases it inside wait(),
        // so the notification cannot slip in between its check and its wait.
        _mutex.lock();
        scope (exit) {
            _mutex.unlock();
        }
        _condition.notify();
    }

    /// Returns: true if the current state is Busy.
    bool isBusy() {
        return loadState() == WorkerThreadState.Busy;
    }

    /// Returns: true if the current state is Idle.
    bool isIdle() {
        return loadState() == WorkerThreadState.Idle;
    }

    /// Returns: the current state.
    WorkerThreadState state() {
        return loadState();
    }

    /// Returns: the index given to the constructor.
    size_t index() {
        return _index;
    }

    /// Returns: the task currently attached, or null if there is none.
    Task task() {
        return _task;
    }

    /**
     * Tries to hand a task to this thread.
     *
     * The task is accepted only if the state can be switched atomically from
     * Idle to Busy; otherwise a warning is logged and nothing else happens.
     *
     * Params:
     *  task = the task to execute; must not be null
     *
     * Returns: true if the task was attached, false if the thread was not Idle.
     */
    bool attatch(Task task) {
        assert(task !is null);
        bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy);

        if (r) {
            version(HUNT_IO_DEBUG) {
                infof("attatching task %d with thread %s", task.id, this.name);
            }

            _mutex.lock();
            scope (exit) {
                _mutex.unlock();
            }
            _task = task;
            _condition.notify();

        } else {
            warningf("%s is unavailable. state: %s", this.name(), loadState());
        }

        return r;
    }

    /// Atomic read of the shared state; nothrow so run() can use it.
    private WorkerThreadState loadState() nothrow {
        return atomicLoad(_state);
    }

    /**
     * Thread entry point: repeats doRun() until the state becomes Stopped.
     * After every pass the attached task is cleared and, unless stopped,
     * the state is switched back from Busy to Idle.
     */
    private void run() nothrow {
        while (loadState() != WorkerThreadState.Stopped) {

            // Runs after each pass, whether doRun() returned or threw.
            scope (exit) {
                version (HUNT_IO_DEBUG) {
                    tracef("%s Done. state: %s", this.name(), loadState());
                }

                collectResoure();
                _task = null;

                if(loadState() != WorkerThreadState.Stopped) {
                    bool r = cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle);
                    version(HUNT_IO_DEBUG) {
                        if(!r) {
                            warningf("Failed to set thread %s to Idle, its state is %s", this.name, loadState());
                        }
                    }
                }
            }

            try {
                doRun();
            } catch (Throwable ex) {
                // Best effort: a failing task must not take the worker down.
                warning(ex);
            }
        }

        version (HUNT_DEBUG) tracef("%s Stopped. state: %s", this.name(), loadState());
    }

    /**
     * Waits until a task is attached or the thread is stopped, then executes
     * the task (if any) outside of the mutex.
     */
    private void doRun() {
        Task task;

        {
            _mutex.lock();
            // Release the mutex even if wait() throws; otherwise stop() and
            // attatch() would block forever on a mutex nobody unlocks.
            scope (exit) {
                _mutex.unlock();
            }

            task = _task;
            while(task is null && loadState() != WorkerThreadState.Stopped) {
                // r is false when the wait ended because of the timeout.
                bool r = _condition.wait(_timeout);
                task = _task;

                version(HUNT_IO_DEBUG) {
                    if(!r && loadState() == WorkerThreadState.Busy) {
                        if(task is null) {
                            warningf("No task attatched on a busy thread %s in %s", this.name, _timeout);
                        } else {
                            warningf("more tests need for this status, thread %s in %s", this.name, _timeout);
                        }
                    }
                }
            }
        }

        if(task !is null) {
            version(HUNT_IO_DEBUG) {
                tracef("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.status);
            }
            task.execute();
        }
    }
}