1 module hunt.io.worker.Worker;
2 
3 import hunt.io.ByteBuffer;
4 import core.thread;
5 import core.sync.condition;
6 import core.sync.mutex;
7 import std.container : DList;
8 import hunt.io.channel;
9 
10 class Task(T) {
11   this (T obj , ByteBuffer buffer)
12   {
13     this.channel = obj;
14     this.buffer = buffer;
15   }
16   T channel;
17   ByteBuffer buffer;
18 }
19 
20 
21 class Worker(T) {
22 
23 	this()
24   {
25     _isExit = false;
26     _mutex = new Mutex();
27     _condition = new Condition(_mutex);
28     _isExit = false;
29     _thread = new Thread(() {
30       doThreadProc();
31     });
32   }
33 
34   void doThreadProc()
35   {
36       do
37       {
38         Task!T task = null;
39         {
40           _condition.mutex().lock();
41           if (_queue.empty())
42           {
43             _condition.wait();
44           }else
45           {
46             task = _queue.front();
47             _queue.removeFront();
48           }
49           _condition.mutex().unlock();
50         }
51 
52         if (_isExit)
53         {
54           break;
55         }
56         if (task !is null && !((cast(AbstractChannel)(task.channel)).isClosed()))
57         {
58           auto handle  = task.channel.getDataReceivedHandler();
59           if (handle !is null)
60           {
61             handle(task.buffer);
62           }
63         }
64       } while (!_isExit);
65   }
66 
67   void run()
68   {
69     _thread.start();
70   }
71 
72   void put(Task!T task)
73   {
74     if(task !is null)
75     {
76       _condition.mutex().lock();
77       _queue.insertBack(task);
78       _condition.notify();
79       _condition.mutex().unlock();
80     }
81   }
82 
83   private {
84     bool                _isExit;
85     Condition           _condition;
86     Mutex               _mutex;
87     Thread              _thread;
88     DList!(Task!T)      _queue;
89   }
90 
91 }
92