1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.selector.Epoll; 13 14 // dfmt off 15 version(HAVE_EPOLL): 16 17 // dfmt on 18 19 import std.exception; 20 import std.socket; 21 import std.string; 22 23 import core.time; 24 import core.stdc.string; 25 import core.stdc.errno; 26 import core.sys.posix.sys.types; 27 import core.sys.posix.netinet.tcp; 28 import core.sys.posix.netinet.in_; 29 import core.sys.posix.unistd; 30 31 import core.sys.posix.sys.resource; 32 import core.sys.posix.sys.time; 33 import core.sys.linux.epoll; 34 35 import hunt.event.selector.Selector; 36 import hunt.Exceptions; 37 import hunt.io.channel; 38 import hunt.logging.ConsoleLogger; 39 import hunt.event.timer; 40 import hunt.system.Error; 41 import hunt.concurrency.TaskPool; 42 43 /* Max. theoretical number of file descriptors on system. */ 44 __gshared size_t fdLimit = 0; 45 46 shared static this() { 47 rlimit fileLimit; 48 getrlimit(RLIMIT_NOFILE, &fileLimit); 49 fdLimit = fileLimit.rlim_max; 50 } 51 52 53 /** 54 * 55 */ 56 class AbstractSelector : Selector { 57 enum int NUM_KEVENTS = 1024; 58 private int _epollFD; 59 private bool isDisposed = false; 60 private epoll_event[NUM_KEVENTS] events; 61 private EventChannel _eventChannel; 62 private TaskPool _taskPool; 63 64 this(size_t id, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) { 65 _taskPool = pool; 66 super(id, divider, maxChannels); 67 68 // http://man7.org/linux/man-pages/man2/epoll_create.2.html 69 /* 70 * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. 71 * See the description of the O_CLOEXEC flag in open(2) for reasons why 72 * this may be useful. 73 */ 74 _epollFD = epoll_create1(EPOLL_CLOEXEC); 75 if (_epollFD < 0) 76 throw new IOException("epoll_create failed"); 77 78 _eventChannel = new EpollEventChannel(this); 79 register(_eventChannel); 80 } 81 82 ~this() @nogc { 83 // dispose(); 84 } 85 86 override void dispose() { 87 if (isDisposed) 88 return; 89 90 version (HUNT_IO_DEBUG) 91 tracef("disposing selector[fd=%d]...", _epollFD); 92 isDisposed = true; 93 _eventChannel.close(); 94 int r = core.sys.posix.unistd.close(_epollFD); 95 if(r != 0) { 96 version (HUNT_IO_DEBUG) warningf("error: %d", r); 97 } 98 99 super.dispose(); 100 } 101 102 override void onStop() { 103 version (HUNT_IO_DEBUG) 104 infof("Selector stopping. fd=%d, id: %d", _epollFD, getId()); 105 106 if(!_eventChannel.isClosed()) { 107 _eventChannel.trigger(); 108 // _eventChannel.onWrite(); 109 } 110 } 111 112 override bool register(AbstractChannel channel) { 113 super.register(channel); 114 115 // epoll_event e; 116 117 // e.data.fd = infd; 118 // e.data.ptr = cast(void*) channel; 119 // e.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLOUT; 120 // int s = epoll_ctl(_epollFD, EPOLL_CTL_ADD, infd, &e); 121 // if (s == -1) { 122 // debug warningf("failed to register channel: fd=%d", infd); 123 // return false; 124 // } else { 125 // return true; 126 // } 127 if (epollCtl(channel, EPOLL_CTL_ADD)) { 128 return true; 129 } else { 130 debug warningf("failed to register channel: fd=%d", channel.handle); 131 return false; 132 } 133 } 134 135 override bool deregister(AbstractChannel channel) { 136 super.deregister(channel); 137 138 if (epollCtl(channel, EPOLL_CTL_DEL)) { 139 return true; 140 } else { 141 warningf("deregister channel failed: fd=%d", fd); 142 return false; 143 } 144 } 145 146 override bool update(AbstractChannel channel) { 147 if (epollCtl(channel, EPOLL_CTL_MOD)) { 148 return true; 149 } else { 150 warningf("update channel failed: fd=%d", fd); 151 return false; 152 } 153 } 154 155 /** 156 timeout: in millisecond 157 */ 158 protected override int doSelect(long timeout) { 159 int len = 0; 160 161 if (timeout <= 0) { /* Indefinite or no wait */ 162 do { 163 // http://man7.org/linux/man-pages/man2/epoll_wait.2.html 164 // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391 165 len = epoll_wait(_epollFD, events.ptr, events.length, cast(int) timeout); 166 } while ((len == -1) && (errno == EINTR)); 167 } else { /* Bounded wait; bounded restarts */ 168 len = iepoll(_epollFD, events.ptr, events.length, cast(int) timeout); 169 } 170 171 if(_taskPool is null) { 172 foreach (i; 0 .. len) { 173 AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr); 174 if (channel is null) { 175 debug warningf("channel is null"); 176 } else { 177 handeChannelEvent(channel, events[i].events); 178 } 179 } 180 } else { // using worker thread 181 foreach (i; 0 .. len) { 182 AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr); 183 if (channel is null) { 184 debug warningf("channel is null"); 185 } else { 186 uint currentEvents = events[i].events; 187 _taskPool.put(cast(int)channel.handle, makeTask(&handeChannelEvent, channel, currentEvents)); 188 } 189 } 190 } 191 192 return len; 193 } 194 195 private void handeChannelEvent(AbstractChannel channel, uint event) { 196 version (HUNT_IO_DEBUG) 197 infof("handling event: selector=%d, events=%d, channel=%d", this._epollFD, event, channel.handle); 198 199 try { 200 if (isClosed(event)) { // && errno != EINTR 201 /* An error has occured on this fd, or the socket is not 202 ready for reading (why were we notified then?) */ 203 version (HUNT_IO_DEBUG) { 204 if (isError(event)) { 205 warningf("channel error: fd=%s, event=%d, errno=%d, message=%s", 206 channel.handle, event, errno, getErrorMessage(errno)); 207 } else { 208 tracef("channel closed: fd=%d, errno=%d, message=%s", 209 channel.handle, errno, getErrorMessage(errno)); 210 } 211 } 212 // FIXME: Needing refactor or cleanup -@zxp at 2/28/2019, 3:25:24 PM 213 // May be triggered twice for a channel, for example: 214 // events=8197, fd=13 215 // events=8221, fd=13 216 // The remote connection broken abnormally, so the channel should be notified. 217 // channel.onClose(); // More tests are needed 218 channel.close(); 219 } else if (event == EPOLLIN) { 220 version (HUNT_IO_DEBUG) 221 tracef("channel read event: fd=%d", channel.handle); 222 channel.onRead(); 223 } else if (event == EPOLLOUT) { 224 version (HUNT_IO_DEBUG) 225 tracef("channel write event: fd=%d", channel.handle); 226 channel.onWrite(); 227 } else if (event == (EPOLLIN | EPOLLOUT)) { 228 version (HUNT_IO_DEBUG) 229 tracef("channel read and write: fd=%d", channel.handle); 230 channel.onWrite(); 231 channel.onRead(); 232 } else { 233 debug warningf("this thread only for read/write/close events, event: %d", event); 234 } 235 } catch (Exception e) { 236 debug { 237 errorf("error while handing channel: fd=%s, exception=%s, message=%s", 238 channel.handle, typeid(e), e.msg); 239 } 240 version(HUNT_DEBUG) warning(e); 241 } 242 } 243 244 private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) { 245 long start, now; 246 int remaining = timeout; 247 timeval t; 248 long diff; 249 250 gettimeofday(&t, null); 251 start = t.tv_sec * 1000 + t.tv_usec / 1000; 252 253 for (;;) { 254 int res = epoll_wait(epfd, events, numfds, remaining); 255 if (res < 0 && errno == EINTR) { 256 if (remaining >= 0) { 257 gettimeofday(&t, null); 258 now = t.tv_sec * 1000 + t.tv_usec / 1000; 259 diff = now - start; 260 remaining -= diff; 261 if (diff < 0 || remaining <= 0) { 262 return 0; 263 } 264 start = now; 265 } 266 } else { 267 return res; 268 } 269 } 270 } 271 272 // https://blog.csdn.net/ljx0305/article/details/4065058 273 private static bool isError(uint events) nothrow { 274 return (events & EPOLLERR) != 0; 275 } 276 277 private static bool isClosed(uint e) nothrow { 278 return (e & EPOLLERR) != 0 || (e & EPOLLHUP) != 0 || (e & EPOLLRDHUP) != 0 279 || (!(e & EPOLLIN) && !(e & EPOLLOUT)) != 0; 280 } 281 282 private static bool isReadable(uint events) nothrow { 283 return (events & EPOLLIN) != 0; 284 } 285 286 private static bool isWritable(uint events) nothrow { 287 return (events & EPOLLOUT) != 0; 288 } 289 290 private static buildEpollEvent(AbstractChannel channel, ref epoll_event ev) { 291 ev.data.ptr = cast(void*) channel; 292 // ev.data.fd = channel.handle; 293 ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP; 294 if (channel.hasFlag(ChannelFlag.Read)) 295 ev.events |= EPOLLIN; 296 if (channel.hasFlag(ChannelFlag.Write)) 297 ev.events |= EPOLLOUT; 298 // if (channel.hasFlag(ChannelFlag.OneShot)) 299 // ev.events |= EPOLLONESHOT; 300 if (channel.hasFlag(ChannelFlag.ETMode)) 301 ev.events |= EPOLLET; 302 return ev; 303 } 304 305 private bool epollCtl(AbstractChannel channel, int opcode) { 306 assert(channel !is null); 307 const fd = channel.handle; 308 assert(fd >= 0, "The channel.handle is not initialized!"); 309 310 epoll_event ev; 311 buildEpollEvent(channel, ev); 312 int res = 0; 313 314 do { 315 res = epoll_ctl(_epollFD, opcode, fd, &ev); 316 } 317 while ((res == -1) && (errno == EINTR)); 318 319 /* 320 * A channel may be registered with several Selectors. When each Selector 321 * is polled a EPOLL_CTL_DEL op will be inserted into its pending update 322 * list to remove the file descriptor from epoll. The "last" Selector will 323 * close the file descriptor which automatically unregisters it from each 324 * epoll descriptor. To avoid costly synchronization between Selectors we 325 * allow pending updates to be processed, ignoring errors. The errors are 326 * harmless as the last update for the file descriptor is guaranteed to 327 * be EPOLL_CTL_DEL. 328 */ 329 if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) { 330 warning("epoll_ctl failed"); 331 return false; 332 } else 333 return true; 334 } 335 }