1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.selector.Kqueue; 13 14 15 // dfmt off 16 version(HAVE_KQUEUE): 17 // dfmt on 18 import hunt.event.selector.Selector; 19 import hunt.event.timer.Kqueue; 20 import hunt.Exceptions; 21 import hunt.io.channel; 22 import hunt.logging.ConsoleLogger; 23 import hunt.util.Common; 24 25 import std.exception; 26 import std.socket; 27 import std.string; 28 29 import core.time; 30 import core.stdc.string; 31 import core.stdc.errno; 32 import core.sys.posix.sys.types; // for ssize_t, size_t 33 import core.sys.posix.signal; 34 import core.sys.posix.netinet.tcp; 35 import core.sys.posix.netinet.in_; 36 import core.sys.posix.unistd; 37 import core.sys.posix.time; 38 import hunt.concurrency.TaskPool; 39 /** 40 * 41 */ 42 class AbstractSelector : Selector { 43 // kevent array size 44 enum int NUM_KEVENTS = 128; 45 private bool isDisposed = false; 46 private Kevent[NUM_KEVENTS] events; 47 private int _kqueueFD; 48 private EventChannel _eventChannel; 49 private TaskPool _taskPool; 50 this(size_t number, size_t divider,TaskPool pool = null, size_t maxChannels = 1500) { 51 _taskPool = pool; 52 super(number, divider, maxChannels); 53 _kqueueFD = kqueue(); 54 _eventChannel = new KqueueEventChannel(this); 55 register(_eventChannel); 56 } 57 58 ~this() @nogc { 59 // dispose(); 60 } 61 62 override void dispose() { 63 if (isDisposed) 64 return; 65 66 version (HUNT_IO_DEBUG) 67 tracef("disposing selector[fd=%d]...", _kqueueFD); 68 isDisposed = true; 69 _eventChannel.close(); 70 int r = core.sys.posix.unistd.close(_kqueueFD); 71 if(r != 0) { 72 version(HUNT_DEBUG) warningf("error: %d", r); 73 } 74 75 super.dispose(); 76 } 77 78 override void onStop() { 79 version (HUNT_IO_DEBUG) 80 infof("Selector stopping. fd=%d", _kqueueFD); 81 82 if(!_eventChannel.isClosed()) { 83 _eventChannel.trigger(); 84 // _eventChannel.onWrite(); 85 } 86 } 87 88 override bool register(AbstractChannel channel) { 89 assert(channel !is null); 90 91 const int fd = channel.handle; 92 version (HUNT_IO_DEBUG) 93 tracef("register channel: fd=%d, type=%s", fd, channel.type); 94 95 int err = -1; 96 if (channel.type == ChannelType.Timer) { 97 Kevent ev; 98 AbstractTimer timerChannel = cast(AbstractTimer) channel; 99 if (timerChannel is null) 100 return false; 101 size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond 102 EV_SET(&ev, timerChannel.handle, EVFILT_TIMER, 103 EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel); 104 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 105 } 106 else { 107 if (fd < 0) 108 return false; 109 Kevent[2] ev = void; 110 short read = EV_ADD | EV_ENABLE; 111 short write = EV_ADD | EV_ENABLE; 112 if (channel.hasFlag(ChannelFlag.ETMode)) { 113 read |= EV_CLEAR; 114 write |= EV_CLEAR; 115 } 116 EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) channel); 117 EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) channel); 118 if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write)) 119 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 120 else if (channel.hasFlag(ChannelFlag.Read)) 121 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 122 else if (channel.hasFlag(ChannelFlag.Write)) 123 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 124 } 125 if (err < 0) { 126 return false; 127 } 128 return true; 129 } 130 131 override bool deregister(AbstractChannel channel) { 132 assert(channel !is null); 133 const fd = channel.handle; 134 if (fd < 0) 135 return false; 136 137 int err = -1; 138 if (channel.type == ChannelType.Timer) { 139 Kevent ev; 140 AbstractTimer timerChannel = cast(AbstractTimer) channel; 141 if (timerChannel is null) 142 return false; 143 EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel); 144 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 145 } 146 else { 147 Kevent[2] ev = void; 148 EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel); 149 EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel); 150 if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write)) 151 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 152 else if (channel.hasFlag(ChannelFlag.Read)) 153 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 154 else if (channel.hasFlag(ChannelFlag.Write)) 155 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 156 } 157 if (err < 0) { 158 return false; 159 } 160 // channel.currtLoop = null; 161 channel.clear(); 162 return true; 163 } 164 165 protected override int doSelect(long timeout) { 166 timespec ts; 167 timespec *tsp; 168 // timeout is in milliseconds. Convert to struct timespec. 169 // timeout == -1 : wait forever : timespec timeout of NULL 170 // timeout == 0 : return immediately : timespec timeout of zero 171 if (timeout >= 0) { 172 // For some indeterminate reason kevent(2) has been found to fail with 173 // an EINVAL error for timeout values greater than or equal to 174 // 100000001000L. To avoid this problem, clamp the timeout arbitrarily 175 // to the maximum value of a 32-bit signed integer which is 176 // approximately 25 days in milliseconds. 177 const int timeoutMax = int.max; 178 if (timeout > timeoutMax) { 179 timeout = timeoutMax; 180 } 181 ts.tv_sec = timeout / 1000; 182 ts.tv_nsec = (timeout % 1000) * 1000000; //nanosec = 1 million millisec 183 tsp = &ts; 184 } else { 185 tsp = null; 186 } 187 188 // auto tspec = timespec(1, 1000 * 10); 189 int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp); 190 191 foreach (i; 0 .. result) { 192 AbstractChannel channel = cast(AbstractChannel)(events[i].udata); 193 ushort eventFlags = events[i].flags; 194 version (HUNT_IO_DEBUG) 195 infof("handling event: events=%d, fd=%d", eventFlags, channel.handle); 196 197 if (eventFlags & EV_ERROR) { 198 warningf("channel[fd=%d] has a error.", channel.handle); 199 channel.close(); 200 continue; 201 } 202 if (eventFlags & EV_EOF) { 203 version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle); 204 channel.close(); 205 continue; 206 } 207 208 short filter = events[i].filter; 209 handeChannelEvent(channel, filter); 210 } 211 return result; 212 } 213 214 private void handeChannelEvent(AbstractChannel channel, uint filter) { 215 version (HUNT_IO_DEBUG) 216 infof("handling event: events=%d, fd=%d", filter, channel.handle); 217 try { 218 if(filter == EVFILT_TIMER) { 219 channel.onRead(); 220 } else if (filter == EVFILT_WRITE) { 221 channel.onWrite(); 222 } else if (filter == EVFILT_READ) { 223 channel.onRead(); 224 } else { 225 warningf("Unhandled channel fileter: %d", filter); 226 } 227 } catch(Exception e) { 228 errorf("error while handing channel: fd=%s, message=%s", 229 channel.handle, e.msg); 230 } 231 } 232 } 233 234 235 enum : short { 236 EVFILT_READ = -1, 237 EVFILT_WRITE = -2, 238 EVFILT_AIO = -3, /* attached to aio requests */ 239 EVFILT_VNODE = -4, /* attached to vnodes */ 240 EVFILT_PROC = -5, /* attached to struct proc */ 241 EVFILT_SIGNAL = -6, /* attached to struct proc */ 242 EVFILT_TIMER = -7, /* timers */ 243 EVFILT_MACHPORT = -8, /* Mach portsets */ 244 EVFILT_FS = -9, /* filesystem events */ 245 EVFILT_USER = -10, /* User events */ 246 EVFILT_VM = -12, /* virtual memory events */ 247 EVFILT_SYSCOUNT = 11 248 } 249 250 extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow { 251 *kevp = Kevent(args); 252 } 253 254 struct Kevent { 255 uintptr_t ident; /* identifier for this event */ 256 short filter; /* filter for event */ 257 ushort flags; 258 uint fflags; 259 intptr_t data; 260 void* udata; /* opaque user data identifier */ 261 } 262 263 enum { 264 /* actions */ 265 EV_ADD = 0x0001, /* add event to kq (implies enable) */ 266 EV_DELETE = 0x0002, /* delete event from kq */ 267 EV_ENABLE = 0x0004, /* enable event */ 268 EV_DISABLE = 0x0008, /* disable event (not reported) */ 269 270 /* flags */ 271 EV_ONESHOT = 0x0010, /* only report one occurrence */ 272 EV_CLEAR = 0x0020, /* clear event state after reporting */ 273 EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ 274 EV_DISPATCH = 0x0080, /* disable event after reporting */ 275 276 EV_SYSFLAGS = 0xF000, /* reserved by system */ 277 EV_FLAG1 = 0x2000, /* filter-specific flag */ 278 279 /* returned values */ 280 EV_EOF = 0x8000, /* EOF detected */ 281 EV_ERROR = 0x4000, /* error, data contains errno */ 282 283 } 284 285 enum { 286 /* 287 * data/hint flags/masks for EVFILT_USER, shared with userspace 288 * 289 * On input, the top two bits of fflags specifies how the lower twenty four 290 * bits should be applied to the stored value of fflags. 291 * 292 * On output, the top two bits will always be set to NOTE_FFNOP and the 293 * remaining twenty four bits will contain the stored fflags value. 294 */ 295 NOTE_FFNOP = 0x00000000, /* ignore input fflags */ 296 NOTE_FFAND = 0x40000000, /* AND fflags */ 297 NOTE_FFOR = 0x80000000, /* OR fflags */ 298 NOTE_FFCOPY = 0xc0000000, /* copy fflags */ 299 NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */ 300 NOTE_FFLAGSMASK = 0x00ffffff, 301 302 NOTE_TRIGGER = 0x01000000, /* Cause the event to be 303 triggered for output. */ 304 305 /* 306 * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace 307 */ 308 NOTE_LOWAT = 0x0001, /* low water mark */ 309 310 /* 311 * data/hint flags for EVFILT_VNODE, shared with userspace 312 */ 313 NOTE_DELETE = 0x0001, /* vnode was removed */ 314 NOTE_WRITE = 0x0002, /* data contents changed */ 315 NOTE_EXTEND = 0x0004, /* size increased */ 316 NOTE_ATTRIB = 0x0008, /* attributes changed */ 317 NOTE_LINK = 0x0010, /* link count changed */ 318 NOTE_RENAME = 0x0020, /* vnode was renamed */ 319 NOTE_REVOKE = 0x0040, /* vnode access was revoked */ 320 321 /* 322 * data/hint flags for EVFILT_PROC, shared with userspace 323 */ 324 NOTE_EXIT = 0x80000000, /* process exited */ 325 NOTE_FORK = 0x40000000, /* process forked */ 326 NOTE_EXEC = 0x20000000, /* process exec'd */ 327 NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */ 328 NOTE_PDATAMASK = 0x000fffff, /* mask for pid */ 329 330 /* additional flags for EVFILT_PROC */ 331 NOTE_TRACK = 0x00000001, /* follow across forks */ 332 NOTE_TRACKERR = 0x00000002, /* could not track child */ 333 NOTE_CHILD = 0x00000004, /* am a child process */ 334 335 } 336 337 extern (C) { 338 int kqueue() @nogc nothrow; 339 int kevent(int kq, const Kevent* changelist, int nchanges, 340 Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow; 341 } 342 343 static if (CompilerHelper.isLessThan(2078)) { 344 enum SO_REUSEPORT = 0x0200; 345 }