/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.IOCP;

// dfmt off
version (HAVE_IOCP) :
// dfmt on

import hunt.io.socket.Common;
import hunt.io.socket;
import hunt.event.timer;
import hunt.logging;
import hunt.system.Error;

import core.sys.windows.windows;
import std.conv;

/**
 * Selector implementation built on a Windows I/O completion port (IOCP).
 *
 * Socket channels are associated with the port in `register`; completions are
 * dequeued and dispatched to their channel in `doSelect`. Timer channels are
 * not attached to the port, they are added to the timer wheel of `_timer`.
 */
class AbstractSelector : Selector {
    /**
     * Creates the completion port, the internal event channel and initializes
     * the timer. A failure to create the port is only logged.
     */
    this() {
        // INVALID_HANDLE_VALUE + null existing port: create a new completion port.
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            errorf("CreateIoCompletionPort failed: %d\n", GetLastError());
        _event = new EventChannel(this);
        _timer.init();
    }

    /**
     * Releases the completion port handle owned by this selector.
     */
    ~this() {
        if (_iocpHandle !is null) {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timer wheel. TCP, Accept and UDP
     * channels are associated with the completion port. Any other channel
     * type is only reported with a warning.
     *
     * Params:
     *   channel = the channel to register; must not be null.
     *
     * Returns:
     *   false if the timer could not be armed or the socket handle could not
     *   be associated with the completion port, true otherwise (including
     *   for unsupported channel types).
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        ChannelType ct = channel.type;
        auto fd = channel.handle;
        version (HUNT_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", fd, ct);

        if (ct == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            assert(timerChannel !is null);
            if (!timerChannel.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(timerChannel.timer, timerChannel.wheelSize());
        } else if (ct == ChannelType.TCP
                || ct == ChannelType.Accept || ct == ChannelType.UDP) {
            version (HUNT_DEBUG)
                trace("Run CreateIoCompletionPort on socket: ", fd);

            _event.setNext(channel);
            // The channel reference is passed as the completion key.
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) fd, _iocpHandle,
                    cast(size_t)(cast(void*) channel), 0);
            if (port is null) {
                // The association failed; do not report the channel as registered.
                errorf("CreateIoCompletionPort failed on socket %s: %d", fd, GetLastError());
                return false;
            }
        } else {
            warningf("Can't register a channel: %s", ct);
        }
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException, always.
     */
    override bool reregister(AbstractChannel channel) {
        throw new LoopException("IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always returns true; nothing is detached from
     * the completion port here.
     */
    override bool deregister(AbstractChannel channel) {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        //tracef("deregister (fd=%d)", channel.handle);

        // IocpContext _data;
        // _data.channel = channel;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Posts an empty completion packet (zero bytes, zero key, null OVERLAPPED)
     * to the completion port so that a pending `doSelect` call returns.
     */
    void weakUp() {
        // No context is attached: doSelect sees a null OVERLAPPED for this packet.
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, null);
    }

    /**
     * Re-initializes the timer and then runs the base class loop.
     */
    override void onLoop(scope void delegate() weakup, long timeout = -1) {
        _timer.init();
        super.onLoop(weakup, timeout);
    }

    /**
     * Waits for one completion packet and dispatches it.
     *
     * The wait timeout is the value returned by `_timer.doWheel()`; the
     * parameter `t` is not used by this implementation.
     *
     * Returns:
     *   the raw result of GetQueuedCompletionStatus (0 on failure or timeout).
     */
    override protected int doSelect(long t) {
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                &overlapped, timeout);

        // NOTE(review): this cast presumably relies on the OVERLAPPED being the
        // first member of IocpContext - confirm against its declaration.
        IocpContext* ev = cast(IocpContext*) overlapped;
        if (ret == 0) {
            const auto erro = GetLastError();
            // About ERROR_OPERATION_ABORTED
            // https://stackoverflow.com/questions/7228703/the-i-o-operation-has-been-aborted-because-of-either-a-thread-exit-or-an-applica
            if (erro == WAIT_TIMEOUT || erro == ERROR_OPERATION_ABORTED)
                return ret;

            debug errorf("error occurred, code=%d, message: %s", erro, getErrorMessage(erro));
            // The call can fail without dequeuing a packet, in which case the
            // OVERLAPPED pointer is null. An assert would vanish in release
            // builds, so guard the dereference explicitly.
            if (ev !is null) {
                AbstractChannel channel = ev.channel;
                if (channel !is null && !channel.isClosed())
                    channel.close();
            }
        } else if (ev is null || ev.channel is null)
            warning("ev is null or ev.watche is null");
        else
            handleIocpOperation(ev.operation, ev.channel, bytes);
        return ret;
    }

    /**
     * Dispatches a dequeued completion to the matching channel callback.
     *
     * Params:
     *   op      = the operation recorded in the completion context.
     *   channel = the channel the completion belongs to.
     *   bytes   = the byte count reported by the completion port.
     */
    private void handleIocpOperation(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (HUNT_DEBUG)
            info("ev.operation: ", op);

        switch (op) {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect:
            // A zero length is forwarded, for which onSocketRead returns
            // without invoking the channel.
            onSocketRead(channel, 0);
            break;
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Stops the base selector and posts a wake-up packet to the port.
     */
    override void stop() {
        super.stop();
        weakUp();
    }

    /// Intentionally empty.
    void handleTimer() {

    }

    /// Intentionally empty.
    override void dispose() {

    }

    /**
     * Handles a completed read: records the byte count on the socket channel
     * and invokes its read callback.
     *
     * Nothing is dispatched when `len` is zero, when the channel is closed, or
     * when the channel is not an AbstractSocketChannel.
     */
    private void onSocketRead(AbstractChannel channel, size_t len) {
        debug if (channel is null) {
            warning("channel is null");
            return;
        }

        if (len == 0 || channel.isClosed) {
            version (HUNT_DEBUG)
                info("channel closed");
            return;
        }

        AbstractSocketChannel socketChannel = cast(AbstractSocketChannel) channel;
        // assert(socketChannel !is null, "The type of channel is: " ~ typeid(channel).name);
        if (socketChannel is null) {
            warning("The channel socket is null: ");
        } else {
            socketChannel.setRead(len);
            channel.onRead();
        }
    }

    /**
     * Handles a completed write by forwarding the byte count to the stream.
     *
     * Nothing is dispatched when the channel is not an AbstractStream.
     */
    private void onSocketWrite(AbstractChannel channel, size_t len) {
        debug if (channel is null) {
            warning("channel is null");
            return;
        }
        AbstractStream client = cast(AbstractStream) channel;
        // assert(client !is null, "The type of channel is: " ~ typeid(channel).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

private:
    HANDLE _iocpHandle;   // completion port created in the constructor
    EventChannel _event;  // internal event channel
    CustomTimer _timer;   // supplies the wait timeout used by doSelect
}