1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.FuturePromise; 13 14 import hunt.concurrency.Future; 15 import hunt.concurrency.Promise; 16 17 import hunt.Exceptions; 18 import hunt.logging.ConsoleLogger; 19 20 import core.atomic; 21 import core.thread; 22 import std.format; 23 import std.datetime; 24 25 /** 26 * 27 */ 28 class FuturePromise(T) : Future!T, Promise!T { 29 private __gshared Exception COMPLETED; 30 private shared bool _done = false; 31 private bool _isResultAvaliable = false; 32 private Exception _cause; 33 private string _id; 34 shared static this() { 35 COMPLETED = new Exception(""); 36 } 37 38 this() { 39 } 40 41 string id() { 42 return _id; 43 } 44 45 void id(string id) { 46 _id = id; 47 } 48 49 static if(is(T == void)) { 50 51 /** 52 * TODO: 53 * 1) keep this operation atomic 54 * 2) return a flag to indicate whether this option is successful. 55 */ 56 void succeeded() { 57 if (cas(&_done, false, true)) { 58 _cause = COMPLETED; 59 _isResultAvaliable = true; 60 } else { 61 warning("This promise has been done, and can't be set again."); 62 } 63 } 64 65 } else { 66 67 /** 68 * TODO: 69 * 1) keep this operation atomic 70 * 2) return a flag to indicate whether this option is successful. 71 */ 72 void succeeded(T result) { 73 if (cas(&_done, false, true)) { 74 _result = result; 75 _cause = COMPLETED; 76 _isResultAvaliable = true; 77 } else { 78 warning("This promise has been done, and can't be set again."); 79 } 80 } 81 private T _result; 82 } 83 84 /** 85 * TODO: 86 * 1) keep this operation atomic 87 * 2) return a flag to indicate whether this option is successful. 88 */ 89 void failed(Exception cause) { 90 if (cas(&_done, false, true)) { 91 _cause = cause; 92 _isResultAvaliable = true; 93 } else { 94 warning("This promise has been done, and can't be set again."); 95 } 96 } 97 98 bool cancel(bool mayInterruptIfRunning) { 99 if (cas(&_done, false, true)) { 100 static if(!is(T == void)) { 101 _result = T.init; 102 } 103 _cause = new CancellationException(""); 104 _isResultAvaliable = true; 105 return true; 106 } 107 return false; 108 } 109 110 bool isCancelled() { 111 if (_done) { 112 try { 113 // _latch.await(); 114 // TODO: Tasks pending completion -@zhangxueping at 2019-12-26T15:18:42+08:00 115 // 116 } catch (InterruptedException e) { 117 throw new RuntimeException(e.msg); 118 } 119 return typeid(_cause) == typeid(CancellationException); 120 } 121 return false; 122 } 123 124 bool isDone() { 125 return _done; 126 } 127 128 T get() { 129 // waitting for the result 130 version (HUNT_DEBUG) info("Waiting for a promise..."); 131 while(!_isResultAvaliable) { 132 Thread.yield(); 133 version(HUNT_DANGER_DEBUG) trace("Waiting for a promise"); 134 } 135 136 version (HUNT_DEBUG) info("Got a promise"); 137 assert(_cause !is null); 138 139 if (_cause is COMPLETED) { 140 static if(is(T == void)) { 141 return; 142 } else { 143 return _result; 144 } 145 } 146 147 CancellationException c = cast(CancellationException) _cause; 148 if (c !is null) { 149 version(HUNT_DEBUG) info("A promise cancelled."); 150 throw c; 151 } 152 153 debug warning("Get a exception in a promise: ", _cause.msg); 154 version (HUNT_DEBUG) warning(_cause); 155 throw new ExecutionException(_cause); 156 } 157 158 T get(Duration timeout) { 159 // waitting for the result 160 if(!_isResultAvaliable) { 161 version (HUNT_DEBUG) { 162 infof("Waiting for a promise in %s...", timeout); 163 } 164 auto start = Clock.currTime; 165 while (!_isResultAvaliable && Clock.currTime < start + timeout) { 166 Thread.yield(); 167 } 168 169 if (!_isResultAvaliable) { 170 debug warningf("Timeout for a promise in %s...", timeout); 171 failed(new TimeoutException("Timeout in " ~ timeout.toString())); 172 } 173 174 version (HUNT_DEBUG) { 175 auto dur = Clock.currTime - start; 176 if(dur > 5.seconds) { 177 warningf("Got a promise in %s", dur); 178 } else { 179 // infof("Got a promise in %s", dur); 180 } 181 } 182 } 183 184 if (_cause is COMPLETED) { 185 static if(is(T == void)) { 186 return; 187 } else { 188 return _result; 189 } 190 } 191 192 TimeoutException t = cast(TimeoutException) _cause; 193 if (t !is null) 194 throw t; 195 196 CancellationException c = cast(CancellationException) _cause; 197 if (c !is null) 198 throw c; 199 200 throw new ExecutionException(_cause.msg); 201 } 202 203 override string toString() { 204 static if(is(T == void)) { 205 return format("FutureCallback@%x{%b, %b, void}", toHash(), _done, _cause is COMPLETED); 206 } else { 207 return format("FutureCallback@%x{%b, %b, %s}", toHash(), _done, _cause is COMPLETED, _result); 208 } 209 } 210 }