1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Epoll;
13 
14 // dfmt off
15 version(HAVE_EPOLL):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 
23 import core.sys.posix.sys.types;
24 import core.sys.posix.netinet.tcp;
25 import core.sys.posix.netinet.in_;
26 import core.sys.posix.unistd;
27 import core.stdc.string;
28 import core.stdc.errno;
29 import core.time;
30 import core.thread;
31 
32 import core.sys.posix.sys.resource;
33 import core.sys.posix.sys.time;
34 import core.sys.linux.epoll;
35 
36 import hunt.event.selector.Selector;
37 import hunt.Exceptions;
38 import hunt.io.channel;
39 import hunt.logging.ConsoleLogger;
40 import hunt.event.timer;
41 import hunt.system.Error;
42 import hunt.util.TaskPool;
43 
/**
 * Hard limit (`rlim_max`) on the number of open file descriptors, as
 * reported by `getrlimit(RLIMIT_NOFILE)`. Stays 0 until the module
 * constructor below has run.
 */
__gshared size_t fdLimit = 0;

/// Queries RLIMIT_NOFILE once at start-up and caches the hard limit in `fdLimit`.
shared static this() {
    rlimit nofile;
    // The return code of getrlimit is not inspected.
    getrlimit(RLIMIT_NOFILE, &nofile);
    fdLimit = nofile.rlim_max;
}
52 
53 
/**
 * Linux epoll(7) based implementation of `Selector`.
 *
 * Each instance owns one epoll file descriptor. Channels are added to and
 * removed from it through `register` / `deregister`; `doSelect` waits for
 * readiness events and dispatches them either on the calling thread or,
 * when a `TaskPool` was supplied to the constructor, through that pool.
 */
class AbstractSelector : Selector {
    /// Capacity of the event buffer handed to a single epoll_wait call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;
    private TaskPool _taskPool;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Params:
     *   id          = forwarded to the `Selector` constructor
     *   divider     = forwarded to the `Selector` constructor
     *   pool        = optional task pool; when null, channel events are
     *                 handled directly inside `doSelect`
     *   maxChannels = forwarded to the `Selector` constructor
     *
     * Throws: IOException if `epoll_create1` fails.
     */
    this(size_t id, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(id, divider, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        _eventChannel = new EpollEventChannel(this);
        register(_eventChannel);
    }

    /// Intentionally empty: resources are released by an explicit `dispose()` call.
    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the epoll file descriptor, then
     * calls `super.dispose()`. Calls after the first one are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if (r != 0) {
            // close() only ever returns -1 on failure; errno carries the actual reason.
            version (HUNT_IO_DEBUG) warningf("error: %d", errno);
        }

        super.dispose();
    }

    /**
     * Triggers the internal event channel (unless it is already closed) when
     * the selector is asked to stop.
     */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d, id: %d", _epollFD, getId());

        if (!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers `channel` with the base selector and adds it to the epoll
     * instance (EPOLL_CTL_ADD).
     *
     * Returns: true if `epollCtl` reports success, false otherwise.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", channel.handle, channel.type);

        if (epollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug warningf("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Removes `channel` from the epoll instance (EPOLL_CTL_DEL) and then
     * deregisters it from the base selector.
     *
     * Returns: true if `epollCtl` reports success, false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        // Runs on every exit path, after the epoll_ctl attempt below.
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        if (epollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            // Report the handle of the channel being removed, as register() does.
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events on the epoll instance and dispatches each one to
     * `handeChannelEvent`, directly or through the task pool.
     *
     * Params:
     *   timeout = wait time in milliseconds; values <= 0 are passed straight
     *             to epoll_wait, positive values go through `iepoll`
     *
     * Returns: the value returned by epoll_wait / `iepoll` (number of events,
     *          or a negative value on error).
     */
    protected override int doSelect(long timeout) {
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, events.length, cast(int) timeout);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            len = iepoll(_epollFD, events.ptr, events.length, cast(int) timeout);
        }

        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());
        }

        // A negative len yields an empty range, so nothing is dispatched on error.
        if (_taskPool is null) {
            foreach (i; 0 .. len) {
                AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
                if (channel is null) {
                    debug warningf("channel is null");
                } else {
                    handeChannelEvent(channel, events[i].events);
                }
            }
        } else {  // using worker thread
            foreach (i; 0 .. len) {
                AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
                if (channel is null) {
                    debug warningf("channel is null");
                } else {
                    // Copy the mask now: the events buffer is reused by the next doSelect call.
                    uint currentEvents = events[i].events;
                    _taskPool.put(cast(int)channel.handle, makeTask(&handeChannelEvent, channel, currentEvents));
                }
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to the channel's callbacks.
     *
     * A mask classified as closed by `isClosed` calls `onRead` first when it
     * is also readable, then closes the channel. Otherwise exactly EPOLLIN
     * calls `onRead`, exactly EPOLLOUT calls `onWrite`, and both together call
     * `onWrite` followed by `onRead`. Exceptions thrown by the channel are
     * caught and only logged.
     */
    private void handeChannelEvent(AbstractChannel channel, uint event) {
        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());

            // Thread.sleep(300.msecs);
            infof("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s", 
                this._epollFD, channel.handle, event, isReadable(event), isWritable(event), isClosed(event));
        }

        try {
            if (isClosed(event)) { // && errno != EINTR
                /* An error has occurred on this fd, or the socket is not
                    ready for reading (why were we notified then?) */
                version (HUNT_IO_DEBUG) {
                    warningf("event=%d, isReadable: %s, isWritable: %s", 
                        event, isReadable(event), isWritable(event));

                    if (isError(event)) {
                        warningf("channel error: fd=%s, event=%d, errno=%d, message=%s",
                                channel.handle, event, errno, getErrorMessage(errno));
                    } else {
                        tracef("channel closed: fd=%d, errno=%d, message=%s",
                                    channel.handle, errno, getErrorMessage(errno));
                    }
                }
                
                // The remote connection broken abnormally, so the channel should be notified.
                if (isReadable(event)) {
                    channel.onRead();
                }

                // if(isWritable(event)) {
                //     channel.onWrite();
                // }

                channel.close();
            } else if (event == EPOLLIN) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read event: fd=%d", channel.handle);
                channel.onRead();
            } else if (event == EPOLLOUT) {
                version (HUNT_IO_DEBUG)
                    tracef("channel write event: fd=%d", channel.handle);
                channel.onWrite();
            } else if (event == (EPOLLIN | EPOLLOUT)) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read and write: fd=%d", channel.handle);
                channel.onWrite();
                channel.onRead();
            } else {
                debug warningf("Only read/write/close events can be handled, current event: %d", event);
            }
        } catch (Exception e) {
            debug {
                errorf("error while handing channel: fd=%s, exception=%s, message=%s",
                        channel.handle, typeid(e), e.msg);
            }
            version(HUNT_DEBUG) warning(e);
        }
    }

    /**
     * epoll_wait wrapper for bounded waits that restarts after EINTR with the
     * remaining time.
     *
     * Params:
     *   epfd    = epoll file descriptor
     *   events  = buffer receiving the events
     *   numfds  = capacity of `events`
     *   timeout = wait time in milliseconds
     *
     * Returns: the epoll_wait result, or 0 once the time budget is used up
     *          (or the clock went backwards) while restarting after EINTR.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        long start, now;
        int remaining = timeout;
        timeval t;
        long diff;

        gettimeofday(&t, null);
        start = t.tv_sec * 1000 + t.tv_usec / 1000;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res < 0 && errno == EINTR) {
                if (remaining >= 0) {
                    // Interrupted by a signal: charge the elapsed time and retry.
                    gettimeofday(&t, null);
                    now = t.tv_sec * 1000 + t.tv_usec / 1000;
                    diff = now - start;
                    remaining -= diff;
                    if (diff < 0 || remaining <= 0) {
                        return 0;
                    }
                    start = now;
                }
            } else {
                return res;
            }
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True when EPOLLERR is set in `events`.
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /**
     * True when `e` has EPOLLERR, EPOLLHUP or EPOLLRDHUP set, or when it has
     * neither EPOLLIN nor EPOLLOUT set.
     */
    private static bool isClosed(uint e) nothrow {
        return (e & EPOLLERR) != 0 || (e & EPOLLHUP) != 0 || (e & EPOLLRDHUP) != 0
                || (!(e & EPOLLIN) && !(e & EPOLLOUT)) != 0;
    }

    /// True when EPOLLIN is set in `events`.
    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True when EPOLLOUT is set in `events`.
    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for `channel`: stores the channel reference in `data.ptr`
     * and builds the event mask from the channel's flags. EPOLLRDHUP,
     * EPOLLERR and EPOLLHUP are always requested.
     *
     * Returns: a copy of the filled `ev`.
     */
    private static buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.hasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Issues `epoll_ctl(opcode)` for the channel's handle, retrying while the
     * call is interrupted (EINTR).
     *
     * Returns: false only when epoll_ctl fails with an errno other than
     *          EBADF, ENOENT or EPERM; true otherwise.
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
            warning("epoll_ctl failed");
            return false;
        } else
            return true;
    }
}