1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Kqueue;
13 
14 
15 // dfmt off
16 version(HAVE_KQUEUE):
17 // dfmt on
18 import hunt.event.selector.Selector;
19 import hunt.event.timer.Kqueue;
20 import hunt.Exceptions;
21 import hunt.io.channel;
22 import hunt.logging.ConsoleLogger;
23 import hunt.util.Common;
24 
25 import std.exception;
26 import std.socket;
27 import std.string;
28 
29 import core.time;
30 import core.stdc.string;
31 import core.stdc.errno;
32 import core.sys.posix.sys.types; // for ssize_t, size_t
33 import core.sys.posix.signal;
34 import core.sys.posix.netinet.tcp;
35 import core.sys.posix.netinet.in_;
36 import core.sys.posix.unistd;
37 import core.sys.posix.time;
38 import hunt.concurrency.TaskPool;
39 /**
40  * 
41  */
42 class AbstractSelector : Selector {
43     // kevent array size
44     enum int NUM_KEVENTS = 128;
45     private bool isDisposed = false;
46     private Kevent[NUM_KEVENTS] events;
47     private int _kqueueFD;
48     private EventChannel _eventChannel;
49     private TaskPool _taskPool;
50     this(size_t number, size_t divider,TaskPool pool = null, size_t maxChannels = 1500) {
51         _taskPool = pool;
52         super(number, divider, maxChannels);
53         _kqueueFD = kqueue();
54         _eventChannel = new KqueueEventChannel(this);
55         register(_eventChannel);
56     }
57 
58     ~this() @nogc {
59         // dispose();
60     }
61 
62     override void dispose() {
63         if (isDisposed)
64             return;
65 
66         version (HUNT_IO_DEBUG)
67             tracef("disposing selector[fd=%d]...", _kqueueFD);
68         isDisposed = true;
69         _eventChannel.close();
70         int r = core.sys.posix.unistd.close(_kqueueFD);
71         if(r != 0) {
72             version(HUNT_DEBUG) warningf("error: %d", r);
73         }
74 
75         super.dispose();
76     }
77 
78     override void onStop() {
79         version (HUNT_IO_DEBUG)
80             infof("Selector stopping. fd=%d", _kqueueFD);  
81                
82         if(!_eventChannel.isClosed()) {
83             _eventChannel.trigger();
84             // _eventChannel.onWrite();
85         }
86     }
87 
88     override bool register(AbstractChannel channel) {
89         assert(channel !is null);
90         
91         const int fd = channel.handle;
92         version (HUNT_IO_DEBUG)
93             tracef("register channel: fd=%d, type=%s", fd, channel.type);
94 
95         int err = -1;
96         if (channel.type == ChannelType.Timer) {
97             Kevent ev;
98             AbstractTimer timerChannel = cast(AbstractTimer) channel;
99             if (timerChannel is null)
100                 return false;
101             size_t time = timerChannel.time < 20 ? 20 : timerChannel.time; // in millisecond
102             EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
103                     EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
104             err = kevent(_kqueueFD, &ev, 1, null, 0, null);
105         }
106         else {
107             if (fd < 0)
108                 return false;
109             Kevent[2] ev = void;
110             short read = EV_ADD | EV_ENABLE;
111             short write = EV_ADD | EV_ENABLE;
112             if (channel.hasFlag(ChannelFlag.ETMode)) {
113                 read |= EV_CLEAR;
114                 write |= EV_CLEAR;
115             }
116             EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) channel);
117             EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) channel);
118             if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
119                 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
120             else if (channel.hasFlag(ChannelFlag.Read))
121                 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
122             else if (channel.hasFlag(ChannelFlag.Write))
123                 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
124         }
125         if (err < 0) {
126             return false;
127         }
128         return true;
129     }
130 
131     override bool deregister(AbstractChannel channel) {
132         assert(channel !is null);
133         const fd = channel.handle;
134         if (fd < 0)
135             return false;
136 
137         int err = -1;
138         if (channel.type == ChannelType.Timer) {
139             Kevent ev;
140             AbstractTimer timerChannel = cast(AbstractTimer) channel;
141             if (timerChannel is null)
142                 return false;
143             EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
144             err = kevent(_kqueueFD, &ev, 1, null, 0, null);
145         }
146         else {
147             Kevent[2] ev = void;
148             EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
149             EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);
150             if (channel.hasFlag(ChannelFlag.Read) && channel.hasFlag(ChannelFlag.Write))
151                 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
152             else if (channel.hasFlag(ChannelFlag.Read))
153                 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
154             else if (channel.hasFlag(ChannelFlag.Write))
155                 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
156         }
157         if (err < 0) {
158             return false;
159         }
160         // channel.currtLoop = null;
161         channel.clear();
162         return true;
163     }
164 
165     protected override int doSelect(long timeout) {
166         timespec ts;
167         timespec *tsp;
168         // timeout is in milliseconds. Convert to struct timespec.
169         // timeout == -1 : wait forever : timespec timeout of NULL
170         // timeout == 0  : return immediately : timespec timeout of zero
171         if (timeout >= 0) {
172             // For some indeterminate reason kevent(2) has been found to fail with
173             // an EINVAL error for timeout values greater than or equal to
174             // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
175             // to the maximum value of a 32-bit signed integer which is
176             // approximately 25 days in milliseconds.
177             const int timeoutMax = int.max;
178             if (timeout > timeoutMax) {
179                 timeout = timeoutMax;
180             }
181             ts.tv_sec = timeout / 1000;
182             ts.tv_nsec = (timeout % 1000) * 1000000; //nanosec = 1 million millisec
183             tsp = &ts;
184         } else {
185             tsp = null;
186         }        
187 
188         // auto tspec = timespec(1, 1000 * 10);
189         int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp);
190 
191         foreach (i; 0 .. result) {
192             AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
193             ushort eventFlags = events[i].flags;
194             version (HUNT_IO_DEBUG)
195             infof("handling event: events=%d, fd=%d", eventFlags, channel.handle);
196 
197             if (eventFlags & EV_ERROR) {
198                 warningf("channel[fd=%d] has a error.", channel.handle);
199                 channel.close();
200                 continue;
201             }
202             if (eventFlags & EV_EOF) {
203                 version (HUNT_IO_DEBUG) infof("channel[fd=%d] closed", channel.handle);
204                 channel.close();
205                 continue;
206             }
207 
208             short filter = events[i].filter;
209             handeChannelEvent(channel, filter);
210         }
211         return result;
212     }
213 
214     private void handeChannelEvent(AbstractChannel channel, uint filter) {
215         version (HUNT_IO_DEBUG)
216         infof("handling event: events=%d, fd=%d", filter, channel.handle);
217         try {
218             if(filter == EVFILT_TIMER) {
219                 channel.onRead();
220             } else if (filter == EVFILT_WRITE) {
221                 channel.onWrite();
222             } else if (filter == EVFILT_READ) {
223                 channel.onRead();
224             } else {
225                 warningf("Unhandled channel fileter: %d", filter);
226             }
227         } catch(Exception e) {
228             errorf("error while handing channel: fd=%s, message=%s",
229                         channel.handle, e.msg);
230         }
231     }
232 }
233 
234 
235 enum : short {
236     EVFILT_READ = -1,
237     EVFILT_WRITE = -2,
238     EVFILT_AIO = -3, /* attached to aio requests */
239     EVFILT_VNODE = -4, /* attached to vnodes */
240     EVFILT_PROC = -5, /* attached to struct proc */
241     EVFILT_SIGNAL = -6, /* attached to struct proc */
242     EVFILT_TIMER = -7, /* timers */
243     EVFILT_MACHPORT = -8, /* Mach portsets */
244     EVFILT_FS = -9, /* filesystem events */
245     EVFILT_USER = -10, /* User events */
246     EVFILT_VM = -12, /* virtual memory events */
247     EVFILT_SYSCOUNT = 11
248 }
249 
250 extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
251     *kevp = Kevent(args);
252 }
253 
254 struct Kevent {
255     uintptr_t ident; /* identifier for this event */
256     short filter; /* filter for event */
257     ushort flags;
258     uint fflags;
259     intptr_t data;
260     void* udata; /* opaque user data identifier */
261 }
262 
263 enum {
264     /* actions */
265     EV_ADD = 0x0001, /* add event to kq (implies enable) */
266     EV_DELETE = 0x0002, /* delete event from kq */
267     EV_ENABLE = 0x0004, /* enable event */
268     EV_DISABLE = 0x0008, /* disable event (not reported) */
269 
270     /* flags */
271     EV_ONESHOT = 0x0010, /* only report one occurrence */
272     EV_CLEAR = 0x0020, /* clear event state after reporting */
273     EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
274     EV_DISPATCH = 0x0080, /* disable event after reporting */
275 
276     EV_SYSFLAGS = 0xF000, /* reserved by system */
277     EV_FLAG1 = 0x2000, /* filter-specific flag */
278 
279     /* returned values */
280     EV_EOF = 0x8000, /* EOF detected */
281     EV_ERROR = 0x4000, /* error, data contains errno */
282 
283 }
284 
285 enum {
286     /*
287     * data/hint flags/masks for EVFILT_USER, shared with userspace
288     *
289     * On input, the top two bits of fflags specifies how the lower twenty four
290     * bits should be applied to the stored value of fflags.
291     *
292     * On output, the top two bits will always be set to NOTE_FFNOP and the
293     * remaining twenty four bits will contain the stored fflags value.
294     */
295     NOTE_FFNOP = 0x00000000, /* ignore input fflags */
296     NOTE_FFAND = 0x40000000, /* AND fflags */
297     NOTE_FFOR = 0x80000000, /* OR fflags */
298     NOTE_FFCOPY = 0xc0000000, /* copy fflags */
299     NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
300     NOTE_FFLAGSMASK = 0x00ffffff,
301 
302     NOTE_TRIGGER = 0x01000000, /* Cause the event to be
303                                     triggered for output. */
304 
305     /*
306     * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
307     */
308     NOTE_LOWAT = 0x0001, /* low water mark */
309 
310     /*
311     * data/hint flags for EVFILT_VNODE, shared with userspace
312     */
313     NOTE_DELETE = 0x0001, /* vnode was removed */
314     NOTE_WRITE = 0x0002, /* data contents changed */
315     NOTE_EXTEND = 0x0004, /* size increased */
316     NOTE_ATTRIB = 0x0008, /* attributes changed */
317     NOTE_LINK = 0x0010, /* link count changed */
318     NOTE_RENAME = 0x0020, /* vnode was renamed */
319     NOTE_REVOKE = 0x0040, /* vnode access was revoked */
320 
321     /*
322     * data/hint flags for EVFILT_PROC, shared with userspace
323     */
324     NOTE_EXIT = 0x80000000, /* process exited */
325     NOTE_FORK = 0x40000000, /* process forked */
326     NOTE_EXEC = 0x20000000, /* process exec'd */
327     NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
328     NOTE_PDATAMASK = 0x000fffff, /* mask for pid */
329 
330     /* additional flags for EVFILT_PROC */
331     NOTE_TRACK = 0x00000001, /* follow across forks */
332     NOTE_TRACKERR = 0x00000002, /* could not track child */
333     NOTE_CHILD = 0x00000004, /* am a child process */
334 
335 }
336 
337 extern (C) {
338     int kqueue() @nogc nothrow;
339     int kevent(int kq, const Kevent* changelist, int nchanges,
340             Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
341 }
342 
343 static if (CompilerHelper.isLessThan(2078)) {
344     enum SO_REUSEPORT = 0x0200;
345 }