/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.IOCP;

// dfmt off
version (HAVE_IOCP) : 
// dfmt on

import hunt.event.selector.Selector;
import hunt.io.channel.Common;
import hunt.io.channel;
import hunt.event.timer;
import hunt.logging.ConsoleLogger;
import hunt.system.Error;
import hunt.io.channel.iocp.AbstractStream;
import core.sys.windows.windows;
import std.conv;
import std.socket;
import hunt.concurrency.TaskPool;
import std.container : DList;


/**
 * Selector implementation built on a Windows I/O completion port.
 *
 * Socket channels (TCP, Accept, UDP) are associated with the completion port
 * when they are registered; timer channels are added to the timer wheel owned
 * by this selector. Completion packets are dequeued in `doSelect` and
 * dispatched to the owning channel according to the `IocpOperation` stored in
 * the packet's `IocpContext`.
 */
class AbstractSelector : Selector {

    /**
     * Creates the completion port and the stop event.
     *
     * Params:
     *   number      = forwarded unchanged to the `Selector` constructor
     *   divider     = forwarded unchanged to the `Selector` constructor
     *   pool        = task pool stored in this selector (may be null)
     *   maxChannels = forwarded unchanged to the `Selector` constructor
     *
     * A failure to create either kernel object is logged; construction still
     * completes so that existing callers keep their behaviour.
     */
    this(size_t number, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(number, divider, maxChannels);

        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());

        _timer.init();

        // Manual-reset event, initially non-signalled; polled by doSelect.
        _stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (_stopEvent is null)
            errorf("CreateEvent failed: %d\n", GetLastError());
    }

    /**
     * Releases the kernel handles owned by this selector.
     *
     * No logging is done here: the destructor may run during GC finalization,
     * where allocating is not allowed.
     */
    ~this() {
        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }

        if (_stopEvent !is null) {
            CloseHandle(_stopEvent);
            _stopEvent = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timer wheel; TCP, Accept and UDP
     * channels are associated with the completion port, using the channel
     * reference as the completion key. Any other channel type only produces a
     * warning. If the channel is an `AbstractStream` that is not a client, a
     * read is started on it right away.
     *
     * Params:
     *   channel = the channel to register; must not be null
     *
     * Returns: `false` when a timer channel fails `setTimerOut()`,
     *          `true` otherwise.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        super.register(channel);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            assert(timerChannel !is null);
            if (!timerChannel.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_IO_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            // _event.setNext(channel);
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);

            // Report a failed association instead of dropping it silently.
            // Control flow is left unchanged on purpose, so callers observe
            // the same return value as before.
            if (port is null) {
                warningf("CreateIoCompletionPort failed on channel(fd=%d): %d",
                        fd, GetLastError());
            }

            //cast(AbstractStream)channel)
        } else {
            warningf("Can't register a channel: %s", ct);
        }

        auto stream = cast(AbstractStream) channel;
        if (stream !is null && !stream.isClient()) {
            stream.beginRead();
        }

        return true;
    }

    /**
     * Removes a channel from this selector by delegating to the base class.
     *
     * Nothing is done on the completion port itself here.
     *
     * Params:
     *   channel = the channel to deregister
     *
     * Returns: the result of `Selector.deregister`.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        version (HUNT_IO_DEBUG)
            tracef("deregister (fd=%d)", channel.handle);

        // IocpContext _data;
        // _data.channel = channel;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
        //(cast(AbstractStream)channel).stopAction();
        //WaitForSingleObject
        return super.deregister(channel);
    }

    // void weakUp() {
    //     IocpContext _data;
    //     // _data.channel = _event;
    //     _data.operation = IocpOperation.event;

    //     // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
    //     PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    // }

    /**
     * Re-initializes the timer and then runs the base class loop.
     *
     * Params:
     *   timeout = forwarded unchanged to `Selector.onLoop`
     */
    override void onLoop(long timeout = -1) {
        _timer.init();
        super.onLoop(timeout);
    }

    /**
     * Dequeues completion packets and dispatches them until the selector is
     * stopping or the stop event is signalled.
     *
     * Params:
     *   t = not used by this implementation
     *
     * Returns: always 0.
     */
    protected override int doSelect(long t) {
        // The timer wheel is advanced once per call.
        // NOTE(review): the returned value is not used below; the wait on the
        // completion port uses INFINITE.
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;
        IocpContext* ev;

        while (WAIT_OBJECT_0 != WaitForSingleObject(_stopEvent, 0) && !isStopping()) {
            // https://docs.microsoft.com/zh-cn/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
            const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                    &overlapped, INFINITE);

            ev = cast(IocpContext*) overlapped;
            // ev = cast(IocpContext *)( cast(PCHAR)(overlapped) - cast(ULONG_PTR)(&(cast(IocpContext*)0).overlapped));

            if (ret == 0) {
                DWORD dwErr = GetLastError();
                if (WAIT_TIMEOUT == dwErr) {
                    continue;
                }

                if (ev is null) {
                    // The call failed without dequeuing a completion packet,
                    // so there is no context (and no channel) to act on.
                    // An assert alone is compiled out in release builds and
                    // would be followed by a null dereference.
                    warningf("GetQueuedCompletionStatus failed: %d", dwErr);
                    continue;
                }

                // A packet for a failed I/O operation was dequeued:
                // close the channel it belongs to.
                AbstractChannel failedChannel = ev.channel;
                if (failedChannel !is null && !failedChannel.isClosed()) {
                    failedChannel.close();
                }
                continue;
            }

            if (ev is null || ev.channel is null) {
                // This is also the path taken by the wake-up packet posted
                // from stop(), which carries a null OVERLAPPED pointer.
                version (HUNT_IO_DEBUG)
                    warningf("The ev is null or ev.watche is null. isStopping: %s", isStopping());
                continue;
            }

            if (0 == bytes && (ev.operation == IocpOperation.read
                    || ev.operation == IocpOperation.write)) {
                // A read or write that completed with zero bytes closes the channel.
                AbstractChannel channel = ev.channel;
                if (channel !is null && !channel.isClosed()) {
                    channel.close();
                }
                continue;
            }

            handleChannelEvent(ev.operation, ev.channel, bytes);
        }

        return 0;
    }

    /**
     * Dispatches one completed operation to its channel.
     *
     * Params:
     *   op      = the operation recorded in the dequeued `IocpContext`
     *   channel = the channel recorded in the dequeued `IocpContext`
     *   bytes   = number of bytes reported for the completed operation
     */
    private void handleChannelEvent(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_IO_DEBUG)
            infof("ev.operation: %s, fd=%d", op, channel.handle);

        switch (op) {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect: {
            onSocketRead(channel, 0);
            // Only stream channels can start a read; the cast yields null for
            // anything else, which must not be dereferenced.
            AbstractStream stream = cast(AbstractStream) channel;
            if (stream is null) {
                warning("The channel is not a stream: ", typeid(channel).name);
            } else {
                stream.beginRead();
            }
            break;
        }
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Stops the selector and posts an empty completion packet so that a
     * thread blocked in `GetQueuedCompletionStatus` returns and re-checks the
     * loop condition in `doSelect`.
     */
    override void stop() {
        super.stop();
        // weakUp();
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    }

    /// Currently does nothing.
    void handleTimer() {

    }

    // override void dispose() {

    // }

    /**
     * Handles a completed read on a channel.
     *
     * Does nothing when `len` is 0 or the channel is already closed.
     * Otherwise records the byte count on the socket channel via
     * `setRead(len)` and invokes the channel's `onRead()`.
     *
     * Params:
     *   channel = the channel the read completed on
     *   len     = number of bytes read
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        // (cast(AbstractStream)channel).setBusyWrite(false);

        if (len == 0 || channel.isClosed) {
            version (HUNT_IO_DEBUG)
                infof("channel [fd=%d] closed. isClosed: %s, len: %d",
                        channel.handle, channel.isClosed, len);
            //channel.close();
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name);
        if (socketChannel is null) {
            warning("The channel socket is null: ");
        } else {
            socketChannel.setRead(len);
            channel.onRead();
        }
    }

    /**
     * Handles a completed write on a channel by telling the stream how many
     * bytes were sent.
     *
     * Params:
     *   channel = the channel the write completed on
     *   len     = number of bytes written
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        debug if (channel is null) {
            warning("channel is null");
            return;
        }

        AbstractStream client = cast(AbstractStream) channel;
        // assert(client !is null, "The type of channel is: " ~ typeid(channel).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }


private:
    HANDLE _iocpHandle;   // the completion port created in the constructor
    CustomTimer _timer;   // timer wheel for registered timer channels
    HANDLE _stopEvent;    // manual-reset event polled by doSelect
    TaskPool _taskPool;   // task pool passed to the constructor (may be null)
}