1 module hunt.io.channel.posix.AbstractStream;
2 
3 // dfmt off
4 version(Posix):
5 // dfmt on
6 
7 import hunt.io.BufferUtils;
8 import hunt.io.ByteBuffer;
9 import hunt.event.selector.Selector;
10 import hunt.Functions;
11 import hunt.io.channel.AbstractSocketChannel;
12 import hunt.io.channel.Common;
13 import hunt.io.IoError;
14 import hunt.io.worker.WorkerGroupObject;
15 import hunt.logging.ConsoleLogger;
16 import hunt.system.Error;
17 
18 
19 import std.format;
20 import std.socket;
21 
22 import core.atomic;
23 import core.stdc.errno;
24 import core.stdc.string;
25 import core.sys.posix.sys.socket : accept;
26 import core.sys.posix.unistd;
27 
// Canned HTTP/1.1 "Hello, World!" response (Content-Length 13 matches the body).
// In the code visible here it is referenced only from commented-out test code in tryRead.
enum string ResponseData = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\nServer: Hunt/1.0\r\nDate: Wed, 17 Apr 2013 12:00:00 GMT\r\n\r\nHello, World!";
29 
/**
 * TCP peer: base class for a stream channel on POSIX systems.
 *
 * Incoming bytes are read into an internal buffer and handed to
 * `dataReceivedHandler` (or to `gWorkerGroup` when one is set). Outgoing
 * buffers are taken from `_writeQueue` and written out by `onWrite`.
 */
abstract class AbstractStream : AbstractSocketChannel {
    enum BufferSize = 4096;

    // View of the backing array of `_bufferForRead`; read(2) fills it directly.
    private const(ubyte)[] _readBuffer;
    // Buffer currently being written out; null when none is pending.
    private ByteBuffer writeBuffer;
    // Guards the body of onWrite so that only one caller at a time drains buffers.
    private shared bool _isBusyWritting = false;

    /**
    * Warning: the received data is stored in an internal buffer that is reused
    * by the next read. To keep the data safely, make a copy of it.
    */
    protected DataReceivedHandler dataReceivedHandler;
    protected SimpleEventHandler disconnectionHandler;
    protected SimpleActionHandler dataWriteDoneHandler;

    protected AddressFamily _family;
    protected ByteBuffer _bufferForRead;
    protected WritingBufferQueue _writeQueue;
    protected bool _isWriteCancelling = false;

    /**
     * Params:
     *   loop       = selector this channel is attached to (passed to the base class).
     *   family     = address family of the peer.
     *   bufferSize = size in bytes of the internal read buffer.
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = BufferSize * 2) {
        this._family = family;
        _bufferForRead = BufferUtils.allocate(bufferSize);
        _bufferForRead.limit(cast(int)bufferSize);
        _readBuffer = cast(ubyte[])_bufferForRead.array();
        // The write queue is created lazily, see initializeWriteQueue().
        super(loop, ChannelType.TCP);
        setFlag(ChannelFlag.Read, true);
        setFlag(ChannelFlag.Write, true);
        setFlag(ChannelFlag.ETMode, true);
    }

    abstract bool isClient();
    abstract bool isConnected() nothrow;
    abstract protected void onDisconnected();

    /**
     * Performs a single read(2) on the channel's handle.
     *
     * On success the data is dispatched to `gWorkerGroup` (as a clone of the
     * read buffer) when it is set, otherwise to `dataReceivedHandler`.
     * A return value of 0 from read(2) is treated as a disconnection.
     *
     * Returns: `false` when the read filled the whole buffer (more data may be
     *          waiting and the caller should read again), `true` otherwise.
     */
    protected bool tryRead() {
        bool isDone = true;
        this.clearError();
        ptrdiff_t len = read(this.handle, cast(void*) _readBuffer.ptr, _readBuffer.length);
        // Capture errno right away: logging and the other calls below may overwrite it.
        const int lastErrno = errno;

        version (HUNT_IO_DEBUG) {
            tracef("reading[fd=%d]: %d bytes", this.handle, len);
        }

        if (len > 0) {
            version(HUNT_IO_DEBUG) {
                if (len <= 32)
                    infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]);
                else
                    infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]);
            }
            if (dataReceivedHandler !is null) {
                // Expose exactly the bytes just read: [0, len).
                _bufferForRead.limit(cast(int)len);
                _bufferForRead.position(0);

                if (gWorkerGroup !is null) {
                    // The internal buffer is reused by the next read, so hand over a clone.
                    gWorkerGroup.put(this, BufferUtils.clone(_bufferForRead));
                } else {
                    dataReceivedHandler(_bufferForRead);
                }
            }

            // It's possible that more data is waiting to be read in the read I/O space.
            if (len == _readBuffer.length) {
                version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len);
                isDone = false;
            }
        } else if (len == Socket.ERROR) {
            // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            // EINTR / EAGAIN / EWOULDBLOCK are transient and not treated as errors.
            this._error = lastErrno != EINTR && lastErrno != EAGAIN && lastErrno != EWOULDBLOCK;
            if (_error) {
                this._errorMessage = getErrorMessage(lastErrno);

                if (lastErrno == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else {
                    errorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read");
                }
            } else {
                debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle,
                        lastErrno, getErrorMessage(lastErrno));
            }

        } else {
            // read(2) returned 0: the peer closed the connection.
            version (HUNT_DEBUG)
                infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
            onDisconnected();
        }

        return isDone;
    }

    /**
     * Closes the underlying descriptor: through `socket` when one is attached,
     * otherwise by closing the raw handle directly.
     */
    override protected void doClose() {
        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle);
        }
        if(this.socket is null) {
            // Fully qualified so that the POSIX close(2) is called.
            import core.sys.posix.unistd;
            core.sys.posix.unistd.close(this.handle);
        } else {
            this.socket.shutdown(SocketShutdown.BOTH);
            this.socket.close();
        }

        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle);
        }
    }


    /**
     * Tries to write a block of data with a single write(2) call.
     *
     * Params:
     *   data = bytes to write; an empty slice is a no-op.
     *
     * Returns: the number of bytes actually written, or 0 when nothing was
     *          written (empty input, a transient condition such as EAGAIN,
     *          or an error, in which case `_error` is set).
     */
    protected ptrdiff_t tryWrite(const(ubyte)[] data) {
        clearError();

        // write(2) legitimately returns 0 for a zero-length request; do not
        // let that fall into the "undefined behavior" branch below.
        if (data.length == 0)
            return 0;

        version (HUNT_IO_DEBUG)
            tracef("try to write: %d bytes, fd=%d", data.length, this.handle);
        const nBytes = write(this.handle, data.ptr, data.length);
        // Capture errno right away: logging and the other calls below may overwrite it.
        const int lastErrno = errno;
        version (HUNT_IO_DEBUG)
            tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle);

        if (nBytes > 0) {
            return nBytes;
        }

        if (nBytes == Socket.ERROR) {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            // EPIPE/Broken pipe:
            // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll

            if (lastErrno == EAGAIN) {
                version (HUNT_IO_DEBUG) {
                    warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        lastErrno, getErrorMessage(lastErrno));
                }
            } else if (lastErrno == EINTR || lastErrno == EWOULDBLOCK) {
                // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
                debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        lastErrno, getErrorMessage(lastErrno));
            } else {
                this._error = true;
                this._errorMessage = getErrorMessage(lastErrno);
                if (lastErrno == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else if (lastErrno == EPIPE) {
                    // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
                    // Handle SIGPIPE signal
                    onDisconnected();
                    errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!");
                } else {
                    // Report every other failure too, mirroring tryRead, instead
                    // of only setting the error flag silently.
                    errorOccurred(ErrorCode.INTERRUPTED , "Error occurred on write");
                }
            }
        } else {
            // write(2) returned 0 for a non-empty request.
            version (HUNT_DEBUG) {
                warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
                assert(false, "Undefined behavior!");
            } else {
                this._error = true;
            }
        }

        return 0;
    }

    /**
     * Writes out the remaining bytes of `buffer`, calling tryWrite repeatedly
     * until everything is written, an error is flagged, the channel is
     * closing, or the write is being cancelled.
     *
     * Returns: `true` when all remaining bytes were written (or there were none).
     */
    private bool tryNextWrite(ByteBuffer buffer) {
        const(ubyte)[] data = cast(const(ubyte)[])buffer.getRemaining();
        version (HUNT_IO_DEBUG) {
            tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s",
                this.handle, data.length, buffer.toString());
        }

        ptrdiff_t remaining = data.length;
        if(data.length == 0)
            return true;

        // NOTE(review): on EAGAIN/EINTR tryWrite returns 0 without setting _error,
        // so this loop retries immediately instead of waiting for the next
        // writable event. Also, the buffer's position is not advanced here;
        // presumably getRemaining() or the caller accounts for partial writes
        // -- TODO confirm.
        while(remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) {
            ptrdiff_t nBytes = tryWrite(data);
            version (HUNT_IO_DEBUG)
            {
                tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
                    this.handle, nBytes, data.length, remaining, buffer.toString());
            }

            if (nBytes > 0) {
                remaining -= nBytes;
                data = data[nBytes .. $];
            }
        }

        const bool allWritten = remaining == 0;
        version (HUNT_IO_DEBUG) {
            if (allWritten)
                tracef("A buffer is written out. fd=%d", this.handle);
            else
                warningf("Writing cancelled or an error ocurred. fd=%d", this.handle);
        }
        return allWritten;
    }

    /**
     * Clears the write queue (when present) and resets the writing and
     * cancelling flags.
     */
    void resetWriteStatus() {
        if(_writeQueue !is null)
            _writeQueue.clear();
        atomicStore(_isWritting, false);
        _isWriteCancelling = false;
    }

    /**
     * Drains pending output: first the buffer left over from a previous call,
     * then the next buffer from the write queue.
     *
     * Should be thread-safe: `_isBusyWritting` lets only one caller at a time
     * past the guard; other callers return immediately.
     */
    override void onWrite() {
        version (HUNT_IO_DEBUG)
        {
            tracef("checking status, isWritting: %s, writeBuffer: %s",
                _isWritting, writeBuffer is null ? "null" : writeBuffer.toString());
        }

        if(!_isWritting) {
            version (HUNT_IO_DEBUG)
            infof("No data needs to be written out. fd=%d", this.handle);
            return;
        }

        // NOTE(review): the log text says "cancelled or closed" but the condition
        // requires both; tryNextWrite stops on either. Confirm whether `||` is intended.
        if(isClosing() && _isWriteCancelling) {
            version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle);
            resetWriteStatus();
            return;
        }

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
        // More tests are needed
        // keep thread-safe here
        if(!cas(&_isBusyWritting, false, true)) {
            version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle);
            return;
        }

        scope(exit) {
            // Release the guard atomically, matching the cas() that acquired it.
            atomicStore(_isBusyWritting, false);
        }

        // Finish the buffer left over from a previous call first.
        if(writeBuffer !is null) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
            } else {
                version (HUNT_IO_DEBUG)
                {
                    infof("waiting to try again... fd=%d, writeBuffer: %s",
                        this.handle, writeBuffer.toString());
                }
                return;
            }
            version (HUNT_IO_DEBUG)
                tracef("running here, fd=%d", this.handle);
        }

        // checkAllWriteDone() also returns true when _writeQueue is null, so the
        // queue is non-null below.
        if(checkAllWriteDone()) {
            return;
        }

        version (HUNT_IO_DEBUG) {
            tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not");
        }

        if(_writeQueue.tryDequeue(writeBuffer)) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
                checkAllWriteDone();
            } else {
                version (HUNT_IO_DEBUG)
                    infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString());
            }
            version (HUNT_IO_DEBUG) {
                warningf("running here, fd=%d", this.handle);
            }
        }
    }

    /**
     * Checks whether the write queue is absent or empty; if so, resets the
     * write status and invokes `dataWriteDoneHandler` (when set).
     *
     * Returns: `true` when there is nothing left in the queue.
     */
    protected bool checkAllWriteDone() {
        version (HUNT_IO_DEBUG) {
            // Log the boolean itself; converting isEmpty() to a string first
            // made the `||` expression always evaluate to true.
            tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle,
               _writeQueue is null || _writeQueue.isEmpty());
        }

        if(_writeQueue is null || _writeQueue.isEmpty()) {
            resetWriteStatus();
            version (HUNT_IO_DEBUG)
                infof("All data are written out: fd=%d", this.handle);
            if(dataWriteDoneHandler !is null)
                dataWriteDoneHandler(this);
            return true;
        }

        return false;
    }

    /// Creates the write queue on first use.
    protected void initializeWriteQueue() {
        if (_writeQueue is null) {
            _writeQueue = new WritingBufferQueue();
        }
    }

    /**
     * Connects the underlying socket to `addr`.
     *
     * Returns: `true` on success, `false` when a SocketOSException was thrown
     *          (the exception message is logged).
     */
    protected bool doConnect(Address addr) {
        try {
            this.socket.connect(addr);
        } catch (SocketOSException e) {
            error(e.msg);
            version(HUNT_DEBUG) error(e);
            return false;
        }
        return true;
    }

    /// Requests that in-progress writing stop; checked by tryNextWrite and onWrite.
    void cancelWrite() {
        _isWriteCancelling = true;
    }

    /// Returns: whether a write cancellation has been requested and not yet reset.
    bool isWriteCancelling() {
        return _isWriteCancelling;
    }

    /// Returns: the handler invoked when data is received (may be null).
    DataReceivedHandler getDataReceivedHandler() {
        return dataReceivedHandler;
    }

}