1 module hunt.util.worker.Worker;
2 
3 import hunt.util.worker.Task;
4 // import hunt.util.worker.TaskQueue;
5 import hunt.util.worker.WorkerThread;
6 import hunt.logging;
7 
8 import core.atomic;
9 import core.sync.condition;
10 import core.sync.mutex;
11 import core.thread;
12 
13 import std.conv; 
14 import std.concurrency;
15 
16 
17 
18 /**
19  * 
20  */
21 class Worker {
22 
23     private size_t _size;
24     private Thread _masterThread;
25     private WorkerThread[] _workerThreads;
26     private Task[size_t] _tasks;
27     private Mutex _taskLocker;
28 
29 
30     private TaskQueue _taskQueue;
31     private shared bool _isRunning = false;
32 
33     this(TaskQueue taskQueue, size_t size = 8) {
34         _taskQueue = taskQueue;
35         _size = size;
36 
37         version(HUNT_DEBUG) {
38             infof("Worker size: %d", size);
39         }
40 
41         initialize();
42     }
43 
44     private void initialize() {
45         _taskLocker = new Mutex();
46         _workerThreads = new WorkerThread[_size];
47         
48         foreach(size_t index; 0 .. _size) {
49             WorkerThread thread = new WorkerThread(index);
50             thread.start();
51 
52             _workerThreads[index] = thread;
53         }
54     }
55 
56     void inspect() {
57 
58         foreach(WorkerThread th; _workerThreads) {
59             
60             Task task = th.task();
61 
62             if(th.state() == WorkerThreadState.Busy) {
63                 if(task is null) {
64                     warning("A dead worker thread detected: %s, %s", th.name, th.state());
65                 } else {
66                     tracef("Thread: %s,  state: %s, lifeTime: %s", th.name, th.state(), task.lifeTime());
67                 }
68             } else {
69                 if(task is null) {
70                     tracef("Thread: %s,  state: %s", th.name, th.state());
71                 } else {
72                     tracef("Thread: %s,  state: %s", th.name, th.state(), task.executionTime);
73                 }
74             }
75         }
76     }
77 
78     void put(Task task) {
79         _taskQueue.push(task);
80 
81         _taskLocker.lock();
82         scope(exit) {
83             _taskLocker.unlock();
84         }
85 
86         _tasks[task.id] = task;
87     }
88 
89     Task get(size_t id) {
90         _taskLocker.lock();
91         scope(exit) {
92             _taskLocker.unlock();
93         } 
94 
95         auto itemPtr = id in _tasks;
96         if(itemPtr is null) {
97             throw new Exception("Task does NOT exist: " ~ id.to!string);
98         }
99 
100         return *itemPtr;
101     }
102 
103     void remove(size_t id) {
104         _taskLocker.lock();
105         scope(exit) {
106             _taskLocker.unlock();
107         } 
108 
109         _tasks.remove(id);
110     }
111 
112     void clear() {
113         _taskLocker.lock();
114         scope(exit) {
115             _taskLocker.unlock();
116         } 
117         _tasks.clear();
118 
119     }
120 
121     void run() {
122         bool r = cas(&_isRunning, false, true);
123         if(r) {
124             _masterThread = new Thread(&doRun);
125             _masterThread.start();
126         }
127     }
128 
129     void stop() {
130         bool r = cas(&_isRunning, true, false);
131 
132         if(r) {
133             version(HUNT_IO_DEBUG) {
134                 info("Stopping all the threads...");
135             }
136 
137 
138             foreach(size_t index; 0 .. _size) {
139                 _workerThreads[index].stop();
140                 _workerThreads[index].join();
141             }
142 
143             
144             // To stop the master thread as soon as possible.
145             // _taskQueue.push(null); 
146             _taskQueue.clear();
147 
148             if(_masterThread !is null) {
149                 _masterThread.join();
150             }
151 
152             version(HUNT_IO_DEBUG) {
153                 info("All the threads stopped.");
154             }
155         }
156     }
157 
158     private WorkerThread findIdleThread() {
159         foreach(size_t index, WorkerThread thread; _workerThreads) {
160             version(HUNT_IO_DEBUG) {
161                 tracef("Thread: %s, state: %s", thread.name, thread.state);
162             }
163 
164             if(thread.isIdle())
165                 return thread;
166         }
167 
168         return null;
169     } 
170 
171     private void doRun() {
172         while(_isRunning) {
173             try {
174                 version(HUNT_IO_DEBUG) info("running...");
175                 Task task = _taskQueue.pop();
176                 if(!_isRunning) break;
177 
178                 if(task is null) {
179                     version(HUNT_IO_DEBUG) {
180                         warning("A null task popped!");
181                         inspect();
182                     }
183                     continue;
184                 }
185 
186                 WorkerThread workerThread;
187                 bool isAttatched = false;
188                 
189                 do {
190                     workerThread = findIdleThread();
191 
192                     // All worker threads are busy!
193                     if(workerThread is null) {
194                         // version(HUNT_METRIC) {
195                         //     _taskQueue.inspect();
196                         // }
197                         // trace("All worker threads are busy!");
198                         // Thread.sleep(1.seconds);
199                         // Thread.sleep(10.msecs);
200                         Thread.yield();
201                     } else {
202                         isAttatched = workerThread.attatch(task);
203                     }
204                 } while(!isAttatched && _isRunning);
205 
206             } catch(Throwable ex) {
207                 warning(ex);
208             }
209         }
210 
211         version(HUNT_IO_DEBUG) info("Worker stopped!");
212 
213     }
214 
215 }
216