1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Epoll;
13 
14 // dfmt off
15 version(HAVE_EPOLL):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 
23 import core.sys.posix.sys.types;
24 import core.sys.posix.netinet.tcp;
25 import core.sys.posix.netinet.in_;
26 import core.sys.posix.unistd;
27 import core.stdc.string;
28 import core.stdc.errno;
29 import core.time;
30 import core.thread;
31 
32 import core.sys.posix.sys.resource;
33 import core.sys.posix.sys.time;
34 import core.sys.linux.epoll;
35 
36 import hunt.event.selector.Selector;
37 import hunt.Exceptions;
38 import hunt.io.channel;
39 import hunt.logging;
40 import hunt.event.timer;
41 import hunt.system.Error;
42 import hunt.util.worker;
43 
/*
 * Upper bound on the number of file descriptors this process may open: the
 * hard limit (rlim_max) of RLIMIT_NOFILE, read once when the module starts.
 * If getrlimit fails the value stays 0, because the rlimit struct below is
 * default-initialized (all fields zero).
 */
__gshared size_t fdLimit = 0;

shared static this() {
    auto nofile = rlimit.init;
    getrlimit(RLIMIT_NOFILE, &nofile);
    fdLimit = nofile.rlim_max;
}
52 
53 
54 /**
55  * 
56  */
/**
 * Selector backed by Linux epoll(7).
 *
 * Each instance owns one epoll file descriptor (created with EPOLL_CLOEXEC)
 * and an internal `EpollEventChannel` that is registered at construction time.
 * `doSelect` waits for readiness events and dispatches each one to the
 * `AbstractChannel` stored in the event's `data.ptr`.
 */
class AbstractSelector : Selector {
    /// Capacity of the event buffer handed to a single epoll_wait call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Params:
     *   id          = forwarded to `Selector`.
     *   divider     = forwarded to `Selector`.
     *   worker      = forwarded to `Selector`; may be null.
     *   maxChannels = forwarded to `Selector`.
     *
     * Throws: `IOException` if epoll_create1 fails.
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(id, divider, worker, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        _eventChannel = new EpollEventChannel(this);
        register(_eventChannel);
    }

    // dispose() is deliberately not called here: this destructor is @nogc and
    // dispose() is not, so cleanup requires an explicit dispose() call.
    ~this() @nogc {
        // dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor, then
     * disposes the base selector. Subsequent calls are no-ops.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_IO_DEBUG)
            tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if(r != 0) {
            // close(2) returns -1 on failure; the cause is in errno, not in r.
            version (HUNT_IO_DEBUG) warningf("error: %d", errno);
        }

        super.dispose();
    }

    /**
     * Triggers the internal event channel if it is still open
     * (presumably to wake up a thread blocked in epoll_wait — confirm
     * against EpollEventChannel).
     */
    override void onStop() {
        version (HUNT_IO_DEBUG)
            infof("Selector stopping. fd=%d, id: %d", _epollFD, getId());

        if(!_eventChannel.isClosed()) {
            _eventChannel.trigger();
            // _eventChannel.onWrite();
        }
    }

    /**
     * Registers `channel` with the base selector and adds its handle to the
     * epoll set (EPOLL_CTL_ADD).
     *
     * Returns: true if the epoll registration succeeded.
     */
    override bool register(AbstractChannel channel) {
        super.register(channel);

        version (HUNT_IO_DEBUG)
            tracef("register, channel(fd=%d, type=%s)", channel.handle, channel.type);

        if (epollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug warningf("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Removes `channel`'s handle from the epoll set (EPOLL_CTL_DEL). The base
     * selector's deregister always runs afterwards, even on failure.
     *
     * Returns: true if the epoll removal succeeded (or failed harmlessly).
     */
    override bool deregister(AbstractChannel channel) {
        scope(exit) {
            super.deregister(channel);
            version (HUNT_IO_DEBUG)
                tracef("deregister, channel(fd=%d, type=%s)", channel.handle, channel.type);
        }

        if (epollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            // Log the channel's own handle; there is no `fd` in this scope.
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for epoll events and dispatches them to their channels.
     *
     * Params:
     *   timeout = in milliseconds; negative blocks indefinitely, 0 polls,
     *             positive is a bounded wait (clamped to int.max).
     *
     * Returns: the number of events handled, or -1 if epoll_wait failed.
     */
    protected override int doSelect(long timeout) {
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            // Normalize to -1/0: a plain cast(int) of a large negative long
            // could truncate to 0 and turn "block forever" into a busy poll.
            const int waitMs = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, NUM_KEVENTS, waitMs);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // epoll_wait takes an int timeout; clamp instead of letting a long
            // above int.max wrap into a negative (infinite) or tiny wait.
            const int waitMs = timeout > int.max ? int.max : cast(int) timeout;
            len = iepoll(_epollFD, events.ptr, NUM_KEVENTS, waitMs);
        }

        if (len < 0) {
            version (HUNT_IO_DEBUG)
                warningf("epoll_wait failed: selector=%d, errno=%d, message=%s",
                        _epollFD, errno, getErrorMessage(errno));
            return len;
        }

        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug warningf("channel is null");
            } else {
                handeChannelEvent(channel, events[i].events);
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to `channel`.
     *
     * Error/hang-up events (see isClosed) give the channel a final onRead()
     * if EPOLLIN is also set, then close it. Pure EPOLLIN calls onRead(),
     * pure EPOLLOUT calls onWrite(), and EPOLLIN|EPOLLOUT calls onWrite()
     * then onRead(). Exceptions thrown by the channel are caught and logged.
     */
    private void handeChannelEvent(AbstractChannel channel, uint event) {
        version (HUNT_IO_DEBUG) {
            warningf("thread: %s", Thread.getThis().name());

            // Thread.sleep(300.msecs);
            infof("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s",
                this._epollFD, channel.handle, event, isReadable(event), isWritable(event), isClosed(event));
        }

        try {
            if (isClosed(event)) { // && errno != EINTR
                /* An error has occurred on this fd, or the socket is not
                    ready for reading (why were we notified then?) */
                version (HUNT_IO_DEBUG) {
                    warningf("event=%d, isReadable: %s, isWritable: %s",
                        event, isReadable(event), isWritable(event));

                    if (isError(event)) {
                        warningf("channel error: fd=%s, event=%d, errno=%d, message=%s",
                                channel.handle, event, errno, getErrorMessage(errno));
                    } else {
                        infof("channel closed: fd=%d, errno=%d, message=%s",
                                    channel.handle, errno, getErrorMessage(errno));
                    }
                }

                // The remote connection broken abnormally, so the channel should be notified.
                if(isReadable(event)) {
                    channel.onRead();
                }

                channel.close();
            } else if (event == EPOLLIN) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read event: fd=%d", channel.handle);
                channel.onRead();
            } else if (event == EPOLLOUT) {
                version (HUNT_IO_DEBUG)
                    tracef("channel write event: fd=%d", channel.handle);
                channel.onWrite();
            } else if (event == (EPOLLIN | EPOLLOUT)) {
                version (HUNT_IO_DEBUG)
                    tracef("channel read and write: fd=%d", channel.handle);
                channel.onWrite();
                channel.onRead();
            } else {
                debug warningf("Only read/write/close events can be handled, current event: %d", event);
            }
        } catch (Exception e) {
            debug {
                errorf("error while handing channel: fd=%s, exception=%s, message=%s",
                        channel.handle, typeid(e), e.msg);
            }
            version(HUNT_DEBUG) warning(e);
        }
    }

    /**
     * epoll_wait with a bounded overall timeout that survives EINTR.
     *
     * When interrupted by a signal, the wait is restarted with whatever is
     * left of the original timeout, measured against a monotonic clock so
     * wall-clock adjustments cannot shorten or extend it.
     *
     * Returns: epoll_wait's result, or 0 if the timeout elapsed while
     * restarting after EINTR.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        const MonoTime deadline = MonoTime.currTime + timeout.msecs;
        int remaining = timeout;

        for (;;) {
            const int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            // Interrupted by a signal: an indefinite wait is simply restarted.
            if (remaining < 0)
                continue;

            const long left = (deadline - MonoTime.currTime).total!"msecs";
            if (left <= 0)
                return 0;
            // left <= timeout <= int.max, so the narrowing cast is safe.
            remaining = cast(int) left;
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True if EPOLLERR is set.
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /// True on error/hang-up, or when neither EPOLLIN nor EPOLLOUT is set.
    private static bool isClosed(uint e) nothrow {
        return (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0
                || (e & (EPOLLIN | EPOLLOUT)) == 0;
    }

    /// True if EPOLLIN is set.
    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True if EPOLLOUT is set.
    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for `channel`: stores the channel object in data.ptr and
     * always subscribes to EPOLLRDHUP/EPOLLERR/EPOLLHUP, plus EPOLLIN,
     * EPOLLOUT and EPOLLET according to the channel's flags.
     */
    private static void buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.hasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
    }

    /**
     * Applies epoll_ctl `opcode` for `channel`, retrying on EINTR.
     *
     * Returns: true on success or on a tolerated failure (see below),
     * false otherwise.
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        if (res >= 0)
            return true;

        const int err = errno;

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         *
         * That reasoning does not apply to EPOLL_CTL_ADD: if adding fails
         * (e.g. EPERM for a regular file, EBADF for a closed fd) the channel
         * will never receive events, so the failure is reported.
         */
        const bool tolerated = opcode != EPOLL_CTL_ADD
                && (err == EBADF || err == ENOENT || err == EPERM);
        if (tolerated)
            return true;

        warningf("epoll_ctl failed: op=%d, fd=%d, errno=%d", opcode, fd, err);
        return false;
    }
}