1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.selector.Epoll;
13 
14 // dfmt off
15 version(HAVE_EPOLL):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 import hunt.logging;
23 
24 import core.time;
25 import core.stdc.string;
26 import core.stdc.errno;
27 import core.sys.posix.sys.types; // for ssize_t, size_t
28 import core.sys.posix.netinet.tcp;
29 import core.sys.posix.netinet.in_;
30 import core.sys.posix.unistd;
31 
32 import core.sys.posix.sys.resource;
33 import core.sys.posix.sys.time;
34 
35 import hunt.Exceptions;
36 import hunt.io.socket;
37 import hunt.event.timer;
38 import hunt.system.Error;
39 
/*
 * Max. theoretical number of file descriptors on system.
 *
 * Filled in once at program start from the hard RLIMIT_NOFILE limit.
 * Stays 0 when the limit cannot be queried.
 */
__gshared size_t fdLimit = 0;

/**
 * Module constructor: reads the hard limit on open file descriptors.
 */
shared static this() {
    rlimit fileLimit;
    // Only use the struct when getrlimit reports success; on failure
    // fdLimit keeps its initial value of 0.
    if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0)
        fdLimit = fileLimit.rlim_max;
}
48 
/**
 * Selector implementation built on Linux epoll.
 *
 * The selector owns one epoll instance and one internal EpollEventChannel.
 * The event channel is registered at construction time and is signalled by
 * stop() after the base class has been asked to stop.
 */
class AbstractSelector : Selector {
    /// Maximum number of events fetched by a single epoll_wait call.
    enum int NUM_KEVENTS = 512;
    private int _epollFD;
    private EventChannel _event;
    private bool isDisposed = false;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Throws: IOException when epoll_create fails.
     */
    this() {
        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * epoll_create expects a size as a hint to the kernel about how to
         * dimension internal structures. We can't predict the size in advance.
         */
        // _epollFD = epoll_create1(0);
        _epollFD = epoll_create(256);
        if(_epollFD < 0)
            throw new IOException("epoll_create failed");
        _event = new EpollEventChannel(this);
        register(_event);
    }

    // NOTE(review): when this runs as a GC finalizer, _event is itself a
    // GC-managed object and may already have been finalized - confirm that
    // selectors are always disposed explicitly before being collected.
    ~this() {
        dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor.
     * Calling it more than once is harmless.
     */
    override void dispose() {
        if (isDisposed)
            return;

        version (HUNT_DEBUG) tracef("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;

        // The constructor can throw before _event is assigned (and with
        // _epollFD < 0); the destructor still ends up here in that case.
        if (_event !is null)
            _event.close();

        if (_epollFD >= 0) {
            core.sys.posix.unistd.close(_epollFD);
            // Forget the number so a later call cannot hit an unrelated
            // descriptor that happens to reuse it.
            _epollFD = -1;
        }
    }

    /**
     * Stops the selector if it is running, then signals the internal event
     * channel through its call() method.
     */
    override void stop() {
        if(_running) {
            super.stop();
            version (HUNT_DEBUG) tracef("notice that selector[fd=%d] stopped", _epollFD);
            _event.call();
        }
    }

    /**
     * Adds a channel to the epoll interest list.
     *
     * Timer channels get setTimer() called before being added.
     *
     * Returns: true on success, false when epoll_ctl reported a
     *          non-ignorable error.
     */
    override bool register(AbstractChannel channel) {
        assert(channel !is null);
        version (HUNT_DEBUG) tracef("register channel: fd=%d", channel.handle);

        if (channel.type == ChannelType.Timer) {
            auto wt = cast(AbstractTimer) channel;
            if (wt !is null)
                wt.setTimer();
        }

        if(epollCtl(channel, EPOLL_CTL_ADD)) {
            _event.setNext(channel);
            return true;
        } else {
            warningf("register channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Re-submits the channel's current flags to epoll (EPOLL_CTL_MOD).
     */
    override bool reregister(AbstractChannel channel) {
        return epollCtl(channel, EPOLL_CTL_MOD);
    }

    /**
     * Removes a channel from the epoll interest list.
     *
     * Returns: true on success, false when epoll_ctl reported a
     *          non-ignorable error.
     */
    override bool deregister(AbstractChannel channel) {
        if(epollCtl(channel, EPOLL_CTL_DEL)) {
            version (HUNT_DEBUG) tracef("deregister channel: fd=%d", channel.handle);
            return true;
        } else {
            warningf("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events and dispatches each one to its channel.
     *
     * Params:
     *   timeout = in millisecond. Values <= 0 are handed to epoll_wait
     *             unchanged; positive values are a bounded wait.
     *
     * Returns: the result of epoll_wait (number of events, or -1 on error).
     */
    override protected int doSelect(long timeout) {
        epoll_event[NUM_KEVENTS] events;
        int len = 0;

        if(timeout <= 0) { /* Indefinite or no wait */
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                len = epoll_wait(_epollFD, events.ptr, NUM_KEVENTS, cast(int)timeout);
            } while((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            len = iepoll(_epollFD, events.ptr, NUM_KEVENTS, cast(int)timeout);
        }

        // A negative len (epoll_wait error) yields an empty range here.
        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                warningf("channel is null");
                continue;
            }

            uint currentEvents = events[i].events;
            version (HUNT_DEBUG) infof("handling event: events=%d, fd=%d", currentEvents, channel.handle);

            // Exactly one branch runs per event, in this priority order:
            // hang-up, error, readable, writable. An event carrying both
            // EPOLLIN and EPOLLOUT therefore only triggers onRead() here.
            if (isClosed(currentEvents)) {
                version (HUNT_DEBUG)
                tracef("channel closed: fd=%d, errno=%d, message=%s", channel.handle,
                        errno, getErrorMessage(errno));
                channel.close();
            } else if (isError(currentEvents)) {
                // version (HUNT_DEBUG)
                debug warningf("channel error: fd=%s, errno=%d, message=%s", channel.handle,
                        errno, getErrorMessage(errno));
                channel.close();
            } else if (isReadable(currentEvents)) {
                channel.onRead();
            } else if (isWritable(currentEvents)) {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) channel;
                assert(wt !is null);
                wt.onWriteDone();
            } else {
                warningf("Undefined behavior: fd=%d, registered=%s", channel.handle, channel.isRegistered);
            }
        }

        return len;
    }

    /**
     * epoll_wait wrapper that restarts after EINTR without extending the
     * overall wait beyond `timeout` milliseconds.
     *
     * Returns: the epoll_wait result, or 0 once the time budget is used up
     *          while handling interruptions.
     */
    private int iepoll(int epfd, epoll_event* events, int numfds, int timeout) {
        // Use the monotonic clock so wall-clock adjustments cannot shorten
        // or lengthen the wait.
        const MonoTime deadline = MonoTime.currTime + dur!"msecs"(timeout);
        int remaining = timeout;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            // Interrupted by a signal. A negative timeout has no budget to
            // track, so simply wait again.
            if (timeout < 0)
                continue;

            const long left = (deadline - MonoTime.currTime).total!"msecs";
            if (left <= 0)
                return 0;
            remaining = cast(int) left; // left <= timeout, so it fits an int
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True when EPOLLERR is set.
    private static bool isError(uint events) nothrow {
        return (events & EPOLLERR ) != 0;
    }

    /// True when EPOLLHUP or EPOLLRDHUP is set.
    private static bool isClosed(uint events) nothrow {
        return (events & (EPOLLHUP | EPOLLRDHUP)) != 0;
    }

    /// True when EPOLLIN is set.
    private static bool isReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True when EPOLLOUT is set.
    private static bool isWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for the given channel: the channel reference goes into the
     * user-data pointer and the event mask is derived from the channel flags.
     * EPOLLRDHUP, EPOLLERR and EPOLLHUP are always requested.
     *
     * Returns: a copy of the filled event.
     */
    private static epoll_event buildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.hasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.hasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        if (channel.hasFlag(ChannelFlag.OneShot))
            ev.events |= EPOLLONESHOT;
        if (channel.hasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Issues epoll_ctl for the channel, retrying on EINTR.
     *
     * Returns: false only for errors other than EBADF, ENOENT and EPERM;
     *          those three are deliberately treated as success (see below).
     */
    private bool epollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        buildEpollEvent(channel, ev);
        int res = 0;
        int err = 0;
        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
            // Capture errno right away, before any other call can change it.
            err = errno;
        } while((res == -1) && (err == EINTR));

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && err != EBADF && err != ENOENT && err != EPERM) {
            warningf("epoll_ctl failed: op=%d, fd=%d, errno=%d, message=%s",
                    opcode, fd, err, getErrorMessage(err));
            return false;
        }

        return true;
    }
}
266 
267 
/**
 * Event channel backed by a Linux eventfd.
 *
 * call() adds 1 to the eventfd counter; onRead() reads the counter back.
 */
class EpollEventChannel : EventChannel {
    /**
     * Creates a non-blocking, close-on-exec eventfd and marks the channel
     * as readable and registered.
     */
    this(Selector loop) {
        super(loop);
        setFlag(ChannelFlag.Read, true);
        this.handle = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        if (this.handle < 0) {
            // Report the failure instead of silently keeping an invalid handle.
            const err = errno;
            warningf("eventfd failed: errno=%d, message=%s", err, getErrorMessage(err));
        }
        _isRegistered = true;
    }

    ~this() {
        close();
    }

    /**
     * Signals the channel by writing the value 1 to the eventfd.
     */
    override void call() {
        version (HUNT_DEBUG) tracef("calling event [fd=%d]...%s", this.handle, eventLoop.isRuning);
        ulong value = 1;
        ssize_t n;
        do {
            n = core.sys.posix.unistd.write(this.handle, &value, value.sizeof);
        } while (n < 0 && errno == EINTR);

        // EAGAIN on a non-blocking eventfd means the counter cannot take the
        // addition, i.e. a signal is already pending; anything else is a
        // genuine failure worth reporting.
        if (n < 0 && errno != EAGAIN) {
            const err = errno;
            warningf("event channel write failed: fd=%d, errno=%d, message=%s",
                    this.handle, err, getErrorMessage(err));
        }
    }

    /**
     * Clears the error state and reads the eventfd counter.
     */
    override void onRead() {
        version (HUNT_DEBUG) tracef("channel reading [fd=%d]...", this.handle);
        this.clearError();
        ulong value;
        ssize_t n = core.sys.posix.unistd.read(this.handle, &value, value.sizeof);
        version (HUNT_DEBUG) tracef("channel read done: %d bytes, fd=%d", n, this.handle);
    }

    /// Closes the eventfd descriptor.
    override protected void onClose() {
        version (HUNT_DEBUG) tracef("close event channel [fd=%d]...", this.handle);
        core.sys.posix.unistd.close(this.handle);
    }
}
301 
/// Flags accepted by eventfd(), written as single-bit masks.
enum {
    EFD_SEMAPHORE = 1 << 0,
    EFD_NONBLOCK = 1 << 11,
    EFD_CLOEXEC = 1 << 19
}
307 
/// Flags accepted by epoll_create1(), written as single-bit masks.
enum {
    EPOLL_NONBLOCK = 1 << 11,
    EPOLL_CLOEXEC = 1 << 19
}
312 
/// Event bits for epoll_event.events, listed in ascending bit order.
enum {
    EPOLLIN = 1 << 0,
    EPOLLPRI = 1 << 1,
    EPOLLOUT = 1 << 2,
    EPOLLERR = 1 << 3,
    EPOLLHUP = 1 << 4,
    EPOLLRDNORM = 1 << 6,
    EPOLLRDBAND = 1 << 7,
    EPOLLWRNORM = 1 << 8,
    EPOLLWRBAND = 1 << 9,
    EPOLLMSG = 1 << 10,
    EPOLLRDHUP = 1 << 13, // since Linux 2.6.17
    EPOLLONESHOT = 1u << 30,
    EPOLLET = 1u << 31
}
328 
/*
 * Valid opcodes ( "op" parameter ) to issue to epoll_ctl().
 * Within this module they are passed to epoll_ctl() by the selector's
 * register (ADD), reregister (MOD) and deregister (DEL) operations.
 */
enum {
    EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface.
    EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface.
    EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure.
}
335 
336 // dfmt off
337 extern (C) : @system : nothrow :
338 
/**
 * D mirror of the C `struct epoll_event` used by epoll_ctl() and epoll_wait().
 *
 * The C structure is declared packed on x86 and x86_64 only. On every other
 * architecture it uses natural alignment, so the packed layout must not be
 * applied there or `data` would be read from the wrong offset.
 */
version (X86_64) {
    align(1) struct epoll_event {
    align(1):
        uint events;     /// Bit mask of EPOLL* event flags.
        epoll_data data; /// User data returned unchanged by epoll_wait().
    }
} else version (X86) {
    align(1) struct epoll_event {
    align(1):
        uint events;     /// Bit mask of EPOLL* event flags.
        epoll_data data; /// User data returned unchanged by epoll_wait().
    }
} else {
    struct epoll_event {
        uint events;     /// Bit mask of EPOLL* event flags.
        epoll_data data; /// User data returned unchanged by epoll_wait().
    }
}
344 
/**
 * User-data slot of an epoll_event; all members share the same storage.
 * This module stores the channel reference in `ptr` when building an event
 * and reads it back from `ptr` when dispatching.
 */
union epoll_data {
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}
350 }
351 
352 // dfmt on
353 
// Bindings to the C library's epoll and eventfd entry points.

/// Creates an epoll instance; `size` is only a hint to the kernel.
int epoll_create(int size);
/// Creates an epoll instance; `flags` takes the EPOLL_* creation flags.
int epoll_create1(int flags);
/// Applies opcode `op` (EPOLL_CTL_*) for descriptor `fd` on epoll instance `epfd`.
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
/// Waits up to `timeout` milliseconds for at most `maxevents` events on `epfd`.
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

/// Creates an eventfd with initial counter `initval`; `flags` takes EFD_* values.
socket_t eventfd(uint initval, int flags);