module hunt.io.channel.ChannelTask;

import hunt.event.selector.Selector;
import hunt.Functions;
import hunt.io.BufferUtils;
import hunt.io.ByteBuffer;
import hunt.io.channel.AbstractSocketChannel;
import hunt.io.channel.Common;
import hunt.io.IoError;
// import hunt.io.SimpleQueue;
import hunt.logging;
import hunt.system.Error;
import hunt.util.queue;
import hunt.util.worker;


import std.format;
import std.socket;

import core.atomic;

/**
 * A Task that drains a queue of ByteBuffers and hands each one to
 * `dataReceivedHandler`.
 *
 * Buffers are enqueued with `put` and consumed in FIFO order by `doExecute`.
 * The `isFinishing` flag reports that `doExecute` has decided to stop
 * consuming, either because the task was terminated or because the handler
 * reported `Done` with nothing left to process.
 *
 * NOTE(review): `put`/`isFinishing` are presumably called from a different
 * thread than the one running `doExecute` (the flag is declared `shared`);
 * confirm against the callers. The flag is therefore accessed only through
 * `core.atomic`.
 */
class ChannelTask : Task {
    /// Callback invoked once per popped buffer; must be assigned before the task runs.
    DataReceivedHandler dataReceivedHandler;

    /// Set to true by `doExecute` once it has decided to leave its loop.
    private shared bool _isFinishing = false;

    /// Pending buffers waiting to be handled.
    private Queue!(ByteBuffer) _buffers;

    /// Creates the task with an empty buffer queue.
    this() {
        _buffers = new SimpleQueue!(ByteBuffer);
    }

    /**
     * Appends a buffer to the pending queue.
     *
     * Params:
     *   buffer = the data to be passed to `dataReceivedHandler` later.
     */
    void put(ByteBuffer buffer) {
        _buffers.push(buffer);
    }

    /**
     * Returns: true once `doExecute` has decided to stop consuming buffers.
     */
    bool isFinishing () {
        // Atomic read: the flag is `shared` and written by doExecute().
        return atomicLoad(_isFinishing);
    }

    /**
     * Pops buffers one at a time and passes each to `dataReceivedHandler`
     * until one of the following happens:
     *   - the queue yields a null buffer;
     *   - `isTerminated()` returns true;
     *   - the handler returns `DataHandleStatus.Done`, the current buffer has
     *     no remaining data and the queue is empty.
     */
    override protected void doExecute() {

        do {
            // NOTE(review): whether pop() blocks or returns null on an empty
            // queue depends on the Queue implementation — confirm.
            ByteBuffer buffer = _buffers.pop();
            if(buffer is null) {
                version(HUNT_IO_DEBUG) {
                    warning("A null buffer poped");
                }
                break;
            }

            version(HUNT_IO_DEBUG) {
                tracef("buffer: %s", buffer.toString());
            }

            DataHandleStatus handleStatus = dataReceivedHandler(buffer);

            version(HUNT_IO_DEBUG) {
                tracef("Handle status: %s, buffer: %s", handleStatus, buffer.toString());
            }

            // Decide locally first so the shared flag is written exactly once
            // per iteration with its final value.
            bool finishing = isTerminated();
            if(!finishing) {
                finishing = handleStatus == DataHandleStatus.Done
                    && !buffer.hasRemaining()
                    && _buffers.isEmpty();
            }

            // Publish the decision atomically for readers of isFinishing().
            atomicStore(_isFinishing, finishing);

            if(finishing) {
                version(HUNT_DEBUG) {
                    // Only reachable with leftover data when the task was terminated.
                    if(buffer.hasRemaining() || !_buffers.isEmpty()) {
                        warningf("The buffered data lost");
                    }
                }
                break;
            }
        } while(true);
    }
}