1 module hunt.util.worker.Task; 2 3 import hunt.util.queue; 4 import hunt.logging; 5 6 import core.atomic; 7 import std.datetime; 8 import std.format; 9 10 enum TaskStatus : ubyte { 11 Ready, 12 Processing, 13 Terminated, 14 Done 15 } 16 17 alias TaskQueue = Queue!Task; 18 alias MemoryTaskQueue = SimpleQueue!Task; 19 20 /** 21 * 22 */ 23 abstract class Task { 24 protected shared TaskStatus _status; 25 26 size_t id; 27 28 private MonoTime _createTime; 29 private MonoTime _startTime; 30 private MonoTime _endTime; 31 32 this() { 33 _status = TaskStatus.Ready; 34 _createTime = MonoTime.currTime; 35 } 36 37 Duration survivalTime() { 38 return _endTime - _createTime; 39 } 40 41 Duration executionTime() { 42 return _endTime - _startTime; 43 } 44 45 Duration lifeTime() { 46 if(_endTime > _createTime) { 47 return survivalTime(); 48 } else { 49 return MonoTime.currTime - _createTime; 50 } 51 } 52 53 TaskStatus status() { 54 return _status; 55 } 56 57 bool isReady() { 58 return _status == TaskStatus.Ready; 59 } 60 61 bool isProcessing() { 62 return _status == TaskStatus.Processing; 63 } 64 65 bool isTerminated() { 66 return _status == TaskStatus.Terminated; 67 } 68 69 bool isDone() { 70 return _status == TaskStatus.Done; 71 } 72 73 void stop() { 74 75 version(HUNT_IO_DEBUG) { 76 tracef("The task status: %s", _status); 77 } 78 79 if(!cas(&_status, TaskStatus.Processing, TaskStatus.Terminated) && 80 !cas(&_status, TaskStatus.Ready, TaskStatus.Terminated)) { 81 version(HUNT_IO_DEBUG) { 82 warningf("The task status: %s", _status); 83 } 84 } 85 } 86 87 void finish() { 88 version(HUNT_IO_DEBUG) { 89 tracef("The task %d status: %s", id, _status); 90 } 91 92 if(cas(&_status, TaskStatus.Processing, TaskStatus.Done) || 93 cas(&_status, TaskStatus.Ready, TaskStatus.Done)) { 94 95 _endTime = MonoTime.currTime; 96 version(HUNT_IO_DEBUG) { 97 infof("The task %d done.", id); 98 } 99 } else { 100 version(HUNT_IO_DEBUG) { 101 warningf("The task %d status: %s", _status, id); 102 warningf("Failed to set the task status to Done: %s", _status); 103 } 104 } 105 } 106 107 protected void doExecute(); 108 109 void execute() { 110 if(cas(&_status, TaskStatus.Ready, TaskStatus.Processing)) { 111 version(HUNT_IO_DEBUG) { 112 tracef("Task %d executing... status: %s", id, _status); 113 } 114 _startTime = MonoTime.currTime; 115 scope(exit) { 116 finish(); 117 version(HUNT_IO_DEBUG) { 118 infof("Task %d Done!", id); 119 } 120 } 121 doExecute(); 122 } else { 123 warningf("Failed to execute task %d. Its status is: %s", id, _status); 124 } 125 } 126 127 override string toString() { 128 return format("id: %d, status: %s", id, _status); 129 } 130 131 }