/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.Epoll;

// dfmt off
version(HAVE_EPOLL):

// dfmt on

import std.exception;
import std.socket;
import std.string;

import core.sys.posix.sys.types;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.stdc.string;
import core.stdc.errno;
import core.time;
import core.thread;

import core.sys.posix.sys.resource;
import core.sys.posix.sys.time;
import core.sys.linux.epoll;

import hunt.event.selector.Selector;
import hunt.Exceptions;
import hunt.io.channel;
import hunt.logging;
import hunt.event.timer;
import hunt.system.Error;
import hunt.util.worker;

/*
 * Max. theoretical number of file descriptors on system: the hard
 * RLIMIT_NOFILE limit, or 0 when it could not be queried.
 */
__gshared size_t fdLimit = 0;

shared static this() {
    rlimit fileLimit;
    // On failure fdLimit keeps its initial value of 0.
    if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0)
        fdLimit = fileLimit.rlim_max;
}


/**
 * Selector implementation backed by Linux epoll.
 *
 * Each registered channel's object pointer is stored in `epoll_event.data.ptr`,
 * so that `doSelect` can map ready events back to the owning `AbstractChannel`.
 */
class AbstractSelector : Selector {
    /// Maximum number of events fetched by a single epoll_wait call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Throws: IOException if epoll_create1 fails.
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(id, divider, worker, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        // onStop() triggers this channel (presumably to wake a blocked epoll_wait).
        _eventChannel = new EpollEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // NOTE(review): dispose() is deliberately not called from the finalizer;
        // resources are only released through an explicit dispose().
        // dispose();
    }

    /**
     * Closes the event channel and the epoll descriptor, then disposes the base
     * selector. Subsequent calls are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if(r != 0) {
            version (HUNT_IO_DEBUG) warningf("error: %d", r);
        }

        super.dispose();
    }

    /// Triggers the internal event channel (if still open) when the selector stops.
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d, id: %d", _epollFD, getId());

        if(!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers `channel` with the base selector and adds its fd to epoll.
     *
     * Returns: true if the epoll registration succeeded (or failed with an
     * ignorable error, see epollCtl), false otherwise.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", channel.handle, channel.type);

        if (epollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug warningf("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Removes `channel`'s fd from epoll. The base selector is always told to
     * deregister the channel, even when the epoll operation fails.
     *
     * Returns: true on success (or an ignorable epoll_ctl error), false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        if (epollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            // Report the fd of the channel that failed (previously an unrelated `fd`).
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = in milliseconds; negative waits indefinitely, 0 polls without
     *             blocking, positive bounds the wait (restarted on EINTR).
     * Returns: number of events handled, or -1 if epoll_wait failed.
     */
    protected override int doSelect(long timeout) {
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            // Normalise to -1/0: a plain cast(int) of a large negative long could
            // wrap to a non-negative value and turn an indefinite wait into a poll.
            immutable int waitMs = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, events.length, waitMs);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // Clamp so that timeouts beyond int.max do not wrap to a negative
            // (i.e. infinite) epoll timeout.
            immutable int waitMs = timeout > int.max ? int.max : cast(int) timeout;
            len = iepoll(_epollFD, events.ptr, events.length, waitMs);
        }

        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug warningf("channel is null");
            } else {
                handleChannelEvent(channel, events[i].events);
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to `channel`.
     *
     * Close/error masks trigger a final onRead (when readable) followed by
     * close(); exact EPOLLIN / EPOLLOUT / EPOLLIN|EPOLLOUT masks call the
     * matching handlers. Exceptions thrown by the handlers are logged, not
     * propagated.
     */
    private void handleChannelEvent(AbstractChannel channel, uint event) {
        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());

            infof("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s",
                this._epollFD, channel.handle, event, isReadable(event), isWritable(event), isClosed(event));
        }

        try {
            if (isClosed(event)) {
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                version (HUNT_IO_DEBUG) {
                    warningf("event=%d, isReadable: %s, isWritable: %s",
                        event, isReadable(event), isWritable(event));

                    if (isError(event)) {
                        warningf("channel error: fd=%s, event=%d, errno=%d, message=%s",
                            channel.handle, event, errno, getErrorMessage(errno));
                    } else {
                        infof("channel closed: fd=%d, errno=%d, message=%s",
                            channel.handle, errno, getErrorMessage(errno));
                    }
                }

                // The remote connection broke abnormally, so the channel should be notified.
                if(isReadable(event)) {
                    channel.onRead();
                }

                channel.close();
            } else if (event == EPOLLIN) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read event: fd=%d", channel.handle);
                channel.onRead();
            } else if (event == EPOLLOUT) {
                version (HUNT_IO_DEBUG)
                    tracef("channel write event: fd=%d", channel.handle);
                channel.onWrite();
            } else if (event == (EPOLLIN | EPOLLOUT)) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read and write: fd=%d", channel.handle);
                channel.onWrite();
                channel.onRead();
            } else {
                debug warningf("Only read/write/close events can be handled, current event: %d", event);
            }
        } catch (Exception e) {
            debug {
                errorf("error while handing channel: fd=%s, exception=%s, message=%s",
                    channel.handle, typeid(e), e.msg);
            }
            version(HUNT_DEBUG) warning(e);
        }
    }

    /**
     * epoll_wait with a bounded timeout that is restarted on EINTR using the
     * time still left.
     *
     * Elapsed time is measured with the monotonic clock, so wall-clock
     * adjustments can neither extend nor prematurely end the wait.
     *
     * Returns: epoll_wait's result, or 0 if the deadline passes while
     * recovering from an interruption.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        int remaining = timeout;
        MonoTime start = MonoTime.currTime;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            // Interrupted by a signal: retry with whatever time is left.
            if (remaining < 0)
                continue; // unbounded wait, nothing to account for

            MonoTime now = MonoTime.currTime;
            long elapsed = (now - start).total!"msecs";
            if (elapsed >= remaining)
                return 0;
            remaining -= cast(int) elapsed;
            start = now;
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /// True on error/hang-up/peer-shutdown, or when neither EPOLLIN nor EPOLLOUT is set.
    private static bool isClosed(uint e) nothrow {
        return (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0
            || (e & (EPOLLIN | EPOLLOUT)) == 0;
    }

    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for `channel`: stores the channel pointer and always requests
     * EPOLLRDHUP/EPOLLERR/EPOLLHUP, plus EPOLLIN/EPOLLOUT/EPOLLET according to
     * the channel's flags.
     */
    private static void buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.hasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
    }

    /**
     * Applies the epoll_ctl `opcode` for `channel`, retrying on EINTR.
     *
     * Returns: true on success or on an ignorable error (EBADF, ENOENT, EPERM),
     * false otherwise (the failure is logged together with errno).
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        if (res >= 0)
            return true;

        // Capture errno once, before any logging call can overwrite it.
        immutable int err = errno;

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (err == EBADF || err == ENOENT || err == EPERM)
            return true;

        warningf("epoll_ctl failed: opcode=%d, fd=%d, errno=%d, message=%s",
            opcode, fd, err, getErrorMessage(err));
        return false;
    }
}