1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Kqueue;
13 
14 
15 // dfmt off
16 version(HAVE_KQUEUE):
17 // dfmt on
18 import hunt.event.selector.Selector;
19 import hunt.event.timer.Kqueue;
20 import hunt.Exceptions;
21 import hunt.io.channel;
22 import hunt.logging;
23 import hunt.util.CompilerHelper;
24 
25 import std.exception;
26 import std.socket;
27 import std.string;
28 
29 import core.time;
30 import core.stdc.string;
31 import core.stdc.errno;
32 import core.sys.posix.sys.types; // for ssize_t, size_t
33 import core.sys.posix.signal;
34 import core.sys.posix.netinet.tcp;
35 import core.sys.posix.netinet.in_;
36 import core.sys.posix.unistd;
37 import core.sys.posix.time;
38 import hunt.util.worker;
39 
/**
 * Kqueue-backed selector.
 *
 * Owns one kqueue descriptor, registers channels with it as kevent filters
 * (EVFILT_TIMER for timer channels, EVFILT_READ / EVFILT_WRITE for all other
 * channels) and dispatches the events returned by kevent() to the channel
 * whose reference was stored in the event's `udata` field at registration.
 */
class AbstractSelector : Selector {
    // kevent array size
    enum int NUM_KEVENTS = 128;
    private bool isDisposed = false;
    private Kevent[NUM_KEVENTS] events;
    private int _kqueueFD;
    private EventChannel _eventChannel;

    /**
     * Creates the kqueue descriptor and registers the internal event channel
     * that onStop() triggers.
     *
     * Params:
     *   number      = forwarded to the base Selector constructor
     *   divider     = forwarded to the base Selector constructor
     *   worker      = forwarded to the base Selector constructor
     *   maxChannels = forwarded to the base Selector constructor
     *
     * Throws: ErrnoException if kqueue() fails.
     */
    this(size_t number, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(number, divider, worker, maxChannels);
        _kqueueFD = kqueue();
        // kqueue() reports failure with -1; without a valid descriptor every
        // later kevent() call would fail silently, so fail loudly right here.
        errnoEnforce(_kqueueFD >= 0, "kqueue() failed");
        _eventChannel = new KqueueEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the kqueue descriptor, then
     * disposes the base selector. Calling it more than once is a no-op.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _kqueueFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_kqueueFD);
        if(r != 0) {
            // The return value itself is only a failure marker; errno has the reason.
            version(HUNT_DEBUG) warningf("error: %d, errno=%d", r, errno);
        }
        // Forget the descriptor number so that a later kevent() call cannot hit
        // an unrelated descriptor that happens to reuse the same number.
        _kqueueFD = -1;

        super.dispose();
    }

    /**
     * Triggers the internal event channel (unless it is already closed) when
     * the selector is being stopped.
     */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d", _kqueueFD);  
               
        if(!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers a channel with the base selector and adds its kevent filters.
     *
     * Timer channels get a single EVFILT_TIMER entry whose period is the
     * channel's `time` (in milliseconds, raised to at least 20). Other channels
     * get EVFILT_READ and/or EVFILT_WRITE entries according to their
     * ChannelFlag.Read / ChannelFlag.Write flags; ChannelFlag.ETMode adds
     * EV_CLEAR to them.
     *
     * Returns: true if kevent() accepted the change list; false if the handle
     * is negative, the channel is not an AbstractTimer although its type is
     * Timer, neither Read nor Write is set, or kevent() failed.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);
        
        const int fd = channel.handle;
        version (HUNT_IO_DEBUG)
            tracef("register channel: fd=%d, type=%s", fd, channel.type);

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            if (fd < 0)
                return false;
            Kevent[2] ev = void;
            // The read and write entries always carry the same action flags.
            ushort actions = EV_ADD | EV_ENABLE;
            if (channel.hasFlag(ChannelFlag.ETMode))
                actions |= EV_CLEAR;
            EV_SET(&(ev[0]), fd, EVFILT_READ, actions, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, actions, 0, 0, cast(void*) channel);
            err = applyReadWriteChanges(channel, ev);
        }
        return err >= 0;
    }

    /**
     * Removes the channel's kevent filters and deregisters it from the base
     * selector. The base deregistration runs on every exit path.
     *
     * Returns: true if kevent() accepted the EV_DELETE change list (the
     * channel is then cleared); false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }
        
        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (channel.type == ChannelType.Timer) {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);
            err = applyReadWriteChanges(channel, ev);
        }
        if (err < 0) {
            return false;
        }
        // channel.currtLoop = null;
        channel.clear();
        return true;
    }

    /**
     * Submits the prepared read (index 0) and/or write (index 1) change
     * entries that match the channel's Read / Write flags.
     *
     * Returns: the kevent() result, or -1 when the channel has neither flag.
     */
    private int applyReadWriteChanges(AbstractChannel channel, ref Kevent[2] changes) {
        const bool wantsRead = channel.hasFlag(ChannelFlag.Read);
        const bool wantsWrite = channel.hasFlag(ChannelFlag.Write);

        if (wantsRead && wantsWrite)
            return kevent(_kqueueFD, &(changes[0]), 2, null, 0, null);
        if (wantsRead)
            return kevent(_kqueueFD, &(changes[0]), 1, null, 0, null);
        if (wantsWrite)
            return kevent(_kqueueFD, &(changes[1]), 1, null, 0, null);
        return -1;
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value waits without a
     *             timeout, 0 returns immediately.
     *
     * Returns: the value returned by kevent(): the number of events fetched,
     * or a negative value on failure.
     */
    protected override int doSelect(long timeout) {
        // void* [] tmp;
        // eventBuffer = tmp;
        timespec ts;
        timespec* tsp = null;
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max;
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = cast(typeof(ts.tv_sec))(timeout / 1000);
            // 1 millisecond = 1,000,000 nanoseconds
            ts.tv_nsec = cast(typeof(ts.tv_nsec))((timeout % 1000) * 1000000);
            tsp = &ts;
        }

        // auto tspec = timespec(1, 1000 * 10);
        int result = kevent(_kqueueFD, null, 0, events.ptr, NUM_KEVENTS, tsp);

        if (result < 0) {
            // Nothing was fetched. EINTR only means a signal interrupted the
            // wait, so it is not worth reporting.
            version (HUNT_DEBUG) {
                const int failure = errno;
                if (failure != EINTR)
                    warningf("kevent failed on kqueue[fd=%d]: errno=%d", _kqueueFD, failure);
            }
            return result;
        }

        foreach (i; 0 .. result) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            if (channel is null) {
                // Without a channel reference there is nothing to dispatch to.
                version (HUNT_DEBUG)
                    warningf("event without channel: ident=%d, filter=%d",
                            events[i].ident, events[i].filter);
                continue;
            }

            ushort eventFlags = events[i].flags;
            version (HUNT_IO_DEBUG)
            infof("handling event: events=%d, fd=%d", eventFlags, channel.handle);

            if (eventFlags & EV_ERROR) {
                // With EV_ERROR set, the data field carries the errno value.
                warningf("channel[fd=%d] has an error (errno=%d).",
                        channel.handle, events[i].data);
                channel.close();
                continue;
            }
            // NOTE(review): the channel is closed on EV_EOF without a final
            // onRead(); confirm no pending input needs to be drained first.
            if (eventFlags & EV_EOF) {
                version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle);
                channel.close();
                continue;
            }

            handeChannelEvent(channel, events[i].filter);
        }
        return result;
    }

    /**
     * Invokes the channel callback that matches the kevent filter: onRead()
     * for EVFILT_TIMER and EVFILT_READ, onWrite() for EVFILT_WRITE. Exceptions
     * thrown by the callback are logged and not propagated.
     *
     * The filter is taken as `short`, the type of Kevent.filter, because the
     * EVFILT_* values are negative.
     */
    private void handeChannelEvent(AbstractChannel channel, short filter) {
        version (HUNT_IO_DEBUG)
        infof("handling event: events=%d, fd=%d", filter, channel.handle);

        try {
            if(filter == EVFILT_TIMER) {
                channel.onRead();
            } else if (filter == EVFILT_WRITE) {
                channel.onWrite();
            } else if (filter == EVFILT_READ) {
                channel.onRead();
            } else {
                warningf("Unhandled channel fileter: %d", filter);
            }
        } catch(Exception e) {
            errorf("error while handing channel: fd=%s, message=%s",
                        channel.handle, e.msg);
        }
    }
}
241 
242 
/*
 * kqueue filter identifiers, typed as `short` to match Kevent.filter.
 * This module itself only uses EVFILT_READ, EVFILT_WRITE and EVFILT_TIMER.
 *
 * NOTE(review): the values look like the ones from Darwin's <sys/event.h>;
 * confirm them before enabling HAVE_KQUEUE on another BSD.
 */
enum : short {
    EVFILT_READ = -1,
    EVFILT_WRITE = -2,
    EVFILT_AIO = -3, /* attached to aio requests */
    EVFILT_VNODE = -4, /* attached to vnodes */
    EVFILT_PROC = -5, /* attached to struct proc */
    EVFILT_SIGNAL = -6, /* attached to struct proc */
    EVFILT_TIMER = -7, /* timers */
    EVFILT_MACHPORT = -8, /* Mach portsets */
    EVFILT_FS = -9, /* filesystem events */
    EVFILT_USER = -10, /* User events */
    EVFILT_VM = -12, /* virtual memory events */
    EVFILT_SYSCOUNT = 11 /* positive, unlike the filter ids above; presumably a count of filters, not a filter */
}
257 
/**
 * Fills the Kevent pointed to by `kevp` from the given field values.
 *
 * `args` must be one value per Kevent field, in declaration order; the
 * previous contents of `*kevp` are overwritten completely.
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    Kevent filled = Kevent(args);
    *kevp = filled;
}
261 
/*
 * One kqueue event record. It is passed to the extern (C) kevent() function
 * both as a change entry and as a slot for a returned event.
 *
 * NOTE(review): the field order and types must match the target platform's
 * C `struct kevent`; confirm the layout for every OS that defines HAVE_KQUEUE.
 */
struct Kevent {
    uintptr_t ident; /* identifier for this event */
    short filter; /* filter for event */
    ushort flags; /* EV_* action and flag bits */
    uint fflags; /* presumably the filter-specific NOTE_* flags; this module always passes 0 */
    intptr_t data; /* filter-specific data; the selector stores the timer period (ms) here for EVFILT_TIMER */
    void* udata; /* opaque user data identifier */
}
270 
/*
 * Bit values for Kevent.flags: the actions requested when a change entry is
 * submitted, behaviour modifiers, and the status bits set on returned events.
 */
enum {
    /* actions */
    EV_ADD = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE = 0x0002, /* delete event from kq */
    EV_ENABLE = 0x0004, /* enable event */
    EV_DISABLE = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT = 0x0010, /* only report one occurrence */
    EV_CLEAR = 0x0020, /* clear event state after reporting */
    EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1 = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF = 0x8000, /* EOF detected */
    EV_ERROR = 0x4000, /* error, data contains errno */

}
292 
/*
 * Filter-specific NOTE_* flag values, grouped by the filter they belong to.
 * Several names share a numeric value (for example NOTE_LOWAT, NOTE_DELETE
 * and NOTE_TRACK are all 0x0001) because each group applies to a different
 * filter. None of them is used by the selector in this module.
 */
enum {
    /*
    * data/hint flags/masks for EVFILT_USER, shared with userspace
    *
    * On input, the top two bits of fflags specifies how the lower twenty four
    * bits should be applied to the stored value of fflags.
    *
    * On output, the top two bits will always be set to NOTE_FFNOP and the
    * remaining twenty four bits will contain the stored fflags value.
    */
    NOTE_FFNOP = 0x00000000, /* ignore input fflags */
    NOTE_FFAND = 0x40000000, /* AND fflags */
    NOTE_FFOR = 0x80000000, /* OR fflags */
    NOTE_FFCOPY = 0xc0000000, /* copy fflags */
    NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                    triggered for output. */

    /*
    * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
    */
    NOTE_LOWAT = 0x0001, /* low water mark */

    /*
    * data/hint flags for EVFILT_VNODE, shared with userspace
    */
    NOTE_DELETE = 0x0001, /* vnode was removed */
    NOTE_WRITE = 0x0002, /* data contents changed */
    NOTE_EXTEND = 0x0004, /* size increased */
    NOTE_ATTRIB = 0x0008, /* attributes changed */
    NOTE_LINK = 0x0010, /* link count changed */
    NOTE_RENAME = 0x0020, /* vnode was renamed */
    NOTE_REVOKE = 0x0040, /* vnode access was revoked */

    /*
    * data/hint flags for EVFILT_PROC, shared with userspace
    */
    NOTE_EXIT = 0x80000000, /* process exited */
    NOTE_FORK = 0x40000000, /* process forked */
    NOTE_EXEC = 0x20000000, /* process exec'd */
    NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
    NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

    /* additional flags for EVFILT_PROC */
    NOTE_TRACK = 0x00000001, /* follow across forks */
    NOTE_TRACKERR = 0x00000002, /* could not track child */
    NOTE_CHILD = 0x00000004, /* am a child process */

}
344 
/*
 * C bindings for the kqueue system interface.
 */
extern (C) {
    /* Creates a kqueue; the selector stores the result as its descriptor. */
    int kqueue() @nogc nothrow;
    /*
     * Submits `nchanges` entries from `changelist` and/or fetches up to
     * `nevents` events into `eventlist`. This module calls it either with a
     * change list only (eventlist null) or with an event list only
     * (changelist null); a null `timeout` is used there to wait forever.
     * A negative return value is treated as failure by the callers.
     */
    int kevent(int kq, const Kevent* changelist, int nchanges,
            Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
}
350 
// Defines SO_REUSEPORT locally when CompilerHelper.isLessThan(2078) holds.
// NOTE(review): presumably compilers older than front-end 2.078 ship a runtime
// without this constant, and 0x0200 is the BSD/Darwin value -- confirm both.
static if (CompilerHelper.isLessThan(2078)) {
    enum SO_REUSEPORT = 0x0200;
}