1 module hunt.io.channel.posix.AbstractStream;
2 
3 // dfmt off
4 version(Posix):
5 // dfmt on
6 
7 import hunt.collection.BufferUtils;
8 import hunt.collection.ByteBuffer;
9 import hunt.event.selector.Selector;
10 import hunt.Functions;
11 import hunt.io.channel.AbstractSocketChannel;
12 import hunt.io.channel.Common;
13 import hunt.io.IoError;
14 import hunt.logging.ConsoleLogger;
15 import hunt.system.Error;
16 
17 import std.format;
18 import std.socket;
19 
20 import core.atomic;
21 import core.stdc.errno;
22 import core.stdc.string;
23 import core.sys.posix.sys.socket : accept;
24 import core.sys.posix.unistd;
25 
/// A fixed HTTP/1.1 200 response carrying the 13-byte plain-text body "Hello, World!".
enum string ResponseData =
    "HTTP/1.1 200 OK\r\n" ~
    "Content-Length: 13\r\n" ~
    "Connection: Keep-Alive\r\n" ~
    "Content-Type: text/plain\r\n" ~
    "Server: Hunt/1.0\r\n" ~
    "Date: Wed, 17 Apr 2013 12:00:00 GMT\r\n" ~
    "\r\n" ~
    "Hello, World!";
27 
/**
 * TCP peer.
 *
 * Base class for a stream socket channel on POSIX systems. It issues the raw
 * `read`/`write` calls on the channel's file descriptor, passes received bytes
 * to `dataReceivedHandler`, and drains queued output buffers in `onWrite`.
 * `EINTR`, `EAGAIN` and `EWOULDBLOCK` are treated as non-fatal on both paths.
 */
abstract class AbstractStream : AbstractSocketChannel {
    enum BufferSize = 4096;

    // Mutable view over the backing array of _bufferForRead; read(2) fills it directly.
    private ubyte[] _readBuffer;
    // Buffer currently being written out by onWrite; null when nothing is in flight.
    private ByteBuffer writeBuffer;
    // Entry guard for onWrite: acquired with cas, released with atomicStore on scope exit.
    private shared bool _isBusyWritting = false;

    /**
    * Warning: the received data is stored in an inner buffer that is reused by
    * the next read. To keep the data safely, make a copy of it.
    */
    protected DataReceivedHandler dataReceivedHandler;
    protected SimpleEventHandler disconnectionHandler;
    protected SimpleActionHandler dataWriteDoneHandler;

    protected AddressFamily _family;
    protected ByteBuffer _bufferForRead;
    // Created lazily by initializeWriteQueue(); may be null.
    protected WritingBufferQueue _writeQueue;
    // Set by cancelWrite(); cleared by resetWriteStatus().
    protected bool isWriteCancelling = false;

    /**
     * Params:
     *   loop       = selector the channel is created for (forwarded to the base class)
     *   family     = address family stored in `_family`
     *   bufferSize = capacity, in bytes, of the internal read buffer
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
        this._family = family;
        _bufferForRead = BufferUtils.allocate(bufferSize);
        _bufferForRead.limit(cast(int)bufferSize);
        _readBuffer = cast(ubyte[])_bufferForRead.array();
        // The write queue is not created here; see initializeWriteQueue().
        super(loop, ChannelType.TCP);
        setFlag(ChannelFlag.Read, true);
        setFlag(ChannelFlag.Write, true);
        setFlag(ChannelFlag.ETMode, true);
    }

    abstract bool isClient();
    abstract bool isConnected() nothrow;
    abstract protected void onDisconnected();

    /**
     * Performs a single `read` on the channel's descriptor.
     *
     * On success the bytes are exposed to `dataReceivedHandler` through
     * `_bufferForRead` (position 0, limit = number of bytes read). A return
     * value of 0 from `read` is treated as a disconnection.
     *
     * Returns: `false` when the read filled the whole buffer, meaning more data
     *          may be pending and the caller should read again; `true` otherwise.
     */
    protected bool tryRead() {
        bool isDone = true;
        this.clearError();
        ptrdiff_t len = read(this.handle, _readBuffer.ptr, _readBuffer.length);
        // Snapshot errno immediately: logging, getErrorMessage(), errorOccurred()
        // and onDisconnected() may all run code that overwrites it.
        const int err = errno;

        version (HUNT_IO_DEBUG) {
            tracef("reading[fd=%d]: %d bytes", this.handle, len);
        }

        if (len > 0) {
            version(HUNT_IO_DEBUG) {
                if (len <= 32)
                    infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]);
                else
                    infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]);
            }

            if (dataReceivedHandler !is null) {
                _bufferForRead.limit(cast(int)len);
                _bufferForRead.position(0);
                dataReceivedHandler(_bufferForRead);
            }

            // It's possible that more data is waiting in the read I/O space.
            if (len == _readBuffer.length) {
                version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len);
                isDone = false;
            }
        } else if (len == Socket.ERROR) {
            // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            this._error = err != EINTR && err != EAGAIN && err != EWOULDBLOCK;
            if (_error) {
                this._errorMessage = getErrorMessage(err);
                errorOccurred(ErrorCode.INTERRUPTED , "read buffer error");
            } else {
                debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
            }

            if(err == ECONNRESET) {
                // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                onDisconnected();
                errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
            }
        } else {
            // read() returned 0.
            version (HUNT_DEBUG)
                infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
            onDisconnected();
        }

        return isDone;
    }

    /**
     * Closes the underlying descriptor: directly via `close` when no `Socket`
     * object is attached, otherwise by shutting down and closing the socket.
     */
    override protected void doClose() {
        version (HUNT_IO_DEBUG) {
            infof("peer socket closing: fd=%d", this.handle);
        }
        if(this.socket is null) {
            import core.sys.posix.unistd;
            core.sys.posix.unistd.close(this.handle);
        } else {
            this.socket.shutdown(SocketShutdown.BOTH);
            this.socket.close();
        }

        version (HUNT_IO_DEBUG) {
            infof("peer socket closed: fd=%d", this.handle);
        }
    }


    /**
     * Tries to write a block of data with a single `write` call.
     *
     * Params:
     *   data = bytes to send; an empty slice is a no-op
     *
     * Returns: the number of bytes written (> 0), or 0 when nothing was written.
     *          A 0 result with `_error` still false means the call was
     *          interrupted or would block (`EINTR`, `EAGAIN`, `EWOULDBLOCK`),
     *          or that `data` was empty.
     */
    protected ptrdiff_t tryWrite(const ubyte[] data) {
        clearError();

        // write(2) returns 0 for a zero-length request; that is not a failure.
        if (data.length == 0)
            return 0;

        version (HUNT_IO_DEBUG)
            tracef("try to write: %d bytes, fd=%d", data.length, this.handle);
        const nBytes = write(this.handle, data.ptr, data.length);
        // Snapshot errno before any other call can overwrite it.
        const int err = errno;

        if (nBytes > 0) {
            return nBytes;
        }

        if (nBytes == Socket.ERROR) {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            // EPIPE/Broken pipe:
            // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
            // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll

            if(err == EAGAIN) {
                version (HUNT_IO_DEBUG) {
                    warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
                }
            } else if(err == EINTR || err == EWOULDBLOCK) {
                // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
                debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
            } else {
                this._error = true;
                this._errorMessage = getErrorMessage(err);
                if(err == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                }
            }
        } else {
            // write() returned 0 for a non-empty request.
            version (HUNT_DEBUG) {
                warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
                assert(false, "Undefined behavior!");
            } else {
                this._error = true;
            }
        }

        return 0;
    }

    /**
     * Writes out everything that `buffer.getRemaining()` returns.
     *
     * The loop keeps calling `tryWrite` until all bytes are sent, an error is
     * flagged, the channel is closing, or the write is being cancelled.
     *
     * NOTE(review): when `tryWrite` returns 0 without setting `_error`
     * (EINTR / EAGAIN / EWOULDBLOCK) the loop retries immediately, i.e. it
     * spins until the descriptor accepts data again. The buffer's position is
     * not advanced here, so a later retry with the same buffer starts over
     * from `getRemaining()`.
     *
     * Returns: `true` when every byte was written, `false` otherwise.
     */
    private bool tryNextWrite(ByteBuffer buffer) {
        const(ubyte)[] data = cast(const(ubyte)[])buffer.getRemaining();
        version (HUNT_IO_DEBUG) {
            tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s",
                this.handle, data.length, buffer.toString());
        }

        ptrdiff_t remaining = data.length;
        if(data.length == 0)
            return true;

        while(remaining > 0 && !_error && !isClosing() && !isWriteCancelling) {
            ptrdiff_t nBytes = tryWrite(data);
            version (HUNT_IO_DEBUG)
            {
                tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
                    this.handle, nBytes, data.length, remaining, buffer.toString());
            }

            if (nBytes > 0) {
                remaining -= nBytes;
                data = data[nBytes .. $];
            }
        }

        const bool allWritten = remaining == 0;
        version (HUNT_IO_DEBUG) {
            if(allWritten)
                tracef("A buffer is written out. fd=%d", this.handle);
            else
                warningf("Writing cancelled or an error ocurred. fd=%d", this.handle);
        }
        return allWritten;
    }

    /**
     * Clears the write queue (if any) and resets the writing / cancelling flags.
     */
    void resetWriteStatus() {
        if(_writeQueue !is null)
            _writeQueue.clear();
        atomicStore(_isWritting, false);
        isWriteCancelling = false;
    }

    /**
     * Drains pending output: first the buffer left over from a previous call,
     * then the next buffer taken from `_writeQueue`.
     *
     * Should be thread-safe: only one caller at a time gets past the
     * `_isBusyWritting` guard; the others return immediately.
     */
    override void onWrite() {
        version (HUNT_IO_DEBUG)
        {
            tracef("checking status, isWritting: %s, writeBuffer: %s",
                _isWritting, writeBuffer is null ? "null" : writeBuffer.toString());
        }

        if(!_isWritting) {
            version (HUNT_IO_DEBUG)
            infof("No data needs to be written out. fd=%d", this.handle);
            return;
        }

        // NOTE(review): the condition requires BOTH closing and cancelling,
        // while the log message says "cancelled or closed" - confirm which
        // one is intended before changing it.
        if(isClosing() && isWriteCancelling) {
            version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle);
            resetWriteStatus();
            return;
        }

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
        // More tests are needed
        // keep thread-safe here
        if(!cas(&_isBusyWritting, false, true)) {
            version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle);
            return;
        }

        // Release the guard atomically, matching the cas() used to take it.
        scope(exit) {
            atomicStore(_isBusyWritting, false);
        }

        // Finish the buffer left over from an earlier, incomplete attempt.
        if(writeBuffer !is null) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
            } else {
                version (HUNT_IO_DEBUG)
                {
                    infof("waiting to try again... fd=%d, writeBuffer: %s",
                        this.handle, writeBuffer.toString());
                }
                return;
            }
            version (HUNT_IO_DEBUG)
                tracef("running here, fd=%d", this.handle);
        }

        // Returns true (and resets the write status) when the queue is null or
        // empty, so _writeQueue is non-null below.
        if(checkAllWriteDone()) {
            return;
        }

        version (HUNT_IO_DEBUG) {
            tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not");
        }

        if(_writeQueue.tryDequeue(writeBuffer)) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
                checkAllWriteDone();
            } else {
            version (HUNT_IO_DEBUG)
                infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString());
            }
            version (HUNT_IO_DEBUG) {
                warningf("running here, fd=%d", this.handle);
            }
        }
    }

    /**
     * Checks whether the write queue is exhausted (null or empty). If so, the
     * write status is reset and `dataWriteDoneHandler` is invoked.
     *
     * Returns: `true` when nothing is left in the queue, `false` otherwise.
     */
    protected bool checkAllWriteDone() {
        const bool queueEmpty = _writeQueue is null || _writeQueue.isEmpty();

        version (HUNT_IO_DEBUG) {
            tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle, queueEmpty);
        }

        if(queueEmpty) {
            resetWriteStatus();
            version (HUNT_IO_DEBUG)
                infof("All data are written out: fd=%d", this.handle);
            if(dataWriteDoneHandler !is null)
                dataWriteDoneHandler(this);
            return true;
        }

        return false;
    }

    /// Creates `_writeQueue` on first use; does nothing if it already exists.
    protected void initializeWriteQueue() {
        if (_writeQueue is null) {
            _writeQueue = new WritingBufferQueue();
        }
    }

    /**
     * Connects the underlying socket to `addr`.
     *
     * Returns: `true` on success; `false` when `connect` throws a
     *          `SocketOSException` (the message is logged).
     */
    protected bool doConnect(Address addr) {
        try {
            this.socket.connect(addr);
        } catch (SocketOSException e) {
            error(e.msg);
            version(HUNT_DEBUG) error(e);
            return false;
        }
        return true;
    }

    /// Requests cancellation of the write in progress; checked by the write loop and by onWrite.
    void cancelWrite() {
        isWriteCancelling = true;
    }
}