1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.IOCP;
13 
14 // dfmt off
15 version (HAVE_IOCP) : 
16 // dfmt on
17 
18 import hunt.io.socket.Common;
19 import hunt.io.socket;
20 import hunt.event.timer;
21 import hunt.logging;
22 import hunt.system.Error;
23 
24 import core.sys.windows.windows;
25 import std.conv;
26 
/**
 * Selector implementation backed by a Windows I/O completion port (IOCP).
 *
 * The constructor creates one completion port. register() associates
 * TCP/Accept/UDP channels with that port, using the channel reference as the
 * completion key, and hands timer channels to the internal CustomTimer wheel.
 * doSelect() dequeues one completion packet per call and dispatches it
 * according to the IocpOperation stored in the packet's IocpContext.
 */
class AbstractSelector : Selector {
    /**
     * Creates the completion port, the internal event channel and
     * initializes the timer.
     *
     * A failure to create the port is logged; construction still continues.
     */
    this() {
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());
        _event = new EventChannel(this);
        _timer.init();
    }

    /**
     * Releases the completion port handle so it is not leaked.
     */
    ~this() {
        // The handle is null when port creation failed in the constructor.
        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timer wheel; TCP, Accept and UDP
     * channels are associated with the completion port. Any other channel
     * type is only reported with a warning.
     *
     * Params:
     *   channel = the channel to register; must not be null.
     *
     * Returns:
     *   false if the timer could not be set up or the socket could not be
     *   associated with the completion port; true otherwise (including for
     *   unsupported channel types, which are only logged).
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            assert(timerChannel !is null);
            if (!timerChannel.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            _event.setNext(channel);
            // The channel reference is passed as the completion key.
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);
            if (port is null) {
                // Without the association no completion would ever be
                // delivered for this socket, so report the failure.
                errorf("CreateIoCompletionPort failed for fd=%d: %d", fd, GetLastError());
                return false;
            }
        } else {
            warningf("Can't register a channel: %s", ct);
        }
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException always.
     */
    override bool reregister(AbstractChannel channel) {
        throw new LoopException("IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always returns true; the channel is not
     * detached from the completion port here.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        //tracef("deregister (fd=%d)", channel.handle);

        // IocpContext _data;
        // _data.channel = channel;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Wakes up a pending doSelect() by posting an empty completion packet.
     */
    void weakUp() {
        // A null OVERLAPPED pointer is posted on purpose: a stack-allocated
        // IocpContext would no longer exist when the packet is dequeued.
        // doSelect() sees the null context and does not dispatch anything.
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    }

    /**
     * Re-initializes the timer and then runs the base class loop.
     */
    override void onLoop(scope void delegate() weakup, long timeout = -1) {
        _timer.init();
        super.onLoop(weakup, timeout);
    }

    /**
     * Advances the timer wheel, then waits for one completion packet and
     * dispatches it.
     *
     * Params:
     *   t = not used by this implementation; the wait timeout is the value
     *       returned by the timer wheel.
     *
     * Returns:
     *   the result of GetQueuedCompletionStatus (0 on failure or timeout).
     */
    override protected int doSelect(long t) {
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                &overlapped, timeout);

        IocpContext* ev = cast(IocpContext*) overlapped;
        if (ret == 0) {
            const auto erro = GetLastError();
            // About ERROR_OPERATION_ABORTED
            // https://stackoverflow.com/questions/7228703/the-i-o-operation-has-been-aborted-because-of-either-a-thread-exit-or-an-applica
            if (erro == WAIT_TIMEOUT || erro == ERROR_OPERATION_ABORTED)
                return ret;

            debug errorf("error occurred, code=%d, message: %s", erro, getErrorMessage(erro));

            // The call can fail without dequeuing any packet, in which case
            // there is no context (and no channel) to close.
            if (ev is null) {
                warningf("GetQueuedCompletionStatus failed without a completion packet, code=%d", erro);
                return ret;
            }

            // A failed I/O operation was dequeued: close its channel.
            AbstractChannel channel = ev.channel;
            if (channel !is null && !channel.isClosed())
                channel.close();
        } else if (ev is null || ev.channel is null)
            warning("ev is null or ev.watche is null");
        else
            handleIocpOperation(ev.operation, ev.channel, bytes);
        return ret;
    }

    /**
     * Dispatches a dequeued completion to the matching handler.
     *
     * Params:
     *   op      = the operation recorded in the completion context.
     *   channel = the channel the operation belongs to.
     *   bytes   = the number of bytes reported for the completion.
     */
    private void handleIocpOperation(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_DEBUG)
            info("ev.operation: ", op);

        switch (op) {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect:
            onSocketRead(channel, 0);
            break;
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Stops the base selector and then wakes up a pending doSelect().
     */
    override void stop() {
        super.stop();
        weakUp();
    }

    /// Placeholder; currently does nothing.
    void handleTimer() {

    }

    /// Currently does nothing.
    override void dispose() {

    }

    /**
     * Handles a completed read: records the byte count on the socket channel
     * and notifies it. Nothing happens when len is 0 or the channel is
     * already closed.
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        debug if (channel is null) {
            warning("channel is null");
            return;
        }

        if (len == 0 || channel.isClosed) {
            version (HUNT_DEBUG)
                info("channel closed");
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name);
        if (socketChannel is null) {
            warning("The channel socket is null: ");
        } else {
            socketChannel.setRead(len);
            channel.onRead();
        }
    }

    /**
     * Handles a completed write by telling the stream how many bytes were
     * reported for the completion.
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        debug if (channel is null) {
            warning("channel is null");
            return;
        }
        AbstractStream client = cast(AbstractStream) channel;
        // assert(client !is null, "The type of channel is: " ~ typeid(channel).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

private:
    HANDLE _iocpHandle; // the I/O completion port created in the constructor
    EventChannel _event; // internal event channel created in the constructor
    CustomTimer _timer; // timer wheel driven from doSelect()
}