/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.Kqueue;


// dfmt off
version(HAVE_KQUEUE):
// dfmt on
import hunt.event.selector.Selector;
import hunt.event.timer.Kqueue;
import hunt.Exceptions;
import hunt.io.channel;
import hunt.logging;
import hunt.util.CompilerHelper;

import std.exception;
import std.socket;
import std.string;

import core.time;
import core.stdc.string;
import core.stdc.errno;
import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.signal;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.posix.time;
import hunt.util.worker;

/**
 * Selector backed by a kqueue descriptor.
 *
 * Channels are attached to the queue as kevent filters (`register`),
 * detached again (`deregister`), and `doSelect` waits for ready events and
 * dispatches each one to the channel stored in the event's `udata` field.
 */
class AbstractSelector : Selector {
    /// Maximum number of events fetched by a single kevent() call.
    enum int NUM_KEVENTS = 128;
    private bool isDisposed = false;
    private Kevent[NUM_KEVENTS] events;
    private int _kqueueFD;
    private EventChannel _eventChannel;

    /**
     * Creates the kqueue descriptor and registers the internal event channel
     * that `onStop` triggers.
     *
     * Params:
     *   number      = forwarded unchanged to the base Selector
     *   divider     = forwarded unchanged to the base Selector
     *   worker      = forwarded unchanged to the base Selector
     *   maxChannels = forwarded unchanged to the base Selector
     */
    this(size_t number, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(number, divider, worker, maxChannels);
        _kqueueFD = kqueue();
        if (_kqueueFD < 0) {
            // kqueue() returns -1 on failure; without a valid descriptor
            // every later kevent() call fails, so make the cause visible.
            warningf("kqueue() failed: errno=%d", errno);
        }
        _eventChannel = new KqueueEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the kqueue descriptor, then
     * disposes the base selector. Calls after the first one are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _kqueueFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_kqueueFD);
        if(r != 0) {
            // close() only ever returns -1 on failure; errno holds the reason.
            version(HUNT_DEBUG) warningf("error: %d", errno);
        }

        super.dispose();
    }

    /**
     * Triggers the internal event channel (unless it is already closed).
     */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d", _kqueueFD);

        if(!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Adds a channel to the kqueue.
     *
     * Timer channels are added as an EVFILT_TIMER event; every other channel
     * gets EVFILT_READ and/or EVFILT_WRITE according to its Read/Write flags,
     * with EV_CLEAR added when the ETMode flag is set.
     *
     * Returns: true when kevent() accepted the change; false when the handle
     *          is negative, the timer cast fails, the channel has neither the
     *          Read nor the Write flag, or kevent() reports an error.
     */
    override bool register(AbstractChannel channel) {
        // NOTE(review): the base class is informed before any validation, so
        // a `false` return below leaves the channel known to the base class
        // - confirm callers deregister on failure.
        super.register(channel);

        const int fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register channel: fd=%d, type=%s", fd, channel.type);

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            // Timer period in milliseconds, never below 20.
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time;
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            if (fd < 0)
                return false;
            Kevent[2] ev = void;
            short read = EV_ADD | EV_ENABLE;
            short write = EV_ADD | EV_ENABLE;
            if (channel.hasFlag(ChannelFlag.ETMode)) {
                read |= EV_CLEAR;
                write |= EV_CLEAR;
            }
            // ev[0] is the read filter, ev[1] the write filter; the kevent()
            // calls below submit both, or only the one the channel asked for.
            EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) channel);
            if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0) {
            return false;
        }
        return true;
    }

    /**
     * Removes a channel's filters from the kqueue and clears the channel.
     *
     * The base class `deregister` is always invoked on exit, whatever the
     * outcome.
     *
     * Returns: true when kevent() accepted the EV_DELETE change and the
     *          channel was cleared; false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);
            if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0) {
            return false;
        }
        // channel.currtLoop = null;
        channel.clear();
        return true;
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = milliseconds to wait; a negative value waits without a
     *             timeout, 0 returns immediately.
     *
     * Returns: the value returned by kevent(): the number of events fetched,
     *          or a negative value on failure.
     */
    protected override int doSelect(long timeout) {
        timespec ts;
        timespec *tsp;
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max;
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = timeout / 1000;
            ts.tv_nsec = (timeout % 1000) * 1000000; // 1 millisecond = 1,000,000 nanoseconds
            tsp = &ts;
        } else {
            tsp = null;
        }

        int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp);
        if (result < 0) {
            // Capture errno before any other call can overwrite it. An
            // interrupted wait (EINTR) is routine and not worth reporting.
            const int err = errno;
            if (err != EINTR) {
                warningf("kevent failed on selector[fd=%d]: errno=%d", _kqueueFD, err);
            }
            return result;
        }

        foreach (i; 0 .. result) {
            // udata carries the channel reference stored by register().
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            ushort eventFlags = events[i].flags;
            version (HUNT_IO_DEBUG)
                infof("handling event: events=%d, fd=%d", eventFlags, channel.handle);

            if (eventFlags & EV_ERROR) {
                warningf("channel[fd=%d] has a error.", channel.handle);
                channel.close();
                continue;
            }
            if (eventFlags & EV_EOF) {
                // NOTE(review): kevent(2) documents that EV_EOF can be
                // reported while unread data is still pending in the socket
                // buffer; closing here without reading would drop that data
                // - confirm this is acceptable for all channel types.
                version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle);
                channel.close();
                continue;
            }

            short filter = events[i].filter;
            handleChannelEvent(channel, filter);
        }
        return result;
    }

    /**
     * Invokes the channel callback matching a kevent filter: timer and read
     * filters call `onRead`, the write filter calls `onWrite`. Exceptions
     * thrown by the callback are logged and not propagated.
     *
     * The filter is kept as a signed `short` (its native type in `Kevent`) so
     * that negative filter ids are logged as such.
     */
    private void handleChannelEvent(AbstractChannel channel, short filter) {
        version (HUNT_IO_DEBUG)
            infof("handling event: events=%d, fd=%d", filter, channel.handle);

        try {
            switch (filter) {
            case EVFILT_TIMER:
            case EVFILT_READ:
                channel.onRead();
                break;
            case EVFILT_WRITE:
                channel.onWrite();
                break;
            default:
                warningf("Unhandled channel fileter: %d", filter);
                break;
            }
        } catch(Exception e) {
            errorf("error while handing channel: fd=%s, message=%s",
                channel.handle, e.msg);
        }
    }
}


/*
 * kevent filter identifiers.
 * NOTE(review): these numeric values look like the macOS <sys/event.h> set;
 * other BSDs number some filters differently - confirm the target platforms.
 */
enum : short {
    EVFILT_READ = -1,
    EVFILT_WRITE = -2,
    EVFILT_AIO = -3, /* attached to aio requests */
    EVFILT_VNODE = -4, /* attached to vnodes */
    EVFILT_PROC = -5, /* attached to struct proc */
    EVFILT_SIGNAL = -6, /* attached to struct proc */
    EVFILT_TIMER = -7, /* timers */
    EVFILT_MACHPORT = -8, /* Mach portsets */
    EVFILT_FS = -9, /* filesystem events */
    EVFILT_USER = -10, /* User events */
    EVFILT_VM = -12, /* virtual memory events */
    EVFILT_SYSCOUNT = 11
}

/**
 * Fills `*kevp` from the remaining arguments, which map one-to-one onto the
 * fields of `Kevent` in declaration order
 * (ident, filter, flags, fflags, data, udata).
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    *kevp = Kevent(args);
}

/**
 * Event record exchanged with kevent().
 * NOTE(review): the layout must match the platform's `struct kevent`; some
 * systems append extra fields - confirm for each supported OS.
 */
struct Kevent {
    uintptr_t ident; /* identifier for this event */
    short filter; /* filter for event */
    ushort flags; /* action and result flags (EV_*) */
    uint fflags; /* filter-specific flags (NOTE_*) */
    intptr_t data; /* filter-specific data */
    void* udata; /* opaque user data identifier */
}

enum {
    /* actions */
    EV_ADD = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE = 0x0002, /* delete event from kq */
    EV_ENABLE = 0x0004, /* enable event */
    EV_DISABLE = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT = 0x0010, /* only report one occurrence */
    EV_CLEAR = 0x0020, /* clear event state after reporting */
    EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1 = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF = 0x8000, /* EOF detected */
    EV_ERROR = 0x4000, /* error, data contains errno */

}

enum {
    /*
    * data/hint flags/masks for EVFILT_USER, shared with userspace
    *
    * On input, the top two bits of fflags specifies how the lower twenty four
    * bits should be applied to the stored value of fflags.
    *
    * On output, the top two bits will always be set to NOTE_FFNOP and the
    * remaining twenty four bits will contain the stored fflags value.
    */
    NOTE_FFNOP = 0x00000000, /* ignore input fflags */
    NOTE_FFAND = 0x40000000, /* AND fflags */
    NOTE_FFOR = 0x80000000, /* OR fflags */
    NOTE_FFCOPY = 0xc0000000, /* copy fflags */
    NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                  triggered for output. */

    /*
    * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
    */
    NOTE_LOWAT = 0x0001, /* low water mark */

    /*
    * data/hint flags for EVFILT_VNODE, shared with userspace
    */
    NOTE_DELETE = 0x0001, /* vnode was removed */
    NOTE_WRITE = 0x0002, /* data contents changed */
    NOTE_EXTEND = 0x0004, /* size increased */
    NOTE_ATTRIB = 0x0008, /* attributes changed */
    NOTE_LINK = 0x0010, /* link count changed */
    NOTE_RENAME = 0x0020, /* vnode was renamed */
    NOTE_REVOKE = 0x0040, /* vnode access was revoked */

    /*
    * data/hint flags for EVFILT_PROC, shared with userspace
    */
    NOTE_EXIT = 0x80000000, /* process exited */
    NOTE_FORK = 0x40000000, /* process forked */
    NOTE_EXEC = 0x20000000, /* process exec'd */
    NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
    NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

    /* additional flags for EVFILT_PROC */
    NOTE_TRACK = 0x00000001, /* follow across forks */
    NOTE_TRACKERR = 0x00000002, /* could not track child */
    NOTE_CHILD = 0x00000004, /* am a child process */

}

/* C entry points of the kqueue API. */
extern (C) {
    int kqueue() @nogc nothrow;
    int kevent(int kq, const Kevent* changelist, int nchanges,
            Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
}

/* Fallback definition, compiled in only when CompilerHelper.isLessThan(2078) holds. */
static if (CompilerHelper.isLessThan(2078)) {
    enum SO_REUSEPORT = 0x0200;
}