1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.IOCP;
13 
14 // dfmt off
15 version (HAVE_IOCP) : 
16 // dfmt on
17 
18 import hunt.event.selector.Selector;
19 import hunt.io.channel.Common;
20 import hunt.io.channel;
21 import hunt.event.timer;
22 import hunt.logging.ConsoleLogger;
23 import hunt.system.Error;
24 import hunt.io.channel.iocp.AbstractStream;
25 import core.sys.windows.windows;
26 import std.conv;
27 import std.socket;
28 import hunt.util.TaskPool;
29 import std.container : DList;
30 
31 
/**
 * Selector implementation backed by a Windows I/O completion port (IOCP).
 *
 * Socket channels (TCP, Accept, UDP) are associated with the completion port
 * in register(); timer channels are added to the time wheel of the internal
 * CustomTimer. doSelect() dequeues completion packets from the port and
 * dispatches them according to the IocpOperation stored in the IocpContext
 * that the OVERLAPPED pointer of each packet is cast to.
 */
class AbstractSelector : Selector {

    /**
     * Creates the completion port and the stop event and initializes the timer.
     *
     * Failures of CreateIoCompletionPort / CreateEvent are logged; the
     * constructor does not throw on them.
     *
     * Params:
     *   number      = forwarded unchanged to the Selector constructor.
     *   divider     = forwarded unchanged to the Selector constructor.
     *   pool        = task pool stored in this selector; may be null.
     *   maxChannels = forwarded unchanged to the Selector constructor.
     */
    this(size_t number, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(number, divider, maxChannels);

        // INVALID_HANDLE_VALUE + null existing port => create a new, unassociated port.
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());

        _timer.init();

        // Manual-reset event, initially non-signaled; polled by doSelect().
        _stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (_stopEvent is null)
            errorf("CreateEvent failed: %d", GetLastError());
    }

    /**
     * Releases the Win32 handles owned by this selector.
     *
     * Closing the port handle only drops this selector's reference to it;
     * see the CreateIoCompletionPort documentation for the port's lifetime.
     */
    ~this() {
        if (_stopEvent !is null) {
            CloseHandle(_stopEvent);
            _stopEvent = null;
        }

        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the time wheel. TCP, Accept and UDP channels
     * are associated with the completion port. Any other channel type only
     * produces a warning. Afterwards, a channel that is an AbstractStream and
     * is not a client gets its first read started via beginRead().
     *
     * Params:
     *   channel = the channel to register.
     *
     * Returns: false when a timer channel could not be set up, true otherwise.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            // An assert alone would vanish in release builds and leave a
            // null dereference behind, so check explicitly.
            if (timerChannel is null) {
                warning("The timer channel is not an AbstractTimer");
                return false;
            }

            if (!timerChannel.setTimerOut())
                return false;

            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_IO_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            // The channel reference is passed as the completion key; doSelect()
            // does not read the key, it takes the channel from the IocpContext.
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);

            // Only report the failure: associating a handle that is already
            // bound to a port fails as well, so registration still proceeds.
            if (port is null)
                warningf("CreateIoCompletionPort failed for fd=%d: %d", fd, GetLastError());
        } else {
            warningf("Can't register a channel: %s", ct);
        }

        auto stream = cast(AbstractStream) channel;
        if (stream !is null && !stream.isClient()) {
            stream.beginRead();
        }

        return true;
    }

    /**
     * Deregisters a channel by delegating to the base class.
     *
     * Nothing is done on the completion port itself.
     *
     * Params:
     *   channel = the channel to deregister.
     *
     * Returns: the result of Selector.deregister.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        version (HUNT_IO_DEBUG)
            tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);

        return super.deregister(channel);
    }

    /**
     * Re-initializes the timer and then runs the base class loop.
     *
     * Params:
     *   timeout = forwarded unchanged to Selector.onLoop.
     */
    override void onLoop(long timeout = -1) {
        _timer.init();
        super.onLoop(timeout);
    }

    /**
     * Dequeues completion packets and dispatches them until the stop event is
     * signaled, isStopping() returns true, or the port fails.
     *
     * Params:
     *   t = not used by this implementation.
     *
     * Returns: always 0.
     */
    protected override int doSelect(long t) {
        // NOTE(review): the wheel is advanced once per call and its return value
        // is not used, because the wait below is INFINITE. Confirm whether timers
        // are expected to drive the wait timeout.
        _timer.doWheel();

        ULONG_PTR key = 0;
        DWORD bytes = 0;

        while (WAIT_OBJECT_0 != WaitForSingleObject(_stopEvent, 0) && !isStopping()) {
            // Reset per iteration so a stale pointer from the previous packet
            // can never be mistaken for the current one.
            OVERLAPPED* overlapped = null;

            // https://docs.microsoft.com/zh-cn/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
            const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                    &overlapped, INFINITE);

            // The OVERLAPPED pointer is reinterpreted as the IocpContext of the operation.
            IocpContext* ev = cast(IocpContext*) overlapped;

            if (ret == 0) {
                const DWORD dwErr = GetLastError();
                if (WAIT_TIMEOUT == dwErr)
                    continue;

                if (ev is null) {
                    // No packet was dequeued: the wait itself failed (for example
                    // an invalid or closed port handle). There is no channel to
                    // close, and retrying would spin, so leave the loop.
                    errorf("GetQueuedCompletionStatus failed without a completion packet: %d", dwErr);
                    break;
                }

                // A packet for a failed I/O operation was dequeued: close its channel.
                AbstractChannel failed = ev.channel;
                if (failed !is null && !failed.isClosed()) {
                    failed.close();
                }
                continue;
            }

            if (ev is null || ev.channel is null) {
                // Reached, among others, by the packet that stop() posts with a
                // null OVERLAPPED; the loop condition is re-evaluated afterwards.
                version (HUNT_IO_DEBUG)
                    warningf("The ev is null or ev.watche is null. isStopping: %s", isStopping());
                continue;
            }

            const bool isTransfer = ev.operation == IocpOperation.read
                || ev.operation == IocpOperation.write;

            if (bytes == 0 && isTransfer) {
                // A read/write that completed with zero bytes closes the channel.
                AbstractChannel finished = ev.channel;
                if (!finished.isClosed()) {
                    finished.close();
                }
                continue;
            }

            handleChannelEvent(ev.operation, ev.channel, bytes);
        }

        return 0;
    }

    /**
     * Dispatches one dequeued completion packet.
     *
     * Params:
     *   op      = the operation recorded in the packet's IocpContext.
     *   channel = the channel recorded in the packet's IocpContext.
     *   bytes   = the byte count reported by GetQueuedCompletionStatus.
     */
    private void handleChannelEvent(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_IO_DEBUG)
            infof("ev.operation: %s, fd=%d", op, channel.handle);

        switch (op) {
            case IocpOperation.accept:
                channel.onRead();
                break;
            case IocpOperation.connect: {
                onSocketRead(channel, 0);
                // Start the first read on the connected stream; guard the cast
                // instead of dereferencing its result blindly.
                AbstractStream stream = cast(AbstractStream) channel;
                if (stream is null) {
                    warning("The connected channel is not an AbstractStream");
                } else {
                    stream.beginRead();
                }
                break;
            }
            case IocpOperation.read:
                onSocketRead(channel, bytes);
                break;
            case IocpOperation.write:
                onSocketWrite(channel, bytes);
                break;
            case IocpOperation.event:
                channel.onRead();
                break;
            case IocpOperation.close:
                break;
            default:
                warning("unsupported operation type: ", op);
                break;
        }
    }

    /**
     * Calls the base class stop() and posts an empty completion packet so that
     * a thread blocked in GetQueuedCompletionStatus returns and doSelect() can
     * re-check its loop condition.
     */
    override void stop() {
        super.stop();

        if (!PostQueuedCompletionStatus(_iocpHandle, 0, 0, null))
            warningf("PostQueuedCompletionStatus failed: %d", GetLastError());
    }

    /// Intentionally empty.
    void handleTimer() {

    }

    /**
     * Handles a completed read.
     *
     * Does nothing when len is 0 or the channel is closed. Otherwise records
     * the byte count on the socket channel via setRead() and calls onRead().
     *
     * Params:
     *   channel = the channel the read completed on.
     *   len     = number of bytes reported for the read.
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        if (len == 0 || channel.isClosed) {
            version (HUNT_IO_DEBUG)
               infof("channel [fd=%d] closed. isClosed: %s, len: %d", channel.handle, channel.isClosed, len);
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        if (socketChannel is null) {
            warning("The channel socket is null: ");
            return;
        }

        socketChannel.setRead(len);
        channel.onRead();
    }

    /**
     * Handles a completed write by reporting the byte count to the stream.
     *
     * Params:
     *   channel = the channel the write completed on.
     *   len     = number of bytes reported for the write.
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        AbstractStream client = cast(AbstractStream) channel;
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }

        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }


private:
    HANDLE _iocpHandle;   // completion port created in the constructor
    CustomTimer _timer;   // timer whose time wheel receives timer channels
    HANDLE _stopEvent;    // manual-reset event polled by doSelect()
    TaskPool _taskPool;   // task pool passed to the constructor; may be null
}