/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.selector.Kqueue;


// dfmt off
version(HAVE_KQUEUE):
// dfmt on

import hunt.io.socket.Common;
import hunt.event.timer.Kqueue;
import hunt.Exceptions;
import hunt.logging;

import std.exception;
import std.socket;
import std.string;

import core.time;
import core.stdc.string;
import core.stdc.errno;
import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.signal;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.posix.time;

/**
 * kqueue(2)-based selector.
 *
 * Owns one kqueue descriptor, registers channels on it as EVFILT_READ /
 * EVFILT_WRITE / EVFILT_TIMER filters and dispatches the events returned by
 * kevent(2) to the channel stored in each event's `udata` field.
 */
class AbstractSelector : Selector {
    // kevent array size
    enum int NUM_KEVENTS = 128;

    /**
     * Creates the kqueue descriptor and registers the internal event channel.
     *
     * Throws: ErrnoException if kqueue() fails.
     */
    this() {
        _kqueueFD = kqueue();
        // kqueue() returns -1 and sets errno on failure. Without this check every
        // later kevent() call would fail on the invalid descriptor with no hint
        // about the root cause.
        errnoEnforce(_kqueueFD >= 0, "Failed to create the kqueue descriptor");
        _event = new KqueueEventChannel(this);
        register(_event);
    }

    ~this() {
        dispose();
    }

    /**
     * Deregisters the internal event channel and closes the kqueue descriptor.
     * Calls after the first one are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;
        isDisposed = true;
        // Both members may be unset when the constructor threw before finishing;
        // the destructor still ends up here, so guard each step.
        if (_event !is null)
            deregister(_event);
        if (_kqueueFD >= 0) {
            core.sys.posix.unistd.close(_kqueueFD);
            // Make sure a late register()/doSelect() cannot hit a recycled descriptor.
            _kqueueFD = -1;
        }
    }

    private bool isDisposed = false;

    /**
     * Adds a channel to the kqueue.
     *
     * Timer channels are added as EVFILT_TIMER with the channel's interval in
     * milliseconds; intervals below 20 are raised to 20. Other channels are
     * added for EVFILT_READ and/or EVFILT_WRITE according to their Read/Write
     * flags, with EV_CLEAR when ChannelFlag.ETMode is set.
     *
     * Returns: false when the channel is a timer that is not an AbstractTimer,
     *          when its handle is negative, when neither the Read nor the Write
     *          flag is set, or when kevent() fails; true otherwise.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            const int fd = channel.handle;
            if (fd < 0)
                return false;
            Kevent[2] ev = void;
            short read = EV_ADD | EV_ENABLE;
            short write = EV_ADD | EV_ENABLE;
            if (channel.hasFlag(ChannelFlag.ETMode)) {
                read |= EV_CLEAR;
                write |= EV_CLEAR;
            }
            // ev[0] is the read filter and ev[1] the write filter; the calls below
            // submit both, only the first, or only the second.
            EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) channel);
            if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0) {
            return false;
        }
        _event.setNext(channel);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException, always.
     */
    override bool reregister(AbstractChannel channel) {
        throw new LoopException("The Kqueue does not support reregister!");
    }

    /**
     * Removes a channel's filters from the kqueue and calls `channel.clear()`.
     *
     * The filters to delete are chosen the same way register() chooses the
     * filters to add (channel type and Read/Write flags).
     *
     * Returns: false when the handle is negative, when a timer channel is not
     *          an AbstractTimer, when neither the Read nor the Write flag is
     *          set, or when kevent() fails; true otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        assert(channel !is null);
        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);
            if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.hasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0) {
            return false;
        }
        // channel.currtLoop = null;
        channel.clear();
        return true;
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value waits forever,
     *             0 returns immediately. Values above int.max are clamped.
     *
     * Returns: the value returned by kevent(): the number of events fetched,
     *          or a negative value on failure.
     */
    override protected int doSelect(long timeout) {
        timespec ts;
        timespec *tsp;
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max; // Integer.MAX_VALUE
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = timeout / 1000;
            ts.tv_nsec = (timeout % 1000) * 1000000; // 1 millisecond = 1,000,000 nanoseconds
            tsp = &ts;
        } else {
            tsp = null;
        }

        // auto tspec = timespec(1, 1000 * 10);
        Kevent[NUM_KEVENTS] events;
        int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp);
        // When kevent() fails, result is negative and the loop body never runs.
        foreach (i; 0 .. result) {
            // register() stored the channel reference in udata.
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            ushort eventFlags = events[i].flags;
            // info("eventFlags: ", eventFlags);

            if (eventFlags & EV_ERROR) {
                warningf("channel[fd=%d] has a error.", channel.handle);
                channel.close();
                continue;
            }
            if (eventFlags & EV_EOF) {
                version (HUNT_DEBUG) infof("channel[fd=%d] closed", channel.handle);
                // kqueue may report EOF on a read filter while unread bytes are
                // still queued (data holds the pending byte count). Hand them to
                // the channel before closing so the tail of the stream is not lost.
                // NOTE(review): relies on channel.close() tolerating a second call
                // in case onRead() already closed the channel - confirm.
                if (events[i].filter == EVFILT_READ && events[i].data > 0) {
                    scope (exit) channel.close();
                    channel.onRead();
                } else {
                    channel.close();
                }
                continue;
            }

            short filter = events[i].filter;
            if(filter == EVFILT_TIMER) {
                channel.onRead();
                continue;
            } else if (filter == EVFILT_WRITE) {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) channel;
                assert(wt !is null);
                wt.onWriteDone();
            } else if (filter == EVFILT_READ) {
                channel.onRead();
            } else {
                warningf("Unhandled channel fileter: %d", filter);
            }
        }
        return result;
    }

private:
    int _kqueueFD;
    EventChannel _event;
}

/**
 * Event channel backed by a non-blocking socket pair.
 *
 * `call()` writes to one end of the pair; the other end is this channel's
 * handle and carries the Read flag, so a selector it is registered with
 * reports a read event after `call()`.
 */
class KqueueEventChannel : EventChannel {
    this(Selector loop) {
        super(loop);
        setFlag(ChannelFlag.Read, true);
        _pair = socketPair();
        _pair[0].blocking = false;
        _pair[1].blocking = false;
        this.handle = _pair[1].handle;
    }

    ~this() {
        close();
    }

    /// Sends a short message on the writing end of the pair.
    override void call() {
        _pair[0].send("call");
    }

    /// Drains the reading end of the pair, then forwards to the base class.
    override void onRead() {
        ubyte[128] data;
        while (true) {
            // receive() yields a non-positive value once nothing more can be read.
            if (_pair[1].receive(data) <= 0)
                break;
        }

        super.onRead();
    }

    // mixin OverrideErro;

    Socket[2] _pair;
}

/// kqueue filter identifiers (values for Kevent.filter).
enum : short {
    EVFILT_READ = -1,
    EVFILT_WRITE = -2,
    EVFILT_AIO = -3, /* attached to aio requests */
    EVFILT_VNODE = -4, /* attached to vnodes */
    EVFILT_PROC = -5, /* attached to struct proc */
    EVFILT_SIGNAL = -6, /* attached to struct proc */
    EVFILT_TIMER = -7, /* timers */
    EVFILT_MACHPORT = -8, /* Mach portsets */
    EVFILT_FS = -9, /* filesystem events */
    EVFILT_USER = -10, /* User events */
    EVFILT_VM = -12, /* virtual memory events */
    EVFILT_SYSCOUNT = 11
}

/// Fills `*kevp` from the given field values, in Kevent declaration order.
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    *kevp = Kevent(args);
}

/// Event record exchanged with kevent().
struct Kevent {
    uintptr_t ident; /* identifier for this event */
    short filter; /* filter for event */
    ushort flags;
    uint fflags;
    intptr_t data;
    void* udata; /* opaque user data identifier */
}

enum {
    /* actions */
    EV_ADD = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE = 0x0002, /* delete event from kq */
    EV_ENABLE = 0x0004, /* enable event */
    EV_DISABLE = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT = 0x0010, /* only report one occurrence */
    EV_CLEAR = 0x0020, /* clear event state after reporting */
    EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1 = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF = 0x8000, /* EOF detected */
    EV_ERROR = 0x4000, /* error, data contains errno */

}

enum {
    /*
     * data/hint flags/masks for EVFILT_USER, shared with userspace
     *
     * On input, the top two bits of fflags specifies how the lower twenty four
     * bits should be applied to the stored value of fflags.
     *
     * On output, the top two bits will always be set to NOTE_FFNOP and the
     * remaining twenty four bits will contain the stored fflags value.
     */
    NOTE_FFNOP = 0x00000000, /* ignore input fflags */
    NOTE_FFAND = 0x40000000, /* AND fflags */
    NOTE_FFOR = 0x80000000, /* OR fflags */
    NOTE_FFCOPY = 0xc0000000, /* copy fflags */
    NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                  triggered for output. */

    /*
     * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
     */
    NOTE_LOWAT = 0x0001, /* low water mark */

    /*
     * data/hint flags for EVFILT_VNODE, shared with userspace
     */
    NOTE_DELETE = 0x0001, /* vnode was removed */
    NOTE_WRITE = 0x0002, /* data contents changed */
    NOTE_EXTEND = 0x0004, /* size increased */
    NOTE_ATTRIB = 0x0008, /* attributes changed */
    NOTE_LINK = 0x0010, /* link count changed */
    NOTE_RENAME = 0x0020, /* vnode was renamed */
    NOTE_REVOKE = 0x0040, /* vnode access was revoked */

    /*
     * data/hint flags for EVFILT_PROC, shared with userspace
     */
    NOTE_EXIT = 0x80000000, /* process exited */
    NOTE_FORK = 0x40000000, /* process forked */
    NOTE_EXEC = 0x20000000, /* process exec'd */
    NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
    NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

    /* additional flags for EVFILT_PROC */
    NOTE_TRACK = 0x00000001, /* follow across forks */
    NOTE_TRACKERR = 0x00000002, /* could not track child */
    NOTE_CHILD = 0x00000004, /* am a child process */

}

extern (C) {
    int kqueue() @nogc nothrow;
    int kevent(int kq, const Kevent* changelist, int nchanges,
            Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
}

enum SO_REUSEPORT = 0x0200;