1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.selector.Kqueue; 13 14 15 // dfmt off 16 version(HAVE_KQUEUE): 17 // dfmt on 18 import hunt.event.selector.Selector; 19 import hunt.event.timer.Kqueue; 20 import hunt.Exceptions; 21 import hunt.io.channel; 22 import hunt.logging.ConsoleLogger; 23 import hunt.util.CompilerHelper; 24 25 import std.exception; 26 import std.socket; 27 import std.string; 28 29 import core.time; 30 import core.stdc.string; 31 import core.stdc.errno; 32 import core.sys.posix.sys.types; // for ssize_t, size_t 33 import core.sys.posix.signal; 34 import core.sys.posix.netinet.tcp; 35 import core.sys.posix.netinet.in_; 36 import core.sys.posix.unistd; 37 import core.sys.posix.time; 38 import hunt.util.TaskPool; 39 /** 40 * 41 */ 42 class AbstractSelector : Selector { 43 // kevent array size 44 enum int NUM_KEVENTS = 128; 45 private bool isDisposed = false; 46 private Kevent[NUM_KEVENTS] events; 47 private int _kqueueFD; 48 private EventChannel _eventChannel; 49 // private TaskPool _taskPool; 50 51 this(size_t number, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) { 52 // _taskPool = pool; 53 super(number, divider, maxChannels); 54 _kqueueFD = kqueue(); 55 _eventChannel = new KqueueEventChannel(this); 56 register(_eventChannel); 57 } 58 59 ~this() @nogc { 60 // dispose(); 61 } 62 63 override void dispose() { 64 if (isDisposed) 65 return; 66 67 version (HUNT_IO_DEBUG) 68 tracef("disposing selector[fd=%d]...", _kqueueFD); 69 isDisposed = true; 70 _eventChannel.close(); 71 int r = core.sys.posix.unistd.close(_kqueueFD); 72 if(r != 0) { 73 version(HUNT_DEBUG) warningf("error: %d", r); 74 } 75 76 super.dispose(); 77 } 78 79 override void onStop() { 80 version (HUNT_IO_DEBUG) 81 infof("Selector stopping. fd=%d", _kqueueFD); 82 83 if(!_eventChannel.isClosed()) { 84 _eventChannel.trigger(); 85 // _eventChannel.onWrite(); 86 } 87 } 88 89 override bool register(AbstractChannel channel) { 90 super.register(channel); 91 92 const int fd = channel.handle; 93 version (HUNT_IO_DEBUG) 94 tracef("register channel: fd=%d, type=%s", fd, channel.type); 95 96 int err = -1; 97 if (channel.type == ChannelType.Timer) { 98 Kevent ev; 99 AbstractTimer timerChannel = cast(AbstractTimer) channel; 100 if (timerChannel is null) 101 return false; 102 size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond 103 EV_SET(&ev, timerChannel.handle, EVFILT_TIMER, 104 EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel); 105 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 106 } 107 else { 108 if (fd < 0) 109 return false; 110 Kevent[2] ev = void; 111 short read = EV_ADD | EV_ENABLE; 112 short write = EV_ADD | EV_ENABLE; 113 if (channel.hasFlag(ChannelFlag.ETMode)) { 114 read |= EV_CLEAR; 115 write |= EV_CLEAR; 116 } 117 EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) channel); 118 EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) channel); 119 if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write)) 120 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 121 else if (channel.hasFlag(ChannelFlag.Read)) 122 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 123 else if (channel.hasFlag(ChannelFlag.Write)) 124 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 125 } 126 if (err < 0) { 127 return false; 128 } 129 return true; 130 } 131 132 override bool deregister(AbstractChannel channel) { 133 scope(exit) { 134 super.deregister(channel); 135 version (HUNT_IO_DEBUG) 136 tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type); 137 } 138 139 const fd = channel.handle; 140 if (fd < 0) 141 return false; 142 143 int err = -1; 144 if (channel.type == ChannelType.Timer) { 145 Kevent ev; 146 AbstractTimer timerChannel = cast(AbstractTimer) channel; 147 if (timerChannel is null) 148 return false; 149 EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel); 150 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 151 } 152 else { 153 Kevent[2] ev = void; 154 EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel); 155 EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel); 156 if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write)) 157 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 158 else if (channel.hasFlag(ChannelFlag.Read)) 159 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 160 else if (channel.hasFlag(ChannelFlag.Write)) 161 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 162 } 163 if (err < 0) { 164 return false; 165 } 166 // channel.currtLoop = null; 167 channel.clear(); 168 return true; 169 } 170 171 protected override int doSelect(long timeout) { 172 // void* [] tmp; 173 // eventBuffer = tmp; 174 timespec ts; 175 timespec *tsp; 176 // timeout is in milliseconds. Convert to struct timespec. 177 // timeout == -1 : wait forever : timespec timeout of NULL 178 // timeout == 0 : return immediately : timespec timeout of zero 179 if (timeout >= 0) { 180 // For some indeterminate reason kevent(2) has been found to fail with 181 // an EINVAL error for timeout values greater than or equal to 182 // 100000001000L. To avoid this problem, clamp the timeout arbitrarily 183 // to the maximum value of a 32-bit signed integer which is 184 // approximately 25 days in milliseconds. 185 const int timeoutMax = int.max; 186 if (timeout > timeoutMax) { 187 timeout = timeoutMax; 188 } 189 ts.tv_sec = timeout / 1000; 190 ts.tv_nsec = (timeout % 1000) * 1000000; //nanosec = 1 million millisec 191 tsp = &ts; 192 } else { 193 tsp = null; 194 } 195 196 // auto tspec = timespec(1, 1000 * 10); 197 int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp); 198 199 foreach (i; 0 .. result) { 200 AbstractChannel channel = cast(AbstractChannel)(events[i].udata); 201 ushort eventFlags = events[i].flags; 202 version (HUNT_IO_DEBUG) 203 infof("handling event: events=%d, fd=%d", eventFlags, channel.handle); 204 205 if (eventFlags & EV_ERROR) { 206 warningf("channel[fd=%d] has a error.", channel.handle); 207 channel.close(); 208 // rmEventArray(channel); 209 continue; 210 } 211 if (eventFlags & EV_EOF) { 212 version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle); 213 channel.close(); 214 // rmEventArray(channel); 215 continue; 216 } 217 218 short filter = events[i].filter; 219 handeChannelEvent(channel, filter); 220 } 221 return result; 222 } 223 224 private void handeChannelEvent(AbstractChannel channel, uint filter) { 225 version (HUNT_IO_DEBUG) 226 infof("handling event: events=%d, fd=%d", filter, channel.handle); 227 try { 228 if(filter == EVFILT_TIMER) { 229 channel.onRead(); 230 } else if (filter == EVFILT_WRITE) { 231 channel.onWrite(); 232 } else if (filter == EVFILT_READ) { 233 channel.onRead(); 234 } else { 235 warningf("Unhandled channel fileter: %d", filter); 236 } 237 } catch(Exception e) { 238 errorf("error while handing channel: fd=%s, message=%s", 239 channel.handle, e.msg); 240 } 241 } 242 } 243 244 245 enum : short { 246 EVFILT_READ = -1, 247 EVFILT_WRITE = -2, 248 EVFILT_AIO = -3, /* attached to aio requests */ 249 EVFILT_VNODE = -4, /* attached to vnodes */ 250 EVFILT_PROC = -5, /* attached to struct proc */ 251 EVFILT_SIGNAL = -6, /* attached to struct proc */ 252 EVFILT_TIMER = -7, /* timers */ 253 EVFILT_MACHPORT = -8, /* Mach portsets */ 254 EVFILT_FS = -9, /* filesystem events */ 255 EVFILT_USER = -10, /* User events */ 256 EVFILT_VM = -12, /* virtual memory events */ 257 EVFILT_SYSCOUNT = 11 258 } 259 260 extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow { 261 *kevp = Kevent(args); 262 } 263 264 struct Kevent { 265 uintptr_t ident; /* identifier for this event */ 266 short filter; /* filter for event */ 267 ushort flags; 268 uint fflags; 269 intptr_t data; 270 void* udata; /* opaque user data identifier */ 271 } 272 273 enum { 274 /* actions */ 275 EV_ADD = 0x0001, /* add event to kq (implies enable) */ 276 EV_DELETE = 0x0002, /* delete event from kq */ 277 EV_ENABLE = 0x0004, /* enable event */ 278 EV_DISABLE = 0x0008, /* disable event (not reported) */ 279 280 /* flags */ 281 EV_ONESHOT = 0x0010, /* only report one occurrence */ 282 EV_CLEAR = 0x0020, /* clear event state after reporting */ 283 EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ 284 EV_DISPATCH = 0x0080, /* disable event after reporting */ 285 286 EV_SYSFLAGS = 0xF000, /* reserved by system */ 287 EV_FLAG1 = 0x2000, /* filter-specific flag */ 288 289 /* returned values */ 290 EV_EOF = 0x8000, /* EOF detected */ 291 EV_ERROR = 0x4000, /* error, data contains errno */ 292 293 } 294 295 enum { 296 /* 297 * data/hint flags/masks for EVFILT_USER, shared with userspace 298 * 299 * On input, the top two bits of fflags specifies how the lower twenty four 300 * bits should be applied to the stored value of fflags. 301 * 302 * On output, the top two bits will always be set to NOTE_FFNOP and the 303 * remaining twenty four bits will contain the stored fflags value. 304 */ 305 NOTE_FFNOP = 0x00000000, /* ignore input fflags */ 306 NOTE_FFAND = 0x40000000, /* AND fflags */ 307 NOTE_FFOR = 0x80000000, /* OR fflags */ 308 NOTE_FFCOPY = 0xc0000000, /* copy fflags */ 309 NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */ 310 NOTE_FFLAGSMASK = 0x00ffffff, 311 312 NOTE_TRIGGER = 0x01000000, /* Cause the event to be 313 triggered for output. */ 314 315 /* 316 * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace 317 */ 318 NOTE_LOWAT = 0x0001, /* low water mark */ 319 320 /* 321 * data/hint flags for EVFILT_VNODE, shared with userspace 322 */ 323 NOTE_DELETE = 0x0001, /* vnode was removed */ 324 NOTE_WRITE = 0x0002, /* data contents changed */ 325 NOTE_EXTEND = 0x0004, /* size increased */ 326 NOTE_ATTRIB = 0x0008, /* attributes changed */ 327 NOTE_LINK = 0x0010, /* link count changed */ 328 NOTE_RENAME = 0x0020, /* vnode was renamed */ 329 NOTE_REVOKE = 0x0040, /* vnode access was revoked */ 330 331 /* 332 * data/hint flags for EVFILT_PROC, shared with userspace 333 */ 334 NOTE_EXIT = 0x80000000, /* process exited */ 335 NOTE_FORK = 0x40000000, /* process forked */ 336 NOTE_EXEC = 0x20000000, /* process exec'd */ 337 NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */ 338 NOTE_PDATAMASK = 0x000fffff, /* mask for pid */ 339 340 /* additional flags for EVFILT_PROC */ 341 NOTE_TRACK = 0x00000001, /* follow across forks */ 342 NOTE_TRACKERR = 0x00000002, /* could not track child */ 343 NOTE_CHILD = 0x00000004, /* am a child process */ 344 345 } 346 347 extern (C) { 348 int kqueue() @nogc nothrow; 349 int kevent(int kq, const Kevent* changelist, int nchanges, 350 Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow; 351 } 352 353 static if (CompilerHelper.isLessThan(2078)) { 354 enum SO_REUSEPORT = 0x0200; 355 }