/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.IOCP;

// dfmt off
version (HAVE_IOCP) :
// dfmt on

import hunt.event.selector.Selector;
import hunt.io.channel.Common;
import hunt.io.channel;
import hunt.event.timer;
import hunt.logging.ConsoleLogger;
import hunt.system.Error;
import hunt.io.channel.iocp.AbstractStream;
import core.sys.windows.windows;
import std.conv;
import std.socket;
import hunt.util.TaskPool;
import std.container : DList;


/**
 * Selector built on a Windows I/O completion port.
 *
 * Socket channels (TCP, Accept, UDP) are associated with the completion port
 * in `register`; timer channels are added to the timer wheel owned by
 * `_timer`. `doSelect` dequeues completion packets and dispatches them to the
 * channel stored in the packet's `IocpContext`.
 */
class AbstractSelector : Selector {

    /**
     * Creates the completion port and the stop event.
     *
     * Params:
     *   number      = forwarded to the base `Selector` constructor
     *   divider     = forwarded to the base `Selector` constructor
     *   pool        = optional task pool, stored in `_taskPool`
     *   maxChannels = forwarded to the base `Selector` constructor
     */
    this(size_t number, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(number, divider, maxChannels);
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());
        _timer.init();
        _stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (_stopEvent is null)
            errorf("CreateEvent failed: %d\n", GetLastError());
    }

    /**
     * Releases the kernel handles created by the constructor.
     *
     * Only plain handle fields are touched and nothing is logged, so the
     * destructor does not allocate or reference other GC-managed objects.
     */
    ~this() {
        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
        if (_stopEvent !is null) {
            CloseHandle(_stopEvent);
            _stopEvent = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timer wheel; TCP, Accept and UDP
     * channels are associated with the completion port, using the channel
     * reference as the completion key. Any other type is only reported.
     * Afterwards a non-client `AbstractStream` starts its first read.
     *
     * Returns: false only when a timer channel fails `setTimerOut`;
     *          true in every other case.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            assert(timerChannel !is null);
            if (!timerChannel.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_IO_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            // _event.setNext(channel);
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);
            if (port is null) {
                // Report the failure instead of dropping it silently. The
                // registration flow itself is left as it was.
                const DWORD err = GetLastError();
                warningf("CreateIoCompletionPort failed for channel(fd=%d): %d", fd, err);
            }
        } else {
            warningf("Can't register a channel: %s", ct);
        }

        auto stream = cast(AbstractStream) channel;
        if (stream !is null && !stream.isClient()) {
            stream.beginRead();
        }

        return true;
    }

    /**
     * Deregisters a channel by delegating to the base selector.
     *
     * Nothing is done on the completion port itself here.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        version (HUNT_IO_DEBUG)
            tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);

        // IocpContext _data;
        // _data.channel = channel;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
        //(cast(AbstractStream)channel).stopAction();
        //WaitForSingleObject
        return super.deregister(channel);
    }

    // void weakUp() {
    //     IocpContext _data;
    //     // _data.channel = _event;
    //     _data.operation = IocpOperation.event;

    //     // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);
    //     PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    // }

    /**
     * Re-initialises the timer and then runs the base selector loop.
     */
    override void onLoop(long timeout = -1) {
        _timer.init();
        super.onLoop(timeout);
    }

    /**
     * Dequeues completion packets and dispatches them until the selector is
     * stopping or the stop event is signalled.
     *
     * Params:
     *   t = not used by this implementation
     *
     * Returns: always 0.
     */
    protected override int doSelect(long t) {
        // The wheel is advanced once on entry. Its return value is not used
        // because the wait below blocks with INFINITE.
        _timer.doWheel();

        while (WAIT_OBJECT_0 != WaitForSingleObject(_stopEvent, 0) && !isStopping()) {
            OVERLAPPED* overlapped = null;
            ULONG_PTR key = 0;
            DWORD bytes = 0;

            // https://docs.microsoft.com/zh-cn/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
            const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                    &overlapped, INFINITE);

            IocpContext* ev = cast(IocpContext*) overlapped;
            // ev = cast(IocpContext *)( cast(PCHAR)(overlapped) - cast(ULONG_PTR)(&(cast(IocpContext*)0).overlapped));

            if (ret == 0) {
                const DWORD dwErr = GetLastError();
                if (WAIT_TIMEOUT == dwErr)
                    continue;

                if (ev is null) {
                    // No packet was dequeued, so the port itself failed (for
                    // example the handle is invalid or was closed). There is
                    // no context to dereference, and retrying would spin.
                    warningf("GetQueuedCompletionStatus failed: %d", dwErr);
                    break;
                }

                // A packet for a failed I/O operation: close its channel.
                closeIfOpen(ev.channel);
                continue;
            }

            if (ev is null || ev.channel is null) {
                // Reached, for instance, by the null packet posted from stop().
                version (HUNT_IO_DEBUG)
                    warningf("The ev is null or ev.watche is null. isStopping: %s", isStopping());
                continue;
            }

            const bool isTransfer = ev.operation == IocpOperation.read
                || ev.operation == IocpOperation.write;
            if (0 == bytes && isTransfer) {
                // A zero-byte read or write completion closes the channel.
                closeIfOpen(ev.channel);
                continue;
            }

            handleChannelEvent(ev.operation, ev.channel, bytes);
        }

        return 0;
    }

    /// Closes `channel` unless it is null or already closed.
    private void closeIfOpen(AbstractChannel channel) {
        if (channel !is null && !channel.isClosed()) {
            channel.close();
        }
    }

    /**
     * Dispatches one completed operation to the matching channel callback.
     *
     * Params:
     *   op      = operation recorded in the completed `IocpContext`
     *   channel = channel recorded in the completed `IocpContext`
     *   bytes   = byte count reported by the completion port
     */
    private void handleChannelEvent(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_IO_DEBUG)
            infof("ev.operation: %s, fd=%d", op, channel.handle);

        switch (op) {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect: {
            onSocketRead(channel, 0);
            // Only streams can start reading; guard the downcast so that a
            // non-stream channel cannot cause a null dereference.
            AbstractStream stream = cast(AbstractStream) channel;
            if (stream !is null)
                stream.beginRead();
            else
                warning("The channel is not a stream; beginRead skipped");
            break;
        }
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Stops the base selector and posts an empty completion packet so that a
     * thread blocked in `GetQueuedCompletionStatus` returns.
     */
    override void stop() {
        super.stop();
        // weakUp();
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    }

    /// Currently does nothing.
    void handleTimer() {

    }

    // override void dispose() {

    // }

    /**
     * Handles a completed read.
     *
     * Does nothing when `len` is 0 or the channel is closed; otherwise it
     * records the length on the socket channel and invokes `onRead`.
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        // (cast(AbstractStream)channel).setBusyWrite(false);

        if (len == 0 || channel.isClosed) {
            version (HUNT_IO_DEBUG)
                infof("channel [fd=%d] closed. isClosed: %s, len: %d", channel.handle, channel.isClosed, len);
            //channel.close();
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name);
        if (socketChannel is null) {
            warning("The channel socket is null: ");
        } else {
            socketChannel.setRead(len);
            channel.onRead();
        }
    }

    /**
     * Handles a completed write by passing the sent byte count to the stream.
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        if (channel is null) {
            warning("channel is null");
            return;
        }

        AbstractStream client = cast(AbstractStream) channel;
        // assert(client !is null, "The type of channel is: " ~ typeid(channel).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }


private:
    HANDLE _iocpHandle; // completion port created in the constructor
    CustomTimer _timer; // owner of the timer wheel used for timer channels
    HANDLE _stopEvent;  // manual-reset event polled by doSelect
    TaskPool _taskPool; // optional pool passed to the constructor
}