/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.Epoll;

// dfmt off
version(HAVE_EPOLL):

// dfmt on

import std.exception;
import std.socket;
import std.string;

import core.sys.posix.sys.types;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.stdc.string;
import core.stdc.errno;
import core.time;
import core.thread;

import core.sys.posix.sys.resource;
import core.sys.posix.sys.time;
import core.sys.linux.epoll;

import hunt.event.selector.Selector;
import hunt.Exceptions;
import hunt.io.channel;
import hunt.logging.ConsoleLogger;
import hunt.event.timer;
import hunt.system.Error;
import hunt.util.TaskPool;

/* Max. theoretical number of file descriptors on system (RLIMIT_NOFILE hard limit). */
__gshared size_t fdLimit = 0;

shared static this() {
    rlimit fileLimit;
    // Only record the limit when getrlimit succeeds; otherwise fdLimit stays 0.
    if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0)
        fdLimit = fileLimit.rlim_max;
}


/**
 * Selector implementation backed by Linux epoll(7).
 *
 * Each registered channel's object pointer is stored in `epoll_event.data.ptr`
 * and recovered when events are reported. Ready events are handled inline on
 * the selecting thread, or handed to the optional TaskPool keyed by the
 * channel handle.
 */
class AbstractSelector : Selector {
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;
    private TaskPool _taskPool;

    /**
     * Creates the epoll instance and registers the internal event channel
     * (which onStop() triggers to wake the selector).
     *
     * Params:
     *   id          = forwarded to Selector
     *   divider     = forwarded to Selector
     *   pool        = optional worker pool; when null, events are handled on
     *                 the thread calling doSelect
     *   maxChannels = forwarded to Selector
     * Throws: IOException if epoll_create1 fails.
     */
    this(size_t id, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(id, divider, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        _eventChannel = new EpollEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor, then
     * disposes the base Selector. Subsequent calls are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if(r != 0) {
            // close(2) returns -1 and reports the actual cause through errno.
            version (HUNT_IO_DEBUG) warningf("error: %d, errno=%d", r, errno);
        }

        super.dispose();
    }

    /** Wakes the selecting thread by triggering the internal event channel, if still open. */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d, id: %d", _epollFD, getId());

        if(!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers the channel with the base Selector and adds its handle to the
     * epoll set (event mask built from the channel flags by buildEpollEvent).
     *
     * Returns: true when epoll_ctl(EPOLL_CTL_ADD) succeeded or failed with an
     *          error that epollCtl deliberately ignores; false otherwise.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", channel.handle, channel.type);

        if (epollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug warningf("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Removes the channel's handle from the epoll set. The base Selector
     * deregistration always runs (scope(exit)), even if epoll_ctl fails.
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        if (epollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            // Report the handle of the channel being removed.
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    // override bool update(AbstractChannel channel) {
    //     if (epollCtl(channel, EPOLL_CTL_MOD)) {
    //         return true;
    //     } else {
    //         warningf("update channel failed: fd=%d", channel.handle);
    //         return false;
    //     }
    // }

    /**
     * Waits for events and dispatches them.
     *
     * Params:
     *   timeout = in milliseconds; negative waits indefinitely, 0 does not wait.
     * Returns: the value from epoll_wait: number of ready events, 0 on
     *          timeout, or -1 on error.
     */
    protected override int doSelect(long timeout) {
        // epoll_wait(2) takes an int; a plain cast of a long could wrap into a
        // different (even negative = infinite) timeout, so clamp instead.
        int waitMs;
        if (timeout < 0)
            waitMs = -1;
        else if (timeout > int.max)
            waitMs = int.max;
        else
            waitMs = cast(int) timeout;

        int len = 0;

        if (waitMs <= 0) { /* Indefinite or no wait */
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, events.length, waitMs);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            len = iepoll(_epollFD, events.ptr, events.length, waitMs);
        }

        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());
        }

        // When len is -1 the range is empty and nothing is dispatched.
        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug warningf("channel is null");
                continue;
            }

            // Copy the mask: the `events` buffer is overwritten by the next doSelect.
            uint currentEvents = events[i].events;
            if (_taskPool is null) {
                handeChannelEvent(channel, currentEvents);
            } else { // using worker thread
                _taskPool.put(cast(int)channel.handle, makeTask(&handeChannelEvent, channel, currentEvents));
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to the channel.
     *
     * Error/hang-up masks (see isClosed) trigger onRead() when EPOLLIN is also
     * set, then close the channel. Exactly EPOLLIN, EPOLLOUT or EPOLLIN|EPOLLOUT
     * call onRead/onWrite; any other mask is only logged. Exceptions are caught
     * and logged, never propagated.
     */
    private void handeChannelEvent(AbstractChannel channel, uint event) {
        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());

            // Thread.sleep(300.msecs);
            infof("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s",
                this._epollFD, channel.handle, event, isReadable(event), isWritable(event), isClosed(event));
        }

        try {
            if (isClosed(event)) { // && errno != EINTR
                /* An error has occured on this fd, or the socket is not
                    ready for reading (why were we notified then?) */
                version (HUNT_IO_DEBUG) {
                    // NOTE(review): errno is not set by the event itself and may be stale here.
                    warningf("event=%d, isReadable: %s, isWritable: %s",
                        event, isReadable(event), isWritable(event));

                    if (isError(event)) {
                        warningf("channel error: fd=%s, event=%d, errno=%d, message=%s",
                            channel.handle, event, errno, getErrorMessage(errno));
                    } else {
                        tracef("channel closed: fd=%d, errno=%d, message=%s",
                            channel.handle, errno, getErrorMessage(errno));
                    }
                }

                // The remote connection broken abnormally, so the channel should be notified.
                if(isReadable(event)) {
                    channel.onRead();
                }

                // if(isWritable(event)) {
                //     channel.onWrite();
                // }

                channel.close();
            } else if (event == EPOLLIN) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read event: fd=%d", channel.handle);
                channel.onRead();
            } else if (event == EPOLLOUT) {
                version (HUNT_IO_DEBUG)
                    tracef("channel write event: fd=%d", channel.handle);
                channel.onWrite();
            } else if (event == (EPOLLIN | EPOLLOUT)) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read and write: fd=%d", channel.handle);
                channel.onWrite();
                channel.onRead();
            } else {
                debug warningf("Only read/write/close events can be handled, current event: %d", event);
            }
        } catch (Exception e) {
            debug {
                errorf("error while handing channel: fd=%s, exception=%s, message=%s",
                    channel.handle, typeid(e), e.msg);
            }
            version(HUNT_DEBUG) warning(e);
        }
    }

    /**
     * epoll_wait that restarts on EINTR without exceeding the original timeout.
     *
     * Elapsed time is measured on the monotonic clock against a fixed deadline,
     * so wall-clock adjustments and repeated truncation cannot stretch the wait.
     *
     * Returns: epoll_wait's result, or 0 if the deadline passed while restarting.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        immutable MonoTime deadline = MonoTime.currTime + dur!"msecs"(timeout);
        int remaining = timeout;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            // Interrupted by a signal: an infinite wait is simply retried.
            if (remaining < 0)
                continue;

            immutable long left = (deadline - MonoTime.currTime).total!"msecs";
            if (left <= 0)
                return 0;
            // left <= timeout, so it always fits in an int.
            remaining = cast(int) left;
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /**
     * True for error/hang-up masks, and for masks carrying neither EPOLLIN
     * nor EPOLLOUT.
     */
    private static bool isClosed(uint e) nothrow {
        return (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0
            || (e & (EPOLLIN | EPOLLOUT)) == 0;
    }

    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for the channel: stores the channel pointer in data.ptr and
     * always requests EPOLLRDHUP|EPOLLERR|EPOLLHUP, plus EPOLLIN / EPOLLOUT /
     * EPOLLET according to the channel's Read / Write / ETMode flags.
     *
     * Returns: a copy of the filled event.
     */
    private static epoll_event buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.hasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Applies an epoll_ctl operation for the channel, retrying on EINTR.
     *
     * Returns: false only for failures other than EBADF, ENOENT and EPERM,
     *          which are deliberately treated as success (see below).
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        // Capture errno before any logging call can overwrite it.
        immutable int err = errno;

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && err != EBADF && err != ENOENT && err != EPERM) {
            warningf("epoll_ctl failed: fd=%d, op=%d, errno=%d", fd, opcode, err);
            return false;
        } else
            return true;
    }
}