1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Epoll;
13 
14 // dfmt off
15 version(HAVE_EPOLL):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 
23 import core.time;
24 import core.stdc.string;
25 import core.stdc.errno;
26 import core.sys.posix.sys.types;
27 import core.sys.posix.netinet.tcp;
28 import core.sys.posix.netinet.in_;
29 import core.sys.posix.unistd;
30 
31 import core.sys.posix.sys.resource;
32 import core.sys.posix.sys.time;
33 import core.sys.linux.epoll;
34 
35 import hunt.event.selector.Selector;
36 import hunt.Exceptions;
37 import hunt.io.channel;
38 import hunt.logging.ConsoleLogger;
39 import hunt.event.timer;
40 import hunt.system.Error;
41 import hunt.concurrency.TaskPool;
42 
/**
 * Hard limit (`rlim_max`) on the number of open file descriptors for this
 * process, as reported by `getrlimit(RLIMIT_NOFILE)`.
 *
 * Filled in once by the shared module constructor below; it stays 0 until
 * that constructor has run.
 */
__gshared size_t fdLimit = 0;

/// Queries the descriptor limit once at program start-up and caches it.
shared static this() {
    rlimit descriptorLimits;
    // The return value is deliberately not inspected (same as before): a
    // failed call leaves the zero-initialised struct untouched.
    getrlimit(RLIMIT_NOFILE, &descriptorLimits);
    fdLimit = descriptorLimits.rlim_max;
}
51 
52 
/**
 * Selector implementation backed by Linux epoll(7).
 *
 * Each instance owns one epoll file descriptor. Channels are added, modified
 * and removed through `epoll_ctl`, and `doSelect` waits with `epoll_wait` and
 * hands every reported event to `handeChannelEvent` -- either inline on the
 * selecting thread or, when a `TaskPool` was supplied, through that pool.
 */
class AbstractSelector : Selector {
    /// Capacity of the event buffer passed to each `epoll_wait` call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    // Internal channel registered in the constructor and triggered by onStop().
    private EventChannel _eventChannel;
    // Optional pool used to dispatch channel events; null means inline dispatch.
    private TaskPool _taskPool;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Params:
     *   id          = forwarded to the base `Selector`
     *   divider     = forwarded to the base `Selector`
     *   pool        = optional task pool for event dispatch (null: dispatch inline)
     *   maxChannels = forwarded to the base `Selector`
     *
     * Throws: IOException if the epoll instance cannot be created.
     */
    this(size_t id, size_t divider, TaskPool pool = null, size_t maxChannels = 1500) {
        _taskPool = pool;
        super(id, divider, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        // If anything below throws, the object is never handed to a caller and
        // dispose() will not run, so release the descriptor here.
        scope (failure)
            core.sys.posix.unistd.close(_epollFD);

        _eventChannel = new EpollEventChannel(this);
        register(_eventChannel);
    }

    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor, then
     * disposes the base selector. Calling it more than once is a no-op.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if (r != 0) {
            // close() only ever returns -1 on failure; errno carries the reason.
            version (HUNT_IO_DEBUG)
                warningf("failed to close epoll fd=%d, errno=%d", _epollFD, errno);
        }

        super.dispose();
    }

    /// Triggers the internal event channel (if it is still open) when the selector stops.
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d, id: %d", _epollFD, getId());

        if (!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers `channel` with the base selector and adds it to the epoll set.
     *
     * Returns: true if `epollCtl` reported success, false otherwise.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        if (epollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug warningf("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Deregisters `channel` from the base selector and removes it from the
     * epoll set.
     *
     * Returns: true if `epollCtl` reported success, false otherwise.
     */
    override bool deregister(AbstractChannel channel) {
        super.deregister(channel);

        if (epollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Re-applies the channel's current flags to its epoll registration.
     *
     * Returns: true if `epollCtl` reported success, false otherwise.
     */
    override bool update(AbstractChannel channel) {
        if (epollCtl(channel, EPOLL_CTL_MOD)) {
            return true;
        } else {
            warningf("update channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events and dispatches them.
     *
     * Params:
     *   timeout = wait time in milliseconds; 0 polls without blocking and a
     *             negative value blocks until an event arrives.
     *
     * Returns: the number of events reported by `epoll_wait`, or -1 when the
     *          wait failed for a reason other than EINTR.
     */
    protected override int doSelect(long timeout) {
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            // Normalise instead of casting: a large negative long could
            // otherwise truncate to 0 (or a positive value) as an int.
            const int waitMs = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, events.length, waitMs);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // Clamp so that a value above int.max cannot wrap to a negative
            // (i.e. infinite) timeout.
            const int waitMs = timeout > int.max ? int.max : cast(int) timeout;
            len = iepoll(_epollFD, events.ptr, events.length, waitMs);
        }

        // A negative len (error) yields an empty range, so nothing is dispatched.
        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug warningf("channel is null");
                continue;
            }

            // Copy the mask now: the buffer is reused by the next epoll_wait,
            // which may run before a pooled task gets to look at it.
            uint currentEvents = events[i].events;
            if (_taskPool is null) {
                handeChannelEvent(channel, currentEvents);
            } else { // using worker thread
                _taskPool.put(cast(int) channel.handle,
                        makeTask(&handeChannelEvent, channel, currentEvents));
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to `channel`.
     *
     * Error / hang-up masks (see `isClosed`) close the channel. Otherwise the
     * channel's `onWrite` and/or `onRead` handlers run according to the
     * EPOLLOUT / EPOLLIN bits. Exceptions thrown by the channel are caught
     * and only logged.
     */
    private void handeChannelEvent(AbstractChannel channel, uint event) {
        version (HUNT_IO_DEBUG)
            infof("handling event: selector=%d, events=%d, channel=%d", this._epollFD, event, channel.handle);

        try {
            if (isClosed(event)) { // && errno != EINTR
                /* An error has occurred on this fd, or the socket is not
                    ready for reading (why were we notified then?) */
                version (HUNT_IO_DEBUG) {
                    if (isError(event)) {
                        warningf("channel error: fd=%s, event=%d, errno=%d, message=%s",
                                channel.handle, event, errno, getErrorMessage(errno));
                    } else {
                        tracef("channel closed: fd=%d, errno=%d, message=%s",
                                    channel.handle, errno, getErrorMessage(errno));
                    }
                }
                // FIXME: Needing refactor or cleanup -@zxp at 2/28/2019, 3:25:24 PM
                // May be triggered twice for a channel, for example:
                // events=8197, fd=13
                // events=8221, fd=13
                // The remote connection broken abnormally, so the channel should be notified.
                // channel.onClose(); // More tests are needed
                channel.close();
            } else {
                // isClosed() returned false, so at least one of EPOLLIN /
                // EPOLLOUT is set. Test the bits rather than comparing the
                // whole mask so that an additional flag in the mask cannot
                // cause a ready channel to be skipped.
                const bool writable = isWritable(event);
                const bool readable = isReadable(event);

                version (HUNT_IO_DEBUG) {
                    if (readable && writable)
                        tracef("channel read and write: fd=%d", channel.handle);
                    else if (readable)
                        tracef("channel read event: fd=%d", channel.handle);
                    else
                        tracef("channel write event: fd=%d", channel.handle);
                }

                // Same order as before when both are ready: write, then read.
                if (writable)
                    channel.onWrite();
                if (readable)
                    channel.onRead();
            }
        } catch (Exception e) {
            debug {
                errorf("error while handing channel: fd=%s, exception=%s, message=%s",
                        channel.handle, typeid(e), e.msg);
            }
            version(HUNT_DEBUG) warning(e);
        }
    }

    /**
     * `epoll_wait` that restarts after EINTR without extending the overall
     * wait beyond `timeout` milliseconds.
     *
     * Returns: the `epoll_wait` result, or 0 once the time budget is used up
     *          by interrupted waits.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        int remaining = timeout;
        // Monotonic deadline: unaffected by wall-clock adjustments and free of
        // the rounding drift that re-basing the start time on every restart
        // would accumulate. Only consulted when the wait is bounded.
        const MonoTime deadline = MonoTime.currTime + msecs(timeout);

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            // Interrupted by a signal: shrink the budget and retry.
            if (remaining >= 0) {
                const long left = (deadline - MonoTime.currTime).total!"msecs";
                if (left <= 0)
                    return 0;
                // left never exceeds the original int timeout.
                remaining = cast(int) left;
            }
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True if the mask carries EPOLLERR.
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /// True if the mask signals an error or hang-up, or contains neither
    /// EPOLLIN nor EPOLLOUT.
    private static bool isClosed(uint e) nothrow {
        return (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0
                || (e & (EPOLLIN | EPOLLOUT)) == 0;
    }

    /// True if the mask carries EPOLLIN.
    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True if the mask carries EPOLLOUT.
    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for `channel`: the channel reference goes into `data.ptr`
     * and the interest mask is derived from the channel's flags. EPOLLRDHUP,
     * EPOLLERR and EPOLLHUP are always requested.
     *
     * Returns: a copy of the populated `ev`.
     */
    private static buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.hasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Issues `epoll_ctl(opcode)` for `channel`, retrying on EINTR.
     *
     * Returns: false only for failures other than EBADF, ENOENT and EPERM;
     *          those three are tolerated (see the comment in the body).
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        // Capture errno right away so that nothing executed later (e.g. the
        // logger) can overwrite it before it is inspected and reported.
        const int err = (res < 0) ? errno : 0;

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && err != EBADF && err != ENOENT && err != EPERM) {
            warningf("epoll_ctl failed: fd=%d, opcode=%d, errno=%d", fd, opcode, err);
            return false;
        } else
            return true;
    }
}