1 module hunt.util.worker.WorkerThread;
2 
3 import hunt.util.Closeable;
4 import hunt.util.ResoureManager;
5 import hunt.util.worker.Task;
6 import hunt.util.worker.Worker;
7 
8 import hunt.logging.ConsoleLogger;
9 
10 import core.atomic;
11 import core.memory;
12 import core.thread;
13 import core.sync.condition;
14 import core.sync.mutex;
15 import std.conv;
16 
17 
18 
/**
 * Lifecycle states of a WorkerThread.
 *
 * The members are declared in the order Idle, Busy, Stopped, so their
 * implicit values are 0, 1 and 2.
 */
enum WorkerThreadState {
    /// No task is attached; WorkerThread.attatch only succeeds in this state.
    Idle,

    /// Occupied: a task has been attached to the thread.
    Busy,

    /// WorkerThread.stop was called; the run loop exits once it observes this.
    Stopped
}
24 
/**
 * Tells whether the calling thread is a WorkerThread.
 *
 * Returns: true when the object returned by Thread.getThis() can be
 *          downcast to WorkerThread, false otherwise (including when
 *          Thread.getThis() yields null).
 */
bool inWorkerThread() {
    // A failed class downcast produces null in D, so one comparison suffices.
    return (cast(WorkerThread) Thread.getThis()) !is null;
}
29 
/**
 * A thread that executes one Task at a time.
 *
 * A task is handed over with attatch(), which succeeds only while the
 * thread is Idle. The thread sleeps on a condition variable until a task
 * arrives or stop() is called, executes the task, releases resources via
 * collectResoure() and goes back to Idle.
 *
 * The state is kept in a shared variable and is only touched through
 * atomic operations; the task hand-over itself is guarded by a mutex.
 */
class WorkerThread : Thread {

    private shared WorkerThreadState _state;
    private size_t _index;
    private Task _task;
    private Duration _timeout;

    private Condition _condition;
    private Mutex _mutex;

    /**
     * Creates the worker. The thread is not started here.
     *
     * Params:
     *  index     = number used to build the thread name ("WorkerThread-<index>").
     *  timeout   = maximum duration of a single wait for a task before the
     *              wait condition is re-checked.
     *  stackSize = forwarded to the Thread constructor.
     */
    this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) {
        // Construct the base class before touching any member of `this`.
        super(&run, stackSize);

        _index = index;
        _timeout = timeout;
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        atomicStore(_state, WorkerThreadState.Idle);
        this.name = "WorkerThread-" ~ _index.to!string();
    }

    /**
     * Requests the run loop to terminate.
     *
     * The state is switched to Stopped and the worker is woken up, so an
     * idle worker does not have to sit out the remainder of its wait
     * timeout before noticing the request. A task that is currently
     * executing is not interrupted.
     */
    void stop() {
        atomicStore(_state, WorkerThreadState.Stopped);

        // Notify while holding the mutex: the worker checks the state under
        // the same mutex before waiting, so the wake-up cannot be missed.
        _mutex.lock();
        scope (exit) {
            _mutex.unlock();
        }
        _condition.notify();
    }

    /// Returns: true if the current state is Busy.
    bool isBusy() {
        return loadState() == WorkerThreadState.Busy;
    }

    /// Returns: true if the current state is Idle.
    bool isIdle() {
        return loadState() == WorkerThreadState.Idle;
    }

    /// Returns: the current state.
    WorkerThreadState state() {
        return loadState();
    }

    /// Returns: the index passed to the constructor.
    size_t index() {
        return _index;
    }

    /// Returns: the task currently attached, or null if there is none.
    Task task() {
        return _task;
    }

    /**
     * Hands a task to this thread.
     *
     * The hand-over happens only if the state can be switched atomically
     * from Idle to Busy; otherwise a warning is logged and the task is not
     * attached.
     *
     * Params:
     *  task = the task to execute; must not be null.
     *
     * Returns: true if the task was attached, false if the thread was not Idle.
     */
    bool attatch(Task task) {
        assert(task !is null);
        bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy);

        if (r) {
            version(HUNT_IO_DEBUG) {
                infof("attatching task %d with thread %s", task.id, this.name);
            }

            _mutex.lock();
            scope (exit) {
                _mutex.unlock();
            }
            _task = task;
            _condition.notify();

        } else {
            warningf("%s is unavailable. state: %s", this.name(), loadState());
        }

        return r;
    }

    /**
     * Thread entry point: repeats doRun() until the state becomes Stopped.
     *
     * After every iteration the resources are collected, the task
     * reference is cleared and the state is switched back from Busy to Idle.
     */
    private void run() nothrow {
        while (loadState() != WorkerThreadState.Stopped) {

            scope (exit) {
                version (HUNT_IO_DEBUG) {
                    tracef("%s Done. state: %s", this.name(), loadState());
                }

                collectResoure();
                _task = null;
                if (!cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle)) {
                    // After stop() the transition is expected to fail, so
                    // only report states that are genuinely unexpected.
                    WorkerThreadState current = loadState();
                    if (current != WorkerThreadState.Stopped) {
                        warningf("Failed to set thread %s to Idle, its state is %s", this.name, current);
                    }
                }
            }

            try {
                doRun();
            } catch (Throwable ex) {
                // Keep the worker alive whatever the task throws.
                warning(ex);
            }
        }

        version (HUNT_DEBUG) tracef("%s Stopped. state: %s", this.name(), loadState());
    }

    /// Waits for a task and executes it; does nothing if none was obtained.
    private void doRun() {
        Task task = waitForTask();

        if (task !is null) {
            version(HUNT_IO_DEBUG) {
                tracef("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.status);
            }
            // Executed outside the mutex so attatch() and stop() never block on a running task.
            task.execute();
        }
    }

    /**
     * Blocks until a task is attached or the thread is stopped.
     *
     * Returns: the attached task, or null if the thread was stopped while
     *          no task was attached.
     */
    private Task waitForTask() {
        _mutex.lock();
        // Released on every exit path, including an exception from wait().
        scope (exit) {
            _mutex.unlock();
        }

        Task task = _task;
        while (task is null && loadState() != WorkerThreadState.Stopped) {
            bool notified = _condition.wait(_timeout);
            task = _task;

            version(HUNT_IO_DEBUG) {
                if (!notified && loadState() == WorkerThreadState.Busy) {
                    if (task is null) {
                        warningf("No task attatched on a busy thread %s in %s", this.name, _timeout);
                    } else {
                        warningf("more tests need for this status, thread %s in %s", this.name, _timeout);
                    }
                }
            }
        }

        return task;
    }

    /// Atomically reads the shared state.
    private WorkerThreadState loadState() nothrow @nogc {
        return atomicLoad(_state);
    }
}