1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.IOCP;
13 
14 // dfmt off
15 version (HAVE_IOCP) : 
16 // dfmt on
17 
18 import hunt.event.selector.Selector;
19 import hunt.io.channel.Common;
20 import hunt.io.channel;
21 import hunt.event.timer;
22 import hunt.logging.ConsoleLogger;
23 import hunt.system.Error;
24 import hunt.io.channel.iocp.AbstractStream;
25 import core.sys.windows.windows;
26 import std.conv;
27 import std.socket;
28 import hunt.concurrency.TaskPool;
29 import std.container : DList;
30 
31 
/**
 * Selector implementation built on a Windows I/O completion port (IOCP).
 *
 * The constructor creates one completion port. register() associates socket
 * channels with that port, using the channel reference as the completion key,
 * and doSelect() dequeues completion packets and dispatches each one to the
 * channel stored in its IocpContext.
 */
class AbstractSelector : Selector {

    /**
     * Creates the completion port, initialises the timer and creates the
     * stop event.
     *
     * Failures of the Win32 calls are logged; construction still completes.
     *
     * Params:
     *  number      = forwarded unchanged to the Selector constructor
     *  divider     = forwarded unchanged to the Selector constructor
     *  pool        = optional task pool, stored in _taskPool
     *  maxChannels = forwarded unchanged to the Selector constructor
     */
    this(size_t number, size_t divider,TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(number, divider, maxChannels);
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());
        _timer.init();
        _stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (_stopEvent is null)
            errorf("CreateEvent failed: %d\n", GetLastError());
    }

    /**
     * Releases the completion port and the stop event.
     *
     * Deliberately does not log: a class destructor may run during garbage
     * collection, where allocating (as the logging helpers may do) is unsafe.
     */
    ~this() {
        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
        if (_stopEvent !is null) {
            CloseHandle(_stopEvent);
            _stopEvent = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the time wheel. TCP, Accept and UDP
     * channels are associated with the completion port. Any other channel
     * type only produces a warning. If the channel is an AbstractStream that
     * is not a client, its first read is started.
     *
     * Params:
     *  channel = the channel to register; must not be null
     *
     * Returns: false when a timer channel's setTimerOut() fails, true otherwise.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        super.register(channel);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            assert(timerChannel !is null);
            if (!timerChannel.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_IO_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            // The channel reference is passed as the completion key.
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);
            if (port is null) {
                // Log only: the original flow (continue and return true) is kept
                // so existing callers are not affected.
                const DWORD err = GetLastError();
                errorf("CreateIoCompletionPort failed for fd=%d: %d", fd, err);
            }
        } else {
            warningf("Can't register a channel: %s", ct);
        }

        auto stream = cast(AbstractStream)channel;
        if (stream !is null && !stream.isClient()) {
            stream.beginRead();
        }

        return true;
    }

    /**
     * Deregisters a channel by delegating to the base class.
     *
     * Nothing is done on the completion port itself here.
     *
     * Params:
     *  channel = the channel to deregister
     *
     * Returns: the result of Selector.deregister.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        version(HUNT_IO_DEBUG) 
        tracef("deregister (fd=%d)", channel.handle);

        // IocpContext _data;
        // _data.channel = channel;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
        //(cast(AbstractStream)channel).stopAction();
        //WaitForSingleObject
        return super.deregister(channel);
    }

    // void weakUp() {
    //     IocpContext _data;
    //     // _data.channel = _event;
    //     _data.operation = IocpOperation.event;

    //     // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
    //     PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    // }

    /**
     * Re-initialises the timer and then runs the base class loop.
     *
     * Params:
     *  timeout = forwarded unchanged to Selector.onLoop
     */
    override void onLoop(long timeout = -1) {
        _timer.init();
        super.onLoop(timeout);
    }

    /**
     * Dequeues completion packets and dispatches them until the stop event is
     * signalled or isStopping() becomes true.
     *
     * Params:
     *  t = not used by this implementation
     *
     * Returns: always 0.
     */
    protected override int doSelect(long t) {
        // NOTE(review): the value returned by doWheel() is not used and the wait
        // below is INFINITE, so the wheel is advanced only once per call.
        // Confirm whether timers are meant to be driven from somewhere else.
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;
        IocpContext* ev;

        while( WAIT_OBJECT_0 != WaitForSingleObject(_stopEvent , 0) && !isStopping()) {
            // Start every wait from a clean state so that a failed call can never
            // be mistaken for the packet dequeued in the previous iteration.
            overlapped = null;
            bytes = 0;

            // https://docs.microsoft.com/zh-cn/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
            const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                    &overlapped, INFINITE);

            // NOTE(review): this cast presumably relies on the OVERLAPPED being the
            // first member of IocpContext - confirm against its declaration.
            ev = cast(IocpContext*) overlapped;

            if (ret == 0) {
                const DWORD dwErr = GetLastError();
                if (WAIT_TIMEOUT == dwErr)
                    continue;

                if (ev is null) {
                    // No packet was dequeued, so there is no context to inspect.
                    // The wait itself failed; leave the loop instead of
                    // dereferencing a null context.
                    errorf("GetQueuedCompletionStatus failed: %d", dwErr);
                    break;
                }

                // A packet for a failed I/O operation: close the owning channel.
                closeIfOpen(ev.channel);
                continue;
            }

            if (ev is null || ev.channel is null) {
                // Reached, for example, for the packet posted by stop().
                version(HUNT_IO_DEBUG) warningf("The ev is null or ev.watche is null. isStopping: %s", isStopping());
                continue;
            }

            const bool isTransfer = ev.operation == IocpOperation.read
                || ev.operation == IocpOperation.write;
            if (0 == bytes && isTransfer) {
                // A read or write that completed with zero bytes closes the channel.
                closeIfOpen(ev.channel);
                continue;
            }

            handleChannelEvent(ev.operation, ev.channel, bytes);
        }

        return 0;
    }

    /// Closes the channel unless it is null or already closed.
    private void closeIfOpen(AbstractChannel channel) {
        if (channel !is null && !channel.isClosed()) {
            channel.close();
        }
    }

    /**
     * Dispatches one completed operation to its channel.
     *
     * Params:
     *  op      = the operation recorded in the completion context
     *  channel = the channel the operation belongs to
     *  bytes   = number of bytes reported for the operation
     */
    private void handleChannelEvent(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_IO_DEBUG)
            infof("ev.operation: %s, fd=%d", op, channel.handle);

        switch (op) {
            case IocpOperation.accept:
                channel.onRead();
                break;
            case IocpOperation.connect: {
                onSocketRead(channel, 0);
                AbstractStream stream = cast(AbstractStream) channel;
                if (stream is null) {
                    // The cast fails for channels that are not streams; do not
                    // dereference the null result.
                    warning("The channel is not an AbstractStream");
                } else {
                    stream.beginRead();
                }
                break;
            }
            case IocpOperation.read:
                onSocketRead(channel, bytes);
                break;
            case IocpOperation.write:
                onSocketWrite(channel, bytes);
                break;
            case IocpOperation.event:
                channel.onRead();
                break;
            case IocpOperation.close:
                break;
            default:
                warning("unsupported operation type: ", op);
                break;
        }
    }

    /**
     * Stops the selector.
     *
     * After calling the base class stop(), an empty completion packet is
     * posted so that a thread blocked in GetQueuedCompletionStatus returns and
     * the loop condition in doSelect() is evaluated again.
     */
    override void stop() {
        super.stop();
        // weakUp();
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    }

    /// Currently does nothing.
    void handleTimer() {

    }

    // override void dispose() {

    // }

    /**
     * Handles a completed read.
     *
     * Returns without doing anything when the channel is null, when len is 0,
     * or when the channel is closed. Otherwise records the byte count on the
     * socket channel and calls the channel's onRead().
     *
     * Params:
     *  channel = the channel the read completed on
     *  len     = number of bytes read
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        // (cast(AbstractStream)channel).setBusyWrite(false);

        if (len == 0 || channel.isClosed) {
            version (HUNT_IO_DEBUG)
               infof("channel [fd=%d] closed. isClosed: %s, len: %d", channel.handle, channel.isClosed, len);
            //channel.close();
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name);
        if (socketChannel is null) {
            warning("The channel socket is null: ");
            return;
        }

        socketChannel.setRead(len);
        channel.onRead();
    }

    /**
     * Handles a completed write by telling the stream how many bytes were sent.
     *
     * Params:
     *  channel = the channel the write completed on
     *  len     = number of bytes written
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        // Checked in every build, matching onSocketRead.
        if (channel is null) {
            warning("channel is null");
            return;
        }

        AbstractStream client = cast(AbstractStream) channel;
        // assert(client !is null, "The type of channel is: " ~ typeid(channel).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }


private:
    HANDLE _iocpHandle;  // the completion port created in the constructor
    CustomTimer _timer;  // timer whose wheel is advanced in doSelect()
    HANDLE _stopEvent;   // manual-reset event polled by the doSelect() loop
    TaskPool _taskPool;  // optional task pool passed to the constructor
}