1 module hunt.io.channel.posix.AbstractStream;
2 
3 // dfmt off
4 version(Posix):
5 // dfmt on
6 
7 import hunt.event.selector.Selector;
8 import hunt.Functions;
9 import hunt.io.BufferUtils;
10 import hunt.io.ByteBuffer;
11 import hunt.io.channel.AbstractSocketChannel;
12 import hunt.io.channel.ChannelTask;
13 import hunt.io.channel.Common;
14 import hunt.io.IoError;
15 import hunt.io.SimpleQueue;
16 import hunt.logging;
17 import hunt.system.Error;
18 import hunt.util.worker;
19 
20 
21 import std.format;
22 import std.socket;
23 
24 import core.atomic;
25 import core.stdc.errno;
26 import core.stdc.string;
27 import core.sys.posix.sys.socket : accept;
28 import core.sys.posix.unistd;
29 
30 
31 /**
32 TCP Peer
33 */
abstract class AbstractStream : AbstractSocketChannel {
    /// Size in bytes of the buffer allocated for every read; set by the constructor.
    private size_t _bufferSize = 4096;
    /// Not referenced by any method of this class.
    private const(ubyte)[] _readBuffer;
    /// Buffer currently being flushed by onWrite(); null when nothing is in flight.
    private ByteBuffer writeBuffer;
    /// Latest worker task fed by onDataReceived(); null until one is created.
    private ChannelTask _task = null;

    /**
    * Warning: the received data is stored in an inner buffer. To keep the data
    * safe, make a copy of it.
    */
    protected DataReceivedHandler dataReceivedHandler;
    protected SimpleEventHandler disconnectionHandler;
    protected SimpleActionHandler dataWriteDoneHandler;

    protected AddressFamily _family;
    /// Pending outgoing buffers; created lazily by initializeWriteQueue().
    protected WritingBufferQueue _writeQueue;
    /// Set by cancelWrite(), cleared by resetWriteStatus().
    protected bool _isWriteCancelling = false;

    /// Guard that lets only one caller at a time run the body of onWrite().
    private shared bool _isBusyWritting = false;

    /**
     * Creates a TCP channel registered for read and write events in ET mode.
     *
     * Params:
     *   loop       = selector passed to the base channel.
     *   family     = address family remembered in `_family`.
     *   bufferSize = number of bytes allocated for each read in tryRead().
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
        this._family = family;
        _bufferSize = bufferSize;
        super(loop, ChannelType.TCP);
        setFlag(ChannelFlag.Read, true);
        setFlag(ChannelFlag.Write, true);
        setFlag(ChannelFlag.ETMode, true);
    }

    abstract bool isClient();
    abstract bool isConnected() nothrow;
    abstract protected void onDisconnected();

    /**
     * Hands a freshly read buffer to the data handler.
     *
     * Without a task worker the handler is invoked directly; otherwise the
     * buffer is queued on the current ChannelTask (a new task is created when
     * there is none or the current one is finishing).
     */
    private void onDataReceived(ByteBuffer buffer) {
        if (dataReceivedHandler is null)
            return;

        if (taskWorker is null) {
            // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
            // Using memory pool
            dataReceivedHandler(buffer);
            return;
        }

        ChannelTask task = _task;

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
        // More tests needed
        if (task is null || task.isFinishing()) {
            task = createChannelTask();
            _task = task;
        } else {
            version(HUNT_METRIC) {
                warningf("Request peeding... Task status: %s", task.status);
            }
        }

        task.put(buffer);
    }

    /**
     * Builds a ChannelTask bound to the current data handler and submits it
     * to the task worker.
     */
    private ChannelTask createChannelTask() {
        ChannelTask task = new ChannelTask();
        task.dataReceivedHandler = dataReceivedHandler;
        taskWorker.put(task);
        return task;
    }

    /**
     * Performs a single read() on the channel handle.
     *
     * Returns: false when the read filled the whole buffer (more data may be
     *          waiting and the caller should read again), true otherwise.
     */
    protected bool tryRead() {
        bool isDone = true;
        this.clearError();

        // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
        // Using memory pool
        ByteBuffer incoming = BufferUtils.allocate(_bufferSize);
        incoming.limit(cast(int)_bufferSize);
        ubyte[] bytes = cast(ubyte[])incoming.array();

        ptrdiff_t len = read(this.handle, cast(void*) bytes.ptr, bytes.length);
        // Capture errno immediately: the logging and helper calls below may overwrite it.
        const int err = (len == Socket.ERROR) ? errno : 0;

        version (HUNT_IO_DEBUG) {
            tracef("reading[fd=%d]: %d bytes", this.handle, len);
        }

        if (len > 0) {
            version(HUNT_IO_DEBUG) {
                if (len <= 32)
                    infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, bytes[0 .. len]);
                else
                    infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, bytes[0 .. 32]);
            }

            incoming.limit(cast(int)len);
            incoming.position(0);
            onDataReceived(incoming);

            // It's possible that more data is waiting in the read I/O space.
            if (len == bytes.length) {
                version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len);
                isDone = false;
            }
        } else if (len == Socket.ERROR) {
            // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            this._error = err != EINTR && err != EAGAIN && err != EWOULDBLOCK;
            if (_error) {
                this._errorMessage = getErrorMessage(err);

                version(HUNT_NET_DEBUG) {
                    warningf("Error occurred on read, code: %d, msg: %s, isClosing: %s, isClosed: %s", 
                        err, _errorMessage, isClosing(), isClosed());
                }

                if (err == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else {
                    if (!isClosed()) {
                        errorOccurred(ErrorCode.INTERRUPTED , format("Error occurred on read, code: %d", err));
                    }
                }
            } else {
                debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
            }

        } else {
            // read() returned 0 (or an unexpected negative value): treat as disconnection.
            version (HUNT_DEBUG)
                infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
            onDisconnected();
        }

        return isDone;
    }

    /**
     * Closes the underlying descriptor (raw handle when there is no Socket
     * object, otherwise shutdown + close on the Socket) and stops the
     * current worker task, if any.
     */
    override protected void doClose() {
        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle);
        }
        if(this.socket is null) {
          import core.sys.posix.unistd;
          core.sys.posix.unistd.close(this.handle);
        } else {
          this.socket.shutdown(SocketShutdown.BOTH);
          this.socket.close();
        }
            
        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle);
        }

        Task task = _task;
        if(task !is null) {
            task.stop();
        }
    }


    /**
     * Try to write a block of data with a single write() call.
     *
     * Params:
     *   data = bytes to send; an empty slice is a no-op.
     *
     * Returns: the number of bytes written, or 0 when nothing was written
     *          (empty input, a retryable condition, or an error; in the
     *          error case `_error` is set).
     */
    protected ptrdiff_t tryWrite(const(ubyte)[] data) {
        clearError();

        // write() legitimately returns 0 for a zero-length request; do not
        // let that fall through to the "undefined behavior" branch below.
        if (data.length == 0)
            return 0;

        version (HUNT_IO_DEBUG)
            tracef("try to write: %d bytes, fd=%d", data.length, this.handle);
        const nBytes = write(this.handle, data.ptr, data.length);
        // Capture errno immediately: the logging and helper calls below may overwrite it.
        const int err = (nBytes == Socket.ERROR) ? errno : 0;
        version (HUNT_IO_DEBUG)
            tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle);

        if (nBytes > 0) {
            return nBytes;
        }

        if (nBytes == Socket.ERROR) {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            // EPIPE/Broken pipe:
            // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll

            if (err == EAGAIN) {
                version (HUNT_IO_DEBUG) {
                    warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
                }
            } else if (err == EINTR || err == EWOULDBLOCK) {
                // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
                debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
            } else {
                this._error = true;
                this._errorMessage = getErrorMessage(err);
                if (err == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else if (err == EPIPE) {
                    // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
                    // Handle SIGPIPE signal
                    onDisconnected();
                    errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!");
                }
                // NOTE(review): other errno values only set _error/_errorMessage
                // and are not reported through errorOccurred() - confirm this is intended.
            }
        } else {
            version (HUNT_DEBUG) {
                warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
                assert(false, "Undefined behavior!");
            } else {
                this._error = true;
            }
        }

        return 0;
    }

    /**
     * Writes the remaining bytes of `buffer`, calling tryWrite() repeatedly
     * until everything is sent, an error is flagged, the channel is closing
     * or the write has been cancelled.
     *
     * Returns: true when all remaining bytes were written (or there were none).
     */
    private bool tryNextWrite(ByteBuffer buffer) {
        const(ubyte)[] data = cast(const(ubyte)[])buffer.peekRemaining();
        version (HUNT_IO_DEBUG) {
            tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s",
                this.handle, data.length, buffer.toString());
        }

        if (data.length == 0)
            return true;

        ptrdiff_t remaining = data.length;

        // NOTE(review): when tryWrite() returns 0 without setting _error
        // (EAGAIN / EINTR / EWOULDBLOCK) this loop retries immediately.
        // Only the local slice is advanced here; the buffer itself is not
        // touched in this method - verify how a later retry with the same
        // buffer is expected to behave.
        while (remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) {
            ptrdiff_t nBytes = tryWrite(data);
            version (HUNT_IO_DEBUG)
            {
                tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
                    this.handle, nBytes, data.length, remaining, buffer.toString());
            }

            if (nBytes > 0) {
                remaining -= nBytes;
                data = data[nBytes .. $];
            }
        }

        const bool allWritten = remaining == 0;
        version (HUNT_IO_DEBUG) {
            if (allWritten) {
                tracef("A buffer is written out. fd=%d", this.handle);
            } else {
                warningf("Writing cancelled or an error ocurred. fd=%d", this.handle);
            }
        }
        return allWritten;
    }

    /**
     * Clears the write queue (if any) and resets the writing and
     * cancelling flags.
     */
    void resetWriteStatus() {
        if(_writeQueue !is null)
            _writeQueue.clear();
        atomicStore(_isWritting, false);
        _isWriteCancelling = false;
    }

    /**
     * Flushes the in-flight buffer and then at most one more buffer taken
     * from the write queue.
     *
     * Should be thread-safe: concurrent callers are turned away by a
     * compare-and-swap on `_isBusyWritting`.
     */
    override void onWrite() {
        version (HUNT_IO_DEBUG)
        {
            tracef("checking status, isWritting: %s, writeBuffer: %s",
                _isWritting, writeBuffer is null ? "null" : writeBuffer.toString());
        }

        if(!_isWritting) {
            version (HUNT_IO_DEBUG)
            infof("No data needs to be written out. fd=%d", this.handle);
            return;
        }

        // NOTE(review): the log text says "cancelled or closed" but the
        // condition requires both - confirm whether `||` was intended.
        if(isClosing() && _isWriteCancelling) {
            version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle);
            resetWriteStatus();
            return;
        }

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
        // More tests are needed
        // keep thread-safe here
        if(!cas(&_isBusyWritting, false, true)) {
            version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle);
            return;
        }

        // Release the guard with an atomic store, matching the cas() that took it.
        scope(exit) {
            atomicStore(_isBusyWritting, false);
        }

        if(writeBuffer !is null) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
            } else {
                version (HUNT_IO_DEBUG)
                {
                    infof("waiting to try again... fd=%d, writeBuffer: %s",
                        this.handle, writeBuffer.toString());
                }
                return;
            }
            version (HUNT_IO_DEBUG)
                tracef("running here, fd=%d", this.handle);
        }

        if(checkAllWriteDone()) {
            return;
        }

        version (HUNT_IO_DEBUG) {
            tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not");
        }

        if(_writeQueue.tryDequeue(writeBuffer)) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
                checkAllWriteDone();
            } else {
                version (HUNT_IO_DEBUG)
                    infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString());
            }
            version (HUNT_IO_DEBUG) {
                warningf("running here, fd=%d", this.handle);
            }
        }
    }

    /**
     * Checks whether the write queue is drained (or was never created).
     * If so, resets the write status and fires `dataWriteDoneHandler`.
     *
     * Returns: true when there is nothing left to write.
     */
    protected bool checkAllWriteDone() {
        const bool drained = _writeQueue is null || _writeQueue.isEmpty();

        version (HUNT_IO_DEBUG) {
            tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle, drained);
        }

        if (!drained)
            return false;

        resetWriteStatus();
        version (HUNT_IO_DEBUG)
            infof("All data are written out: fd=%d", this.handle);
        if(dataWriteDoneHandler !is null)
            dataWriteDoneHandler(this);
        return true;
    }

    /// Creates the write queue on first use.
    protected void initializeWriteQueue() {
        if (_writeQueue is null) {
            _writeQueue = new WritingBufferQueue();
        }
    }

    /**
     * Connects the underlying socket to `addr`.
     *
     * Returns: false when a SocketOSException was thrown (the message is
     *          logged), true otherwise.
     */
    protected bool doConnect(Address addr) {
        try {
            this.socket.connect(addr);
        } catch (SocketOSException e) {
            error(e.msg);
            version(HUNT_DEBUG) error(e);
            return false;
        }
        return true;
    }

    /// Requests that the write loop stop; the flag is cleared by resetWriteStatus().
    void cancelWrite() {
        _isWriteCancelling = true;
    }

    /// Returns: whether cancelWrite() has been requested and not yet reset.
    bool isWriteCancelling() {
        return _isWriteCancelling;
    }

    /// Returns: the handler currently receiving incoming data (may be null).
    DataReceivedHandler getDataReceivedHandler() {
        return dataReceivedHandler;
    }

}