1 module hunt.io.worker.WorkerGroup;
2 
3 import std.stdio;
4 import std.container : DList;
5 import core.thread;
6 import core.sync.condition;
7 import core.sync.mutex;
8 import std.process;
9 import hunt.io.channel;
10 import hunt.io.BufferUtils;
11 import hunt.io.ByteBuffer;
12 import hunt.io.worker.Worker;
13 
14 
/**
 * Fans tasks out to a fixed set of `Worker!T` instances.
 *
 * Producers call `put`, which appends a `Task!T` to an internal queue guarded
 * by a mutex/condition pair. A single dispatcher thread (started by `run`)
 * drains that queue and forwards each task to the worker selected by
 * `task.channel.handle % threadSize`, so tasks sharing a channel handle are
 * always handed to the same worker.
 */
class WorkerGroup(T) {

  private
  {
    bool                _isExit;      // set by stop(); read/written under _mutex
    Condition           _condition;   // signalled on new task and on stop()
    Mutex               _mutex;       // guards _queue and _isExit
    DList!(Task!T)      _queue;       // pending tasks, FIFO
    size_t              _threadSize;  // number of workers (always >= 1)
    Thread              _thread;      // dispatcher thread running doThreadProc
    Worker!T[size_t]    _workers;     // workers keyed by index 0 .. _threadSize
  }

  /**
   * Creates the group and its workers; nothing is started until `run`.
   *
   * Params:
   *   threadSize = number of workers to create. A value of 0 is treated as 1,
   *                because `dispatch` selects a worker with a modulo by this
   *                count and a group without workers could not deliver tasks.
   */
  this(size_t threadSize = 1) {
    _mutex = new Mutex();
    _condition = new Condition(_mutex);
    _isExit = false;
    _threadSize = threadSize > 0 ? threadSize : 1;

    _thread = new Thread(() {
      doThreadProc();
    });

    foreach (i; 0 .. _threadSize)
    {
      _workers[i] = new Worker!T();
    }
  }

  /**
   * Queues a task built from `obj` and `buffer` and wakes the dispatcher.
   *
   * The call is silently ignored when either argument is null.
   *
   * Params:
   *   obj    = object the task is created for
   *   buffer = data passed along with the task
   */
  void put(T obj , ByteBuffer buffer)
  {
    if (obj is null || buffer is null)
    {
      return;
    }

    // Allocate outside the critical section to keep the lock short.
    auto task = new Task!T(obj, buffer);

    _mutex.lock();
    scope(exit) _mutex.unlock();   // released even if insertBack throws
    _queue.insertBack(task);
    _condition.notify();
  }

  /**
   * Hands `task` to the worker selected by its channel handle.
   * A null task is ignored.
   */
  void dispatch(Task!T task)
  {
    if (task !is null)
    {
      _workers[task.channel.handle % _threadSize].put(task);
    }
  }

  /**
   * Dispatcher loop: waits for queued tasks and forwards them via `dispatch`
   * until `stop` has been called. Tasks still queued when the stop request is
   * observed are not dispatched.
   */
  void doThreadProc()
  {
    while (true)
    {
      Task!T task = null;
      {
        _mutex.lock();
        scope(exit) _mutex.unlock();

        // Predicate loop: tolerates spurious wakeups and wakes for stop().
        while (_queue.empty() && !_isExit)
        {
          _condition.wait();
        }

        if (_isExit)
        {
          return;
        }

        task = _queue.front();
        _queue.removeFront();
      }

      // Forward outside the lock so producers are not blocked meanwhile.
      dispatch(task);
    }
  }

  /**
   * Requests the dispatcher thread to exit and wakes it if it is waiting on
   * an empty queue. This does not wait for the thread to finish.
   * NOTE(review): the workers themselves are not stopped here — confirm
   * whether `Worker!T` needs an explicit shutdown by the caller.
   */
  void stop()
  {
    _mutex.lock();
    scope(exit) _mutex.unlock();
    _isExit = true;
    // Without this the dispatcher would stay blocked in wait() forever.
    _condition.notifyAll();
  }

  /**
   * Calls `run` on every worker, then starts the dispatcher thread.
   */
  void run()
  {
    foreach (worker ; _workers.byValue)
    {
      worker.run();
    }
    _thread.start();
  }
}
111