1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Kqueue;
13 
14 
15 // dfmt off
16 version(HAVE_KQUEUE):
17 // dfmt on
18 import hunt.event.selector.Selector;
19 import hunt.event.timer.Kqueue;
20 import hunt.Exceptions;
21 import hunt.io.channel;
22 import hunt.logging.ConsoleLogger;
23 import hunt.util.CompilerHelper;
24 
25 import std.exception;
26 import std.socket;
27 import std.string;
28 
29 import core.time;
30 import core.stdc.string;
31 import core.stdc.errno;
32 import core.sys.posix.sys.types; // for ssize_t, size_t
33 import core.sys.posix.signal;
34 import core.sys.posix.netinet.tcp;
35 import core.sys.posix.netinet.in_;
36 import core.sys.posix.unistd;
37 import core.sys.posix.time;
38 import hunt.util.TaskPool;
/**
 * Selector implementation built on kqueue(2)/kevent(2).
 *
 * The selector owns one kqueue descriptor. Channels are added to and removed
 * from it with kevent(2); every registration stores the channel reference in
 * the event's `udata` field so that doSelect() can route a returned event back
 * to the channel it belongs to.
 */
class AbstractSelector : Selector {
    // kevent array size
    enum int NUM_KEVENTS = 128;
    private bool isDisposed = false;
    private Kevent[NUM_KEVENTS] events;
    private int _kqueueFD;
    private EventChannel _eventChannel;
    // private TaskPool _taskPool;

    /**
     * Creates the kqueue descriptor and registers the internal event channel
     * that onStop() triggers.
     *
     * Params:
     *   number      = forwarded to the base Selector constructor
     *   divider     = forwarded to the base Selector constructor
     *   pool        = currently unused
     *   maxChannels = forwarded to the base Selector constructor
     *
     * Throws: ErrnoException if kqueue() fails to create a descriptor.
     */
    this(size_t number, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        // _taskPool = pool;
        super(number, divider, maxChannels);
        _kqueueFD = kqueue();
        // kqueue() returns -1 on failure; every later kevent() call would fail
        // on that descriptor, so report the problem where it happens.
        std.exception.errnoEnforce(_kqueueFD >= 0, "failed to create the kqueue descriptor");
        _eventChannel = new KqueueEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the kqueue descriptor, then
     * disposes the base selector. Calls after the first one are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _kqueueFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_kqueueFD);
        if (r != 0) {
            // close() only ever reports -1; the actual reason is in errno.
            version (HUNT_DEBUG)
                warningf("failed to close kqueue fd=%d, errno=%d", _kqueueFD, errno);
        }

        super.dispose();
    }

    /**
     * Triggers the internal event channel, unless it is already closed.
     * NOTE(review): presumably this wakes a thread blocked in doSelect() -
     * confirm against KqueueEventChannel.
     */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d", _kqueueFD);

        if (!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Adds a channel to the kqueue.
     *
     * Timer channels are installed with EVFILT_TIMER. Any other channel gets
     * EVFILT_READ and/or EVFILT_WRITE according to its Read/Write flags, with
     * EV_CLEAR added when the channel has the ETMode flag.
     *
     * Params:
     *   channel = the channel to watch
     *
     * Returns: true if kevent() accepted the change; false if the handle is
     *          negative, a Timer-typed channel is not an AbstractTimer, the
     *          channel has neither the Read nor the Write flag, or kevent()
     *          failed.
     */
    override bool register(AbstractChannel channel) {
        // NOTE(review): the base-class result is ignored and the base
        // registration is not rolled back when the kevent() call below fails -
        // confirm that this is intended.
        super.register(channel);

        const int fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register channel: fd=%d, type=%s", fd, channel.type);

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            // Periods shorter than 20 are raised to 20.
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            if (fd < 0)
                return false;

            // Kevent.flags is unsigned, so keep the flag words unsigned too.
            ushort readFlags = EV_ADD | EV_ENABLE;
            ushort writeFlags = EV_ADD | EV_ENABLE;
            if (channel.hasFlag(ChannelFlag.ETMode)) {
                readFlags |= EV_CLEAR;
                writeFlags |= EV_CLEAR;
            }

            // ev[0] is the read change, ev[1] the write change; the slice
            // handed to kevent() selects which of them are submitted.
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, readFlags, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, writeFlags, 0, 0, cast(void*) channel);

            const bool wantsRead = channel.hasFlag(ChannelFlag.Read);
            const bool wantsWrite = channel.hasFlag(ChannelFlag.Write);
            if (wantsRead && wantsWrite)
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (wantsRead)
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (wantsWrite)
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        return err >= 0;
    }

    /**
     * Removes a channel from the kqueue with EV_DELETE.
     *
     * The base-class deregister() always runs on exit, whatever the outcome.
     * channel.clear() is called only when kevent() succeeded.
     *
     * Params:
     *   channel = the channel to stop watching
     *
     * Returns: true if kevent() accepted the deletion, false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);

            const bool hasRead = channel.hasFlag(ChannelFlag.Read);
            const bool hasWrite = channel.hasFlag(ChannelFlag.Write);
            if (hasRead && hasWrite)
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (hasRead)
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (hasWrite)
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0) {
            return false;
        }
        // channel.currtLoop = null;
        channel.clear();
        return true;
    }

    /**
     * Waits for events on the kqueue and dispatches them.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value waits forever,
     *             0 returns immediately
     *
     * Returns: the value returned by kevent(): the number of events fetched,
     *          or a negative value on failure.
     */
    protected override int doSelect(long timeout) {
        timespec ts;
        timespec* tsp = null; // null makes kevent() wait without a time limit
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max;
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            // After clamping both parts fit the timespec fields, so the casts
            // cannot truncate.
            ts.tv_sec = cast(typeof(ts.tv_sec))(timeout / 1000);
            // 1 millisecond = 1,000,000 nanoseconds
            ts.tv_nsec = cast(typeof(ts.tv_nsec))((timeout % 1000) * 1000000);
            tsp = &ts;
        }

        int result = kevent(_kqueueFD, null, 0, events.ptr, NUM_KEVENTS, tsp);
        if (result < 0) {
            const int err = errno;
            // EINTR only means a signal interrupted the wait, and failures
            // after dispose() are expected because the descriptor is closed.
            if (err != EINTR && !isDisposed)
                warningf("kevent failed on selector[fd=%d], errno=%d", _kqueueFD, err);
            return result;
        }

        foreach (i; 0 .. result) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            ushort eventFlags = events[i].flags;
            version (HUNT_IO_DEBUG)
            infof("handling event: events=%d, fd=%d", eventFlags, channel.handle);

            if (eventFlags & EV_ERROR) {
                warningf("channel[fd=%d] has a error.", channel.handle);
                channel.close();
                // rmEventArray(channel);
                continue;
            }
            if (eventFlags & EV_EOF) {
                // NOTE(review): the channel is closed without calling onRead();
                // confirm that data still pending at EOF may be dropped.
                version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle);
                channel.close();
                // rmEventArray(channel);
                continue;
            }

            handeChannelEvent(channel, events[i].filter);
        }
        return result;
    }

    /**
     * Routes one event to the channel callback matching its filter:
     * EVFILT_TIMER and EVFILT_READ call onRead(), EVFILT_WRITE calls onWrite().
     * Exceptions thrown by the callback are logged and not propagated.
     *
     * Params:
     *   channel = the channel the event belongs to
     *   filter  = the filter value reported by kevent() (a signed short)
     */
    private void handeChannelEvent(AbstractChannel channel, short filter) {
        version (HUNT_IO_DEBUG)
        infof("handling event: events=%d, fd=%d", filter, channel.handle);
        try {
            if (filter == EVFILT_TIMER) {
                channel.onRead();
            } else if (filter == EVFILT_WRITE) {
                channel.onWrite();
            } else if (filter == EVFILT_READ) {
                channel.onRead();
            } else {
                warningf("Unhandled channel fileter: %d", filter);
            }
        } catch (Exception e) {
            errorf("error while handing channel: fd=%s, message=%s",
                        channel.handle, e.msg);
        }
    }
}
243 
244 
/**
 * Filter identifiers used in Kevent.filter.
 */
enum : short {
    EVFILT_READ     = -1,
    EVFILT_WRITE    = -2,
    EVFILT_AIO      = -3,   // attached to aio requests
    EVFILT_VNODE    = -4,   // attached to vnodes
    EVFILT_PROC     = -5,   // attached to struct proc
    EVFILT_SIGNAL   = -6,   // attached to struct proc
    EVFILT_TIMER    = -7,   // timers
    EVFILT_MACHPORT = -8,   // Mach portsets
    EVFILT_FS       = -9,   // filesystem events
    EVFILT_USER     = -10,  // User events
    EVFILT_VM       = -12,  // virtual memory events
    EVFILT_SYSCOUNT = 11
}
259 
/**
 * Fills the Kevent that `kevp` points to.
 *
 * Params:
 *   kevp = destination event; every field is overwritten
 *   args = one value per Kevent field, in declaration order
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    Kevent filled = Kevent(args);
    *kevp = filled;
}
263 
/**
 * Event record exchanged with kevent(): used both for the change list handed
 * in and for the event list filled on return.
 *
 * NOTE(review): the field order and types presumably mirror the platform's C
 * `struct kevent` - confirm against the target system's <sys/event.h>.
 */
struct Kevent {
    uintptr_t ident;  // identifier for this event
    short filter;     // filter for event (an EVFILT_* value)
    ushort flags;     // EV_* action and result flags
    uint fflags;      // filter-specific flags
    intptr_t data;    // filter-specific data
    void* udata;      // opaque user data identifier
}
272 
/**
 * Bit values for Kevent.flags.
 */
enum {
    // actions
    EV_ADD      = 0x0001,  // add event to kq (implies enable)
    EV_DELETE   = 0x0002,  // delete event from kq
    EV_ENABLE   = 0x0004,  // enable event
    EV_DISABLE  = 0x0008,  // disable event (not reported)

    // flags
    EV_ONESHOT  = 0x0010,  // only report one occurrence
    EV_CLEAR    = 0x0020,  // clear event state after reporting
    EV_RECEIPT  = 0x0040,  // force EV_ERROR on success, data=0
    EV_DISPATCH = 0x0080,  // disable event after reporting

    EV_SYSFLAGS = 0xF000,  // reserved by system
    EV_FLAG1    = 0x2000,  // filter-specific flag

    // returned values
    EV_EOF      = 0x8000,  // EOF detected
    EV_ERROR    = 0x4000,  // error, data contains errno
}
294 
/**
 * Filter-specific values for Kevent.fflags, grouped by the filter they
 * belong to.
 */
enum {
    // data/hint flags/masks for EVFILT_USER, shared with userspace
    //
    // On input, the top two bits of fflags specify how the lower twenty four
    // bits should be applied to the stored value of fflags.
    //
    // On output, the top two bits will always be set to NOTE_FFNOP and the
    // remaining twenty four bits will contain the stored fflags value.
    NOTE_FFNOP      = 0x00000000,  // ignore input fflags
    NOTE_FFAND      = 0x40000000,  // AND fflags
    NOTE_FFOR       = 0x80000000,  // OR fflags
    NOTE_FFCOPY     = 0xc0000000,  // copy fflags
    NOTE_FFCTRLMASK = 0xc0000000,  // masks for operations
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER    = 0x01000000,  // cause the event to be triggered for output

    // data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
    NOTE_LOWAT      = 0x0001,  // low water mark

    // data/hint flags for EVFILT_VNODE, shared with userspace
    NOTE_DELETE     = 0x0001,  // vnode was removed
    NOTE_WRITE      = 0x0002,  // data contents changed
    NOTE_EXTEND     = 0x0004,  // size increased
    NOTE_ATTRIB     = 0x0008,  // attributes changed
    NOTE_LINK       = 0x0010,  // link count changed
    NOTE_RENAME     = 0x0020,  // vnode was renamed
    NOTE_REVOKE     = 0x0040,  // vnode access was revoked

    // data/hint flags for EVFILT_PROC, shared with userspace
    NOTE_EXIT       = 0x80000000,  // process exited
    NOTE_FORK       = 0x40000000,  // process forked
    NOTE_EXEC       = 0x20000000,  // process exec'd
    NOTE_PCTRLMASK  = 0xf0000000,  // mask for hint bits
    NOTE_PDATAMASK  = 0x000fffff,  // mask for pid

    // additional flags for EVFILT_PROC
    NOTE_TRACK      = 0x00000001,  // follow across forks
    NOTE_TRACKERR   = 0x00000002,  // could not track child
    NOTE_CHILD      = 0x00000004,  // am a child process
}
346 
/// Bindings to the C kqueue() function; its result is used as the `kq` argument of kevent().
extern (C) int kqueue() @nogc nothrow;

/**
 * Bindings to the C kevent() function.
 *
 * Params:
 *   kq         = kqueue descriptor
 *   changelist = changes to apply, or null
 *   nchanges   = number of entries in `changelist`
 *   eventlist  = buffer receiving events, or null
 *   nevents    = capacity of `eventlist`
 *   timeout    = maximum wait, or null for no time limit
 */
extern (C) int kevent(int kq, const Kevent* changelist, int nchanges,
        Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
352 
// Defines SO_REUSEPORT locally when CompilerHelper.isLessThan(2078) holds.
// NOTE(review): presumably older compiler releases do not ship this constant - confirm.
static if (CompilerHelper.isLessThan(2078))
    enum SO_REUSEPORT = 0x0200;