1 module hunt.io.channel.posix.AbstractStream;
2 
3 // dfmt off
4 version(Posix):
5 // dfmt on
6 
7 import hunt.event.selector.Selector;
8 import hunt.Functions;
9 import hunt.io.BufferUtils;
10 import hunt.io.ByteBuffer;
11 import hunt.io.channel.AbstractSocketChannel;
12 import hunt.io.channel.ChannelTask;
13 import hunt.io.channel.Common;
14 import hunt.io.IoError;
15 import hunt.io.SimpleQueue;
16 import hunt.logging.ConsoleLogger;
17 import hunt.system.Error;
18 import hunt.util.worker;
19 
20 
21 import std.format;
22 import std.socket;
23 
24 import core.atomic;
25 import core.stdc.errno;
26 import core.stdc.string;
27 import core.sys.posix.sys.socket : accept;
28 import core.sys.posix.unistd;
29 
30 
31 /**
32 TCP Peer
33 */
34 abstract class AbstractStream : AbstractSocketChannel {
    // Number of bytes allocated for each read attempt in tryRead(). The
    // constructor overwrites this initial value with its bufferSize argument.
    private size_t _bufferSize = 4096;
    // NOTE(review): not used by the methods visible in this class; tryRead()
    // declares a local of the same name that shadows this field.
    private const(ubyte)[] _readBuffer;
    // Buffer that onWrite() is currently flushing; null when nothing is pending.
    private ByteBuffer writeBuffer;
    // Task most recently created by onDataReceived(); stopped in doClose().
    private ChannelTask _task = null;

    /**
    * Warning: the received data is stored in an internal buffer. To keep the
    * data safe, make a copy of it.
    */
    protected DataReceivedHandler dataReceivedHandler;
    // NOTE(review): not invoked by any method visible in this class.
    protected SimpleEventHandler disconnectionHandler;
    // Invoked by checkAllWriteDone() once the write queue is null or empty.
    protected SimpleActionHandler dataWriteDoneHandler;

    // Address family passed to the constructor.
    protected AddressFamily _family;
    // protected ByteBuffer _bufferForRead;
    // Queue of outgoing buffers; created lazily by initializeWriteQueue().
    protected WritingBufferQueue _writeQueue;
    // Set by cancelWrite(); makes the write loop in tryNextWrite() stop.
    protected bool _isWriteCancelling = false;
52 
53     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
54         this._family = family;
55         _bufferSize = bufferSize;
56         // _bufferForRead = BufferUtils.allocate(bufferSize);
57         // _bufferForRead.limit(cast(int)bufferSize);
58         // _readBuffer = cast(ubyte[])_bufferForRead.array();
59         // _writeQueue = new WritingBufferQueue();
60         super(loop, ChannelType.TCP);
61         setFlag(ChannelFlag.Read, true);
62         setFlag(ChannelFlag.Write, true);
63         setFlag(ChannelFlag.ETMode, true);
64     }
65 
    /// Implemented by subclasses.
    /// NOTE(review): presumably tells whether this stream is the client side of a connection — confirm.
    abstract bool isClient();
    /// Implemented by subclasses.
    /// NOTE(review): presumably tells whether the stream is currently connected — confirm.
    abstract bool isConnected() nothrow;
    /// Hook called by tryRead() when read() returns 0 or fails with ECONNRESET,
    /// and by tryWrite() when write() fails with ECONNRESET or EPIPE.
    abstract protected void onDisconnected();
69 
70     private void onDataReceived(ByteBuffer buffer) {
71 
72         if (dataReceivedHandler is null) 
73             return;
74 
75         // _bufferForRead.limit(cast(int)len);
76         // _bufferForRead.position(0);
77 
78         if(taskWorker is null) {
79             // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
80             // Using memory pool
81             // ByteBuffer bufferCopy = BufferUtils.clone(_bufferForRead);
82             // dataReceivedHandler(bufferCopy);
83             dataReceivedHandler(buffer);
84         } else {
85             ChannelTask task = _task;
86 
87             // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
88             // More tests needed
89             if(task is null || task.isFinishing()) {
90                 task = createChannelTask();
91                 _task = task;
92 
93             } else {
94                 version(HUNT_METRIC) {
95                     warningf("Request peeding... Task status: %s", task.status);
96                 }
97             }
98 
99             task.put(buffer);
100         }
101     }
102 
103     private ChannelTask createChannelTask() {
104         ChannelTask task = new ChannelTask();
105         task.dataReceivedHandler = dataReceivedHandler;
106         taskWorker.put(task);
107         return task;
108     }
109 
110     /**
111      *
112      */
113     protected bool tryRead() {
114         bool isDone = true;
115         this.clearError();
116 
117         // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
118         // Using memory pool        
119         // if(taskWorker !is null) {
120         ByteBuffer    _bufferForRead = BufferUtils.allocate(_bufferSize);
121             _bufferForRead.limit(cast(int)_bufferSize);
122         ubyte[]   _readBuffer = cast(ubyte[])_bufferForRead.array();
123         // }
124         ptrdiff_t len = read(this.handle, cast(void*) _readBuffer.ptr, _readBuffer.length);
125 
126         // ubyte[] rb = new ubyte[BufferSize];
127         // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length);
128         version (HUNT_IO_DEBUG) {
129             tracef("reading[fd=%d]: %d bytes", this.handle, len);
130         }
131 
132         if (len > 0) {
133             version(HUNT_IO_DEBUG) {
134                 if (len <= 32)
135                     infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]);
136                 else
137                     infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]);
138             }
139 
140             _bufferForRead.limit(cast(int)len);
141             _bufferForRead.position(0);
142             onDataReceived(_bufferForRead);
143 
144             // It's prossible that there are more data waitting for read in the read I/O space.
145             if (len == _readBuffer.length) {
146                 version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len);
147                 isDone = false;
148             }
149         } else if (len == Socket.ERROR) {
150             // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
151             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
152             // check more error status
153             this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK;
154             if (_error) {
155                 this._errorMessage = getErrorMessage(errno);
156 
157                 if(errno == ECONNRESET) {
158                     // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
159                     onDisconnected();
160                     errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
161                 } else {
162                     errorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read");
163                 }
164             } else {
165                 debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle,
166                         errno, getErrorMessage(errno));
167             }
168 
169         } else {
170             version (HUNT_DEBUG)
171                 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
172             onDisconnected();
173         }
174 
175         return isDone;
176     }
177 
178     override protected void doClose() {
179         version (HUNT_IO_DEBUG) {
180             infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle);
181         }
182         if(this.socket is null) {
183           import core.sys.posix.unistd;
184           core.sys.posix.unistd.close(this.handle);
185         } else {
186           this.socket.shutdown(SocketShutdown.BOTH);
187           this.socket.close();
188         }
189             
190         version (HUNT_IO_DEBUG) {
191             infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle);
192         }
193 
194         Task task = _task;
195         if(task !is null) {
196             task.stop();
197         }
198     }
199 
200 
201     /**
202      * Try to write a block of data.
203      */
204     protected ptrdiff_t tryWrite(const(ubyte)[] data) {
205         clearError();
206         // const nBytes = this.socket.send(data);
207         version (HUNT_IO_DEBUG)
208             tracef("try to write: %d bytes, fd=%d", data.length, this.handle);
209         const nBytes = write(this.handle, data.ptr, data.length);
210         version (HUNT_IO_DEBUG)
211             tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle);
212 
213         if (nBytes > 0) {
214             return nBytes;
215         }
216 
217         if (nBytes == Socket.ERROR) {
218             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
219             // check more error status
220             // EPIPE/Broken pipe:
221             // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll
222 
223             if(errno == EAGAIN) {
224                 version (HUNT_IO_DEBUG) {
225                     warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
226                         errno, getErrorMessage(errno));
227                 }
228             } else if(errno == EINTR || errno == EWOULDBLOCK) {
229                 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
230                 debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
231                         errno, getErrorMessage(errno));
232                 // eventLoop.update(this);
233             } else {
234                 this._error = true;
235                 this._errorMessage = getErrorMessage(errno);
236                 if(errno == ECONNRESET) {
237                     // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
238                     onDisconnected();
239                     errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
240                 } else if(errno == EPIPE) {
241                     // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
242                     // Handle SIGPIPE signal
243                     onDisconnected();
244                     errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!");
245                 }
246 
247             }
248         } else {
249             version (HUNT_DEBUG) {
250                 warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
251                 assert(false, "Undefined behavior!");
252             } else {
253                 this._error = true;
254             }
255         }
256 
257         return 0;
258     }
259 
260     private bool tryNextWrite(ByteBuffer buffer) {
261         const(ubyte)[] data = cast(const(ubyte)[])buffer.peekRemaining();
262         version (HUNT_IO_DEBUG) {
263             tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s",
264                 this.handle, data.length, buffer.toString());
265         }
266 
267         ptrdiff_t remaining = data.length;
268         if(data.length == 0)
269             return true;
270 
271         while(remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) {
272             ptrdiff_t nBytes = tryWrite(data);
273             version (HUNT_IO_DEBUG)
274             {
275                 tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
276                     this.handle, nBytes, data.length, remaining, buffer.toString());
277             }
278 
279             if (nBytes > 0) {
280                 remaining -= nBytes;
281                 data = data[nBytes .. $];
282             }
283         }
284 
285         version (HUNT_IO_DEBUG) {
286             if(remaining == 0) {
287                     tracef("A buffer is written out. fd=%d", this.handle);
288                 return true;
289             } else {
290                 warningf("Writing cancelled or an error ocurred. fd=%d", this.handle);
291                 return false;
292             }
293         } else {
294             return remaining == 0;
295         }
296     }
297 
298     void resetWriteStatus() {
299         if(_writeQueue !is null)
300             _writeQueue.clear();
301         atomicStore(_isWritting, false);
302         _isWriteCancelling = false;
303     }
304 
305     /**
306      * Should be thread-safe.
307      */
308     override void onWrite() {
309         version (HUNT_IO_DEBUG)
310         {
311             tracef("checking status, isWritting: %s, writeBuffer: %s",
312                 _isWritting, writeBuffer is null ? "null" : writeBuffer.toString());
313         }
314 
315         if(!_isWritting) {
316             version (HUNT_IO_DEBUG)
317             infof("No data needs to be written out. fd=%d", this.handle);
318             return;
319         }
320 
321         if(isClosing() && _isWriteCancelling) {
322             version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle);
323             resetWriteStatus();
324             return;
325         }
326 
327         // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
328         // More tests are needed
329         // keep thread-safe here
330         if(!cas(&_isBusyWritting, false, true)) {
331             // version (HUNT_IO_DEBUG)
332             version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle);
333             return;
334         }
335 
336         scope(exit) {
337             _isBusyWritting = false;
338         }
339 
340         if(writeBuffer !is null) {
341             if(tryNextWrite(writeBuffer)) {
342                 writeBuffer = null;
343             } else {
344                 version (HUNT_IO_DEBUG)
345                 {
346                     infof("waiting to try again... fd=%d, writeBuffer: %s",
347                         this.handle, writeBuffer.toString());
348                 }
349                 // eventLoop.update(this);
350                 return;
351             }
352             version (HUNT_IO_DEBUG)
353                 tracef("running here, fd=%d", this.handle);
354         }
355 
356         if(checkAllWriteDone()) {
357             return;
358         }
359 
360         version (HUNT_IO_DEBUG) {
361             tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not");
362         }
363 
364         if(_writeQueue.tryDequeue(writeBuffer)) {
365             if(tryNextWrite(writeBuffer)) {
366                 writeBuffer = null;
367                 checkAllWriteDone();
368             } else {
369             version (HUNT_IO_DEBUG)
370                 infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString());
371 
372                 // eventLoop.update(this);
373             }
374             version (HUNT_IO_DEBUG) {
375                 warningf("running here, fd=%d", this.handle);
376             }
377         }
378     }
    // Guard flag for onWrite(): acquired there with cas(false -> true) and
    // cleared again when onWrite() leaves its guarded section.
    private shared bool _isBusyWritting = false;
380 
381     protected bool checkAllWriteDone() {
382         version (HUNT_IO_DEBUG) {
383             import std.conv;
384             tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle,
385                _writeQueue is null ||  _writeQueue.isEmpty().to!string());
386         }
387 
388         if(_writeQueue is null || _writeQueue.isEmpty()) {
389             resetWriteStatus();
390             version (HUNT_IO_DEBUG)
391                 infof("All data are written out: fd=%d", this.handle);
392             if(dataWriteDoneHandler !is null)
393                 dataWriteDoneHandler(this);
394             return true;
395         }
396 
397         return false;
398     }
399 
400     protected void initializeWriteQueue() {
401         if (_writeQueue is null) {
402             _writeQueue = new WritingBufferQueue();
403         }
404     }
405 
406     protected bool doConnect(Address addr) {
407         try {
408             this.socket.connect(addr);
409         } catch (SocketOSException e) {
410             error(e.msg);
411             version(HUNT_DEBUG) error(e);
412             return false;
413         }
414         return true;
415     }
416 
417     void cancelWrite() {
418         _isWriteCancelling = true;
419     }
420 
421     bool isWriteCancelling() {
422         return _isWriteCancelling;
423     }
424 
425     DataReceivedHandler getDataReceivedHandler() {
426         return dataReceivedHandler;
427     }
428 
429 }