/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.Epoll;

// dfmt off
version(HAVE_EPOLL):

// dfmt on

import std.exception;
import std.socket;
import std.string;
import hunt.logging;

import core.time;
import core.stdc.string;
import core.stdc.errno;
import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;

import core.sys.posix.sys.resource;
import core.sys.posix.sys.time;

import hunt.Exceptions;
import hunt.io.socket;
import hunt.event.timer;
import hunt.system.Error;

/* Max. theoretical number of file descriptors on system. */
__gshared size_t fdLimit = 0;

/**
 * Reads the RLIMIT_NOFILE resource limit once at program start and stores its
 * hard limit (rlim_max) in fdLimit.
 */
shared static this() {
    rlimit fileLimit;
    getrlimit(RLIMIT_NOFILE, &fileLimit);
    fdLimit = fileLimit.rlim_max;
}

/**
 * Selector implementation built on the Linux epoll API.
 *
 * The constructor creates the epoll instance together with an
 * EpollEventChannel that is registered with it; stop() triggers that channel
 * after stopping the selector.
 */
class AbstractSelector : Selector {
    /// Maximum number of events fetched by a single epoll_wait call.
    enum int NUM_KEVENTS = 512;
    private int _epollFD;
    private EventChannel _event;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Throws: IOException if epoll_create fails (or if the event channel
     *         cannot be created).
     */
    this() {
        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * epoll_create expects a size as a hint to the kernel about how to
         * dimension internal structures. We can't predict the size in advance.
         */
        // _epollFD = epoll_create1(0);
        _epollFD = epoll_create(256);
        if(_epollFD < 0)
            throw new IOException("epoll_create failed");

        // If anything below throws, release the epoll descriptor right away
        // and mark the selector disposed so the finalizer does not close the
        // same descriptor a second time.
        scope(failure) {
            isDisposed = true;
            core.sys.posix.unistd.close(_epollFD);
        }

        _event = new EpollEventChannel(this);
        register(_event);
    }

    ~this() {
        dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor.
     * Calling it more than once has no further effect.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_DEBUG) tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;

        // The constructor may have thrown before these members were set up;
        // the destructor still runs in that case, so guard both resources.
        if (_event !is null)
            _event.close();
        if (_epollFD >= 0)
            core.sys.posix.unistd.close(_epollFD);
    }

    private bool isDisposed = false;

    /**
     * Stops the selector if it is running and then triggers the internal
     * event channel.
     */
    override void stop() {
        if(_running) {
            super.stop();
            version (HUNT_DEBUG) tracef("notice that selector[fd=%d] stopped", _epollFD);
            _event.call();
        }
    }

    /**
     * Adds a channel to the epoll set (EPOLL_CTL_ADD).
     *
     * Timer channels get setTimer() called before they are added.
     *
     * Params:
     *   channel = the channel to add; must not be null
     * Returns: true when epollCtl reports success, false otherwise.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        version (HUNT_DEBUG) tracef("register channel: fd=%d", channel.handle);

        if (channel.type == ChannelType.Timer) {
            auto wt = cast(AbstractTimer) channel;
            if (wt !is null)
                wt.setTimer();
        }

        if(epollCtl(channel, EPOLL_CTL_ADD)) {
            _event.setNext(channel);
            return true;
        } else {
            warningf("register channell failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Re-submits the channel's current flags to epoll (EPOLL_CTL_MOD).
     *
     * Returns: true when epollCtl reports success, false otherwise.
     */
    override bool reregister(AbstractChannel channel) {
        return epollCtl(channel, EPOLL_CTL_MOD);
    }

    /**
     * Removes a channel from the epoll set (EPOLL_CTL_DEL).
     *
     * Returns: true when epollCtl reports success, false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        if(epollCtl(channel, EPOLL_CTL_DEL)) {
            version (HUNT_DEBUG) tracef("deregister channel: fd=%d", channel.handle);
            return true;
        } else {
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events and dispatches each one to its channel.
     *
     * Params:
     *   timeout = wait time in milliseconds; 0 polls without blocking and any
     *             negative value waits indefinitely
     * Returns: the value returned by the underlying epoll wait (number of
     *          events fetched, or a negative value on failure).
     */
    override protected int doSelect(long timeout) {
        epoll_event[NUM_KEVENTS] events;
        int len = 0;

        if(timeout <= 0) { /* Indefinite or no wait */
            // Normalise before narrowing to int: a plain cast could turn a
            // large negative long into a positive timeout.
            const int wait = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                len = epoll_wait(_epollFD, events.ptr, NUM_KEVENTS, wait);
            } while((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // epoll_wait takes an int; clamp rather than let the value wrap.
            const int bounded = timeout > int.max ? int.max : cast(int) timeout;
            len = iepoll(_epollFD, events.ptr, NUM_KEVENTS, bounded);
        }

        // A negative len yields an empty range, so nothing is dispatched.
        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                warningf("channel is null");
                continue;
            }

            uint currentEvents = events[i].events;
            version (HUNT_DEBUG) infof("handling event: events=%d, fd=%d", currentEvents, channel.handle);

            // Exactly one action is taken per event, in this priority order.
            if (isClosed(currentEvents)) {
                version (HUNT_DEBUG)
                    tracef("channel closed: fd=%d, errno=%d, message=%s", channel.handle,
                        errno, getErrorMessage(errno));
                channel.close();
            } else if (isError(currentEvents)) {
                // version (HUNT_DEBUG)
                debug warningf("channel error: fd=%s, errno=%d, message=%s", channel.handle,
                        errno, getErrorMessage(errno));
                channel.close();
            } else if (isReadable(currentEvents)) {
                channel.onRead();
            } else if (isWritable(currentEvents)) {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) channel;
                assert(wt !is null);
                wt.onWriteDone();
            } else {
                warningf("Undefined behavior: fd=%d, registered=%s", channel.handle, channel.isRegistered);
            }
        }

        return len;
    }

    /**
     * epoll_wait wrapper that restarts after EINTR without extending the
     * overall wait.
     *
     * The deadline is fixed once on the monotonic clock, so wall-clock
     * adjustments cannot shorten or lengthen the wait.
     *
     * Params:
     *   epfd    = epoll descriptor
     *   events  = buffer receiving the events
     *   numfds  = capacity of the buffer
     *   timeout = wait time in milliseconds; negative waits indefinitely
     * Returns: the epoll_wait result, or 0 if the deadline passed while
     *          restarting after an interruption.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        const bool bounded = timeout >= 0;
        int remaining = timeout;
        MonoTime deadline;
        if (bounded)
            deadline = MonoTime.currTime + timeout.msecs;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (!(res < 0 && errno == EINTR))
                return res;

            if (bounded) {
                const Duration left = deadline - MonoTime.currTime;
                if (left <= Duration.zero)
                    return 0;
                // left never exceeds the original int timeout, so this fits.
                remaining = cast(int) left.total!"msecs";
            }
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True when EPOLLERR is set in events.
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR ) != 0;
    }

    /// True when EPOLLHUP or EPOLLRDHUP is set in events.
    private static bool isClosed(uint events) nothrow {
        return (events & (EPOLLHUP | EPOLLRDHUP)) != 0;
    }

    /// True when EPOLLIN is set in events.
    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True when EPOLLOUT is set in events.
    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills ev for the given channel: the channel reference is stored in
     * data.ptr and the event mask is derived from the channel's flags.
     * EPOLLRDHUP, EPOLLERR and EPOLLHUP are always requested.
     */
    private static void buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        if (channel.hasFlag(ChannelFlag.OneShot))
            ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
    }

    /**
     * Issues epoll_ctl for the channel, retrying while it is interrupted.
     *
     * Params:
     *   channel = the channel whose handle is passed to epoll_ctl
     *   opcode  = one of EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL
     * Returns: false only for failures other than EBADF, ENOENT and EPERM;
     *          those three are deliberately treated as success (see below).
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;
        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        } while((res == -1) && (errno == EINTR));

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
            warning("epoll_ctl failed");
            return false;
        } else
            return true;
    }
}


/**
 * Event channel backed by a non-blocking, close-on-exec eventfd.
 *
 * call() writes to the descriptor and onRead() reads the value back.
 */
class EpollEventChannel : EventChannel {
    /**
     * Creates the eventfd and flags the channel for read events.
     *
     * Throws: IOException if eventfd fails.
     */
    this(Selector loop) {
        super(loop);
        setFlag(ChannelFlag.Read, true);
        this.handle = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        // Fail loudly: with an invalid handle the channel could never be
        // added to the epoll set.
        if (this.handle < 0)
            throw new IOException("eventfd failed");
        _isRegistered = true;
    }

    ~this() {
        close();
    }

    /// Writes the 8-byte value 1 to the eventfd.
    override void call() {
        version (HUNT_DEBUG) tracef("calling event [fd=%d]...%s", this.handle, eventLoop.isRuning);
        ulong value = 1;
        core.sys.posix.unistd.write(this.handle, &value, value.sizeof);
    }

    /// Clears the error state and reads one 8-byte value from the eventfd.
    override void onRead() {
        version (HUNT_DEBUG) tracef("channel reading [fd=%d]...", this.handle);
        this.clearError();
        ulong value;
        ssize_t n = core.sys.posix.unistd.read(this.handle, &value, value.sizeof);
        version (HUNT_DEBUG) tracef("channel read done: %d bytes, fd=%d", n, this.handle);
    }

    /// Closes the eventfd descriptor.
    override protected void onClose() {
        version (HUNT_DEBUG) tracef("close event channel [fd=%d]...", this.handle);
        core.sys.posix.unistd.close(this.handle);
    }
}

/* Flags accepted by eventfd(). */
enum {
    EFD_SEMAPHORE = 0x1,
    EFD_CLOEXEC = 0x80000,
    EFD_NONBLOCK = 0x800
}

/* Flags accepted by epoll_create1(). */
enum {
    EPOLL_CLOEXEC = 0x80000,
    EPOLL_NONBLOCK = 0x800
}

/* Event bits used in epoll_event.events. */
enum {
    EPOLLIN = 0x001,
    EPOLLPRI = 0x002,
    EPOLLOUT = 0x004,
    EPOLLRDNORM = 0x040,
    EPOLLRDBAND = 0x080,
    EPOLLWRNORM = 0x100,
    EPOLLWRBAND = 0x200,
    EPOLLMSG = 0x400,
    EPOLLERR = 0x008,
    EPOLLHUP = 0x010,
    EPOLLRDHUP = 0x2000, // since Linux 2.6.17
    EPOLLONESHOT = 1u << 30,
    EPOLLET = 1u << 31
}

/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
enum {
    EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface.
    EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface.
    EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure.
}

/*
 * The C definition of struct epoll_event is packed only on x86/x86_64; on
 * other architectures it uses natural alignment, so packing it there would
 * put the data member at the wrong offset.
 */
version (X86_64)
    private enum bool packedEpollEvent = true;
else version (X86)
    private enum bool packedEpollEvent = true;
else
    private enum bool packedEpollEvent = false;

// dfmt off
extern (C) : @system : nothrow :

static if (packedEpollEvent) {
    align(1) struct epoll_event {
    align(1):
        uint events;
        epoll_data data;
    }
} else {
    struct epoll_event {
        uint events;
        epoll_data data;
    }
}

union epoll_data {
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}

// dfmt on

int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

socket_t eventfd(uint initval, int flags);