1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Kqueue;
13 
14 
15 // dfmt off
16 version(HAVE_KQUEUE):
17 // dfmt on
18 
19 import hunt.io.socket.Common;
20 import hunt.event.timer.Kqueue;
21 import hunt.Exceptions;
22 import hunt.logging;
23 
24 import std.exception;
25 import std.socket;
26 import std.string;
27 
28 import core.time;
29 import core.stdc.string;
30 import core.stdc.errno;
31 import core.sys.posix.sys.types; // for ssize_t, size_t
32 import core.sys.posix.signal;
33 import core.sys.posix.netinet.tcp;
34 import core.sys.posix.netinet.in_;
35 import core.sys.posix.unistd;
36 import core.sys.posix.time;
37 
/**
 * Selector built on top of a kqueue descriptor.
 *
 * Every channel is handed to the kernel with its own object reference stored
 * in the `udata` field of the `Kevent`, which lets `doSelect` map each
 * returned event back to the channel it belongs to.
 */
class AbstractSelector : Selector {
    /// Maximum number of kernel events fetched by one `doSelect` call.
    enum int NUM_KEVENTS = 128;

    /**
     * Creates the kernel queue, then creates and registers the internal
     * `KqueueEventChannel`.
     *
     * Failures are reported through the logger instead of being dropped
     * silently; the constructor itself still completes as before.
     */
    this() {
        _kqueueFD = kqueue();
        if (_kqueueFD < 0) {
            warningf("kqueue() failed: errno=%d", errno);
        }
        _event = new KqueueEventChannel(this);
        if (!register(_event)) {
            warningf("Failed to register the internal event channel.");
        }
    }

    // NOTE(review): dispose() touches `_event`, a GC-managed member. When this
    // destructor is run by the garbage collector that object may already have
    // been finalized -- confirm that owners call dispose() explicitly.
    ~this() {
        dispose();
    }

    /**
     * Deregisters the internal event channel and closes the kqueue descriptor.
     *
     * Only the first call has an effect. The descriptor field is reset to -1
     * after closing so that later calls cannot act on a recycled descriptor
     * number.
     */
    override void dispose() {
        if (isDisposed)
            return;
        isDisposed = true;

        // `_event` is still null when the constructor was interrupted by an
        // exception (for example from socketPair()).
        if (_event !is null)
            deregister(_event);

        if (_kqueueFD >= 0) {
            core.sys.posix.unistd.close(_kqueueFD);
            _kqueueFD = -1;
        }
    }

    private bool isDisposed = false;

    /**
     * Adds a channel to the kernel queue.
     *
     * Timer channels are installed as `EVFILT_TIMER` with a period of at
     * least 20 (milliseconds). Every other channel is installed for the
     * read and/or write filter according to its `ChannelFlag.Read` /
     * `ChannelFlag.Write` flags; `ChannelFlag.ETMode` adds `EV_CLEAR`.
     *
     * Params:
     *   channel = the channel to add; must not be null.
     *
     * Returns: true when the kernel accepted the change, false when the
     *   channel is not usable (wrong type, negative handle, neither Read nor
     *   Write flag set) or when `kevent` reported an error.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            // Periods shorter than 20 are raised to 20.
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond
            Kevent ev;
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            const int fd = channel.handle;
            if (fd < 0)
                return false;
            ushort flags = EV_ADD | EV_ENABLE;
            if (channel.hasFlag(ChannelFlag.ETMode))
                flags |= EV_CLEAR;
            err = submitSocketFilters(channel, fd, flags);
        }
        if (err < 0) {
            return false;
        }
        _event.setNext(channel);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException on every call.
     */
    override bool reregister(AbstractChannel channel) {
        throw new LoopException("The Kqueue does not support reregister!");
    }

    /**
     * Removes a channel from the kernel queue and calls `channel.clear()`.
     *
     * Params:
     *   channel = the channel to remove; must not be null.
     *
     * Returns: true on success, false when the handle is negative, the
     *   channel is not usable or `kevent` reported an error. `clear()` is
     *   only called on success.
     */
    override bool deregister(AbstractChannel channel) {
        assert(channel !is null);
        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            Kevent ev;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            err = submitSocketFilters(channel, fd, EV_DELETE);
        }
        if (err < 0) {
            return false;
        }
        // channel.currtLoop = null;
        channel.clear();
        return true;
    }

    /**
     * Waits for kernel events and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value waits without
     *             a time limit, 0 returns immediately.
     *
     * Returns: the value returned by `kevent`: the number of events that
     *   were fetched, or a negative value on failure.
     */
    override protected int doSelect(long timeout) {
        timespec ts;
        timespec* tsp = null;
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max; // Integer.MAX_VALUE
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = cast(typeof(ts.tv_sec))(timeout / 1000);
            // 1 millisecond = 1,000,000 nanoseconds
            ts.tv_nsec = cast(typeof(ts.tv_nsec))((timeout % 1000) * 1_000_000);
            tsp = &ts;
        }

        Kevent[NUM_KEVENTS] events;
        int result = kevent(_kqueueFD, null, 0, events.ptr, NUM_KEVENTS, tsp);
        if (result < 0) {
            // errno has to be captured before any other call can overwrite it.
            const int err = errno;
            // An interrupted wait is routine, and failures after dispose()
            // are expected; everything else is worth surfacing.
            if (err != EINTR && !isDisposed)
                warningf("kevent() failed: errno=%d", err);
            return result;
        }

        foreach (i; 0 .. result) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            if (channel is null) {
                // Nothing to dispatch to; skip instead of dereferencing null.
                warningf("Kernel event without a channel (ident=%d)", events[i].ident);
                continue;
            }
            ushort eventFlags = events[i].flags;
            // info("eventFlags: ", eventFlags);

            if (eventFlags & EV_ERROR) {
                warningf("channel[fd=%d] has a error.", channel.handle);
                channel.close();
                continue;
            }
            // NOTE(review): the channel is closed on EV_EOF without looking at
            // events[i].data first -- confirm that no unread input can be
            // lost this way.
            if (eventFlags & EV_EOF) {
                version (HUNT_DEBUG) infof("channel[fd=%d] closed", channel.handle);
                channel.close();
                continue;
            }

            short filter = events[i].filter;
            if (filter == EVFILT_TIMER || filter == EVFILT_READ) {
                channel.onRead();
            } else if (filter == EVFILT_WRITE) {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) channel;
                assert(wt !is null);
                wt.onWriteDone();
            } else {
                warningf("Unhandled channel filter: %d", filter);
            }
        }
        return result;
    }

private:
    /**
     * Submits one change per enabled direction (read first, then write) for
     * a non-timer channel, using the same flags for each change.
     *
     * Returns: the result of `kevent`, or -1 without calling it when the
     *   channel has neither the Read nor the Write flag.
     */
    int submitSocketFilters(AbstractChannel channel, int fd, ushort flags) {
        Kevent[2] changes = void;
        int count = 0;
        if (channel.hasFlag(ChannelFlag.Read)) {
            EV_SET(&(changes[count]), fd, EVFILT_READ, flags, 0, 0, cast(void*) channel);
            ++count;
        }
        if (channel.hasFlag(ChannelFlag.Write)) {
            EV_SET(&(changes[count]), fd, EVFILT_WRITE, flags, 0, 0, cast(void*) channel);
            ++count;
        }
        if (count == 0)
            return -1;
        return kevent(_kqueueFD, changes.ptr, count, null, 0, null);
    }

    int _kqueueFD = -1;
    EventChannel _event;
}
206 
/**
 * Event channel backed by a non-blocking socket pair.
 *
 * `call` writes to the first socket of the pair; the second socket supplies
 * the channel handle and is drained in `onRead`.
 */
class KqueueEventChannel : EventChannel {
    /**
     * Sets the Read flag, creates the socket pair, switches both sockets to
     * non-blocking mode and adopts the second socket's handle.
     */
    this(Selector loop) {
        super(loop);
        setFlag(ChannelFlag.Read, true);
        _pair = socketPair();
        foreach (endpoint; _pair) {
            endpoint.blocking = false;
        }
        this.handle = _pair[1].handle;
    }

    ~this() {
        close();
    }

    /// Sends the text "call" through the first socket of the pair.
    override void call() {
        Socket sender = _pair[0];
        sender.send("call");
    }

    /**
     * Reads from the second socket until `receive` stops returning a
     * positive count, then forwards to the base class handler.
     */
    override void onRead() {
        ubyte[128] scratch;
        Socket receiver = _pair[1];
        while (receiver.receive(scratch) > 0) {
            // The received bytes are discarded.
        }

        super.onRead();
    }

    // mixin OverrideErro;

    /// The two connected sockets: index 0 is written to, index 1 is read from.
    Socket[2] _pair;
}
241 
/**
 * Filter identifiers as stored in the `filter` field of a `Kevent`.
 *
 * NOTE(review): the numbering (no entry for -11, EVFILT_SYSCOUNT = 11) looks
 * like it mirrors a Darwin header -- confirm before relying on any value on
 * another kqueue platform.
 */
enum : short {
    /// Read filter.
    EVFILT_READ = -1,
    /// Write filter.
    EVFILT_WRITE = -2,
    /// Attached to aio requests.
    EVFILT_AIO = -3,
    /// Attached to vnodes.
    EVFILT_VNODE = -4,
    /// Attached to struct proc.
    EVFILT_PROC = -5,
    /// Attached to struct proc.
    EVFILT_SIGNAL = -6,
    /// Timers.
    EVFILT_TIMER = -7,
    /// Mach portsets.
    EVFILT_MACHPORT = -8,
    /// Filesystem events.
    EVFILT_FS = -9,
    /// User events.
    EVFILT_USER = -10,
    /// Virtual memory events.
    EVFILT_VM = -12,
    /// Presumably the number of system filters -- TODO confirm.
    EVFILT_SYSCOUNT = 11
}
256 
/**
 * Fills the `Kevent` behind `kevp` from the given field values.
 *
 * Params:
 *   kevp = destination structure; it is overwritten completely.
 *   args = one value per `Kevent` field, in declaration order.
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    Kevent filled = Kevent(args);
    *kevp = filled;
}
260 
/**
 * One kernel event record, passed to and returned from `kevent`.
 *
 * The field order matters: `EV_SET` fills the structure positionally.
 */
struct Kevent {
    /// Identifier for this event.
    uintptr_t ident;

    /// Filter for event (one of the EVFILT_* values).
    short filter;

    /// Action and state flags (EV_* values).
    ushort flags;

    /// Filter-specific flags (NOTE_* values).
    uint fflags;

    /// Filter-specific data value; holds the errno when EV_ERROR is set.
    intptr_t data;

    /// Opaque user data identifier.
    void* udata;
}
269 
/**
 * Values for the `flags` field of a `Kevent`: requested actions, behaviour
 * modifiers and the status bits reported back with an event.
 */
enum {
    // ---- actions ----

    /// Add event to kq (implies enable).
    EV_ADD = 0x0001,
    /// Delete event from kq.
    EV_DELETE = 0x0002,
    /// Enable event.
    EV_ENABLE = 0x0004,
    /// Disable event (not reported).
    EV_DISABLE = 0x0008,

    // ---- flags ----

    /// Only report one occurrence.
    EV_ONESHOT = 0x0010,
    /// Clear event state after reporting.
    EV_CLEAR = 0x0020,
    /// Force EV_ERROR on success, data=0.
    EV_RECEIPT = 0x0040,
    /// Disable event after reporting.
    EV_DISPATCH = 0x0080,

    /// Reserved by system.
    EV_SYSFLAGS = 0xF000,
    /// Filter-specific flag.
    EV_FLAG1 = 0x2000,

    // ---- returned values ----

    /// EOF detected.
    EV_EOF = 0x8000,
    /// Error, data contains errno.
    EV_ERROR = 0x4000,
}
291 
/**
 * Values for the `fflags` field of a `Kevent`, grouped by the filter they
 * belong to. Several groups reuse the same numeric values.
 */
enum {
    // ---- EVFILT_USER (shared with userspace) ----
    //
    // On input, the top two bits of fflags specify how the lower twenty four
    // bits should be applied to the stored value of fflags.
    //
    // On output, the top two bits will always be set to NOTE_FFNOP and the
    // remaining twenty four bits will contain the stored fflags value.

    /// Ignore input fflags.
    NOTE_FFNOP = 0x00000000,
    /// AND fflags.
    NOTE_FFAND = 0x40000000,
    /// OR fflags.
    NOTE_FFOR = 0x80000000,
    /// Copy fflags.
    NOTE_FFCOPY = 0xc0000000,
    /// Mask for operations.
    NOTE_FFCTRLMASK = 0xc0000000,
    /// Mask for the stored fflags bits.
    NOTE_FFLAGSMASK = 0x00ffffff,

    /// Cause the event to be triggered for output.
    NOTE_TRIGGER = 0x01000000,

    // ---- EVFILT_READ / EVFILT_WRITE (shared with userspace) ----

    /// Low water mark.
    NOTE_LOWAT = 0x0001,

    // ---- EVFILT_VNODE (shared with userspace) ----

    /// Vnode was removed.
    NOTE_DELETE = 0x0001,
    /// Data contents changed.
    NOTE_WRITE = 0x0002,
    /// Size increased.
    NOTE_EXTEND = 0x0004,
    /// Attributes changed.
    NOTE_ATTRIB = 0x0008,
    /// Link count changed.
    NOTE_LINK = 0x0010,
    /// Vnode was renamed.
    NOTE_RENAME = 0x0020,
    /// Vnode access was revoked.
    NOTE_REVOKE = 0x0040,

    // ---- EVFILT_PROC (shared with userspace) ----

    /// Process exited.
    NOTE_EXIT = 0x80000000,
    /// Process forked.
    NOTE_FORK = 0x40000000,
    /// Process exec'd.
    NOTE_EXEC = 0x20000000,
    /// Mask for hint bits.
    NOTE_PCTRLMASK = 0xf0000000,
    /// Mask for pid.
    NOTE_PDATAMASK = 0x000fffff,

    // ---- additional flags for EVFILT_PROC ----

    /// Follow across forks.
    NOTE_TRACK = 0x00000001,
    /// Could not track child.
    NOTE_TRACKERR = 0x00000002,
    /// Am a child process.
    NOTE_CHILD = 0x00000004,
}
343 
/// C binding: creates a kernel event queue and returns its descriptor.
extern (C) int kqueue() @nogc nothrow;

/**
 * C binding: applies the entries of `changelist` to the queue `kq` and
 * fetches pending events into `eventlist`.
 *
 * Params:
 *   kq         = descriptor obtained from `kqueue`.
 *   changelist = changes to apply; may be null when `nchanges` is 0.
 *   nchanges   = number of entries in `changelist`.
 *   eventlist  = receives the fetched events; may be null when `nevents` is 0.
 *   nevents    = capacity of `eventlist`.
 *   timeout    = maximum wait time; null waits without a time limit.
 *
 * Returns: the number of events written to `eventlist`, or a negative value
 *   on failure.
 */
extern (C) int kevent(int kq, const Kevent* changelist, int nchanges,
        Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
349 
/// Numeric value of the SO_REUSEPORT socket option.
/// NOTE(review): 0x0200 is presumably the BSD/Darwin value -- confirm per platform.
enum int SO_REUSEPORT = 0x0200;