1 module hunt.io.channel.ChannelTask;
2 
3 import hunt.event.selector.Selector;
4 import hunt.Functions;
5 import hunt.io.BufferUtils;
6 import hunt.io.ByteBuffer;
7 import hunt.io.channel.AbstractSocketChannel;
8 import hunt.io.channel.Common;
9 import hunt.io.IoError;
10 // import hunt.io.SimpleQueue;
11 import hunt.logging;
12 import hunt.system.Error;
13 import hunt.util.queue;
14 import hunt.util.worker;
15 
16 
17 import std.format;
18 import std.socket;
19 
20 import core.atomic;
21 
22 /**
23  * 
24  */
25 class ChannelTask : Task {
26     DataReceivedHandler dataReceivedHandler;
27     private shared bool _isFinishing = false;
28     private Queue!(ByteBuffer) _buffers;
29 
30     this() {
31         _buffers = new SimpleQueue!(ByteBuffer);
32     }
33 
34     void put(ByteBuffer buffer) {
35         _buffers.push(buffer);
36     }
37 
38     bool isFinishing () {
39         return _isFinishing;
40     }
41 
42     override protected void doExecute() {
43 
44         ByteBuffer buffer;
45         DataHandleStatus handleStatus = DataHandleStatus.Pending;
46 
47         do {
48             buffer = _buffers.pop();
49             if(buffer is null) {
50                 version(HUNT_IO_DEBUG) {
51                     warning("A null buffer poped");
52                 }
53                 break;
54             }
55 
56             version(HUNT_IO_DEBUG) {
57                 tracef("buffer: %s", buffer.toString());
58             }
59 
60             handleStatus = dataReceivedHandler(buffer);
61 
62             version(HUNT_IO_DEBUG) {
63                 tracef("Handle status: %s, buffer: %s", handleStatus, buffer.toString());
64             }
65             
66             _isFinishing = isTerminated();
67             if(!_isFinishing) {
68                 _isFinishing = handleStatus == DataHandleStatus.Done && !buffer.hasRemaining() && _buffers.isEmpty();
69             }
70 
71             if(_isFinishing) {
72                 version(HUNT_DEBUG) {
73                     if(buffer.hasRemaining() || !_buffers.isEmpty()) {
74                         warningf("The buffered data lost");
75                     }
76                 }
77                 break;
78             }
79         } while(true);
80     }
81 }
82