1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.selector.IOCP; 13 14 // dfmt off 15 version (HAVE_IOCP) : 16 // dfmt on 17 18 import hunt.event.selector.Selector; 19 import hunt.io.channel.Common; 20 import hunt.io.channel; 21 import hunt.event.timer; 22 import hunt.logging; 23 import hunt.system.Error; 24 import hunt.io.channel.iocp.AbstractStream; 25 import core.sys.windows.windows; 26 import std.conv; 27 import std.socket; 28 import hunt.util.worker; 29 import std.container : DList; 30 31 32 /** 33 * 34 */ 35 class AbstractSelector : Selector { 36 37 this(size_t number, size_t divider, Worker worker = null, size_t maxChannels = 1500) { 38 super(number, divider, worker, maxChannels); 39 _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0); 40 if (_iocpHandle is null) 41 errorf("CreateIoCompletionPort failed: %d\n", GetLastError()); 42 _timer.init(); 43 _stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 44 } 45 46 ~this() { 47 // import std.socket; 48 // std.socket.close(_iocpHandle); 49 } 50 51 override bool register(AbstractChannel channel) { 52 super.register(channel); 53 54 ChannelType ct = channel.type; 55 auto fd = channel.handle; 56 version (HUNT_IO_DEBUG) 57 tracef("register, channel(fd=%d, type=%s)", fd, ct); 58 59 if (ct == ChannelType.Timer) { 60 AbstractTimer timerChannel = cast(AbstractTimer) channel; 61 assert(timerChannel !is null); 62 if (!timerChannel.setTimerOut()) 63 return false; 64 _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize()); 65 } else if (ct == ChannelType.TCP 66 || ct == ChannelType.Accept || ct == ChannelType.UDP) { 67 version (HUNT_IO_DEBUG) 68 trace("Run CreateIoCompletionPort on socket: ", fd); 69 70 // _event.setNext(channel); 71 CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle, 72 cast(size_t)(cast(void*) channel), 0); 73 74 //cast(AbstractStream)channel) 75 } else { 76 warningf("Can't register a channel: %s", ct); 77 } 78 79 auto stream = cast(AbstractStream)channel; 80 if (stream !is null && !stream.isClient()) { 81 stream.beginRead(); 82 } 83 84 return true; 85 } 86 87 override bool deregister(AbstractChannel channel) { 88 // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM 89 // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp 90 version(HUNT_IO_DEBUG) 91 tracef("deregister (fd=%d)", channel.handle); 92 93 94 95 // IocpContext _data; 96 // _data.channel = channel; 97 // _data.operation = IocpOperation.close; 98 // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped); 99 //(cast(AbstractStream)channel).stopAction(); 100 //WaitForSingleObject 101 return super.deregister(channel); 102 } 103 104 // void weakUp() { 105 // IocpContext _data; 106 // // _data.channel = _event; 107 // _data.operation = IocpOperation.event; 108 109 // // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped); 110 // PostQueuedCompletionStatus(_iocpHandle, 0, 0, null); 111 // } 112 113 override void onLoop(long timeout = -1) { 114 _timer.init(); 115 super.onLoop(timeout); 116 } 117 118 protected override int doSelect(long t) { 119 auto timeout = _timer.doWheel(); 120 OVERLAPPED* overlapped; 121 ULONG_PTR key = 0; 122 DWORD bytes = 0; 123 IocpContext* ev; 124 125 while( WAIT_OBJECT_0 != WaitForSingleObject(_stopEvent , 0) && !isStopping()) { 126 // https://docs.microsoft.com/zh-cn/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus 127 const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key, 128 &overlapped, INFINITE); 129 130 ev = cast(IocpContext*) overlapped; 131 // ev = cast(IocpContext *)( cast(PCHAR)(overlapped) - cast(ULONG_PTR)(&(cast(IocpContext*)0).overlapped)); 132 if (ret == 0) { 133 DWORD dwErr = GetLastError(); 134 if (WAIT_TIMEOUT == dwErr) { 135 continue; 136 } else { 137 assert(ev !is null, "The IocpContext is null"); 138 AbstractChannel channel = ev.channel; 139 if (channel !is null && !channel.isClosed()) { 140 channel.close(); 141 } 142 continue; 143 } 144 } else if (ev is null || ev.channel is null) { 145 version(HUNT_IO_DEBUG) warningf("The ev is null or ev.watche is null. isStopping: %s", isStopping()); 146 } else { 147 if (0 == bytes && (ev.operation == IocpOperation.read || ev.operation == IocpOperation.write)) { 148 AbstractChannel channel = ev.channel; 149 if (channel !is null && !channel.isClosed()) { 150 channel.close(); 151 } 152 continue; 153 } else { 154 handleChannelEvent(ev.operation, ev.channel, bytes); 155 } 156 } 157 } 158 159 return 0; 160 } 161 162 private void handleChannelEvent(IocpOperation op, AbstractChannel channel, DWORD bytes) { 163 164 version (HUNT_IO_DEBUG) 165 infof("ev.operation: %s, fd=%d", op, channel.handle); 166 167 switch (op) { 168 case IocpOperation.accept: 169 channel.onRead(); 170 break; 171 case IocpOperation.connect: 172 onSocketRead(channel, 0); 173 (cast(AbstractStream)channel).beginRead(); 174 break; 175 case IocpOperation.read: 176 onSocketRead(channel, bytes); 177 break; 178 case IocpOperation.write: 179 onSocketWrite(channel, bytes); 180 break; 181 case IocpOperation.event: 182 channel.onRead(); 183 break; 184 case IocpOperation.close: 185 break; 186 default: 187 warning("unsupported operation type: ", op); 188 break; 189 } 190 } 191 192 override void stop() { 193 super.stop(); 194 // weakUp(); 195 PostQueuedCompletionStatus(_iocpHandle, 0, 0, null); 196 } 197 198 void handleTimer() { 199 200 } 201 202 // override void dispose() { 203 204 // } 205 206 private void onSocketRead(AbstractChannel channel, size_t len) { 207 debug if (channel is null) { 208 warning("channel is null"); 209 return; 210 } 211 212 if (channel is null) 213 { 214 warning("channel is null"); 215 return; 216 } 217 218 // (cast(AbstractStream)channel).setBusyWrite(false); 219 220 if (len == 0 || channel.isClosed) { 221 version (HUNT_IO_DEBUG) 222 infof("channel [fd=%d] closed. isClosed: %s, len: %d", channel.handle, channel.isClosed, len); 223 //channel.close(); 224 return; 225 } 226 227 AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel; 228 // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name); 229 if (socketChannel is null) { 230 warning("The channel socket is null: "); 231 } else { 232 socketChannel.setRead(len); 233 channel.onRead(); 234 } 235 } 236 237 private void onSocketWrite(AbstractChannel channel, size_t len) { 238 debug if (channel is null) { 239 warning("channel is null"); 240 return; 241 } 242 AbstractStream client = cast(AbstractStream) channel; 243 // assert(client !is null, "The type of channel is: " ~ typeid(channel).name); 244 if (client is null) { 245 warning("The channel socket is null: "); 246 return; 247 } 248 client.onWriteDone(len); // Notify the client about how many bytes actually sent. 249 } 250 251 252 private: 253 HANDLE _iocpHandle; 254 CustomTimer _timer; 255 HANDLE _stopEvent; 256 }