1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FuturePromise;
13 
14 import hunt.concurrency.Future;
15 import hunt.concurrency.Promise;
16 
17 import hunt.Exceptions;
18 import hunt.logging.ConsoleLogger;
19 
20 import core.atomic;
21 import core.thread;
22 import std.format;
23 import std.datetime;
24 
/**
 * A Future whose outcome is supplied explicitly through the Promise interface.
 *
 * Exactly one of succeeded(), failed() or cancel() wins the right to complete
 * the promise (decided by a compare-and-swap on `_done`). The winner writes the
 * outcome and then publishes it by atomically setting `_isResultAvaliable`.
 * Readers only touch `_cause` / `_result` after they have observed that flag,
 * so they never see a half-written outcome.
 *
 * The get() methods busy-wait with Thread.yield() until the outcome is
 * published.
 */
class FuturePromise(T) : Future!T, Promise!T {
	/// Sentinel stored in `_cause` to mark a successful completion.
	private __gshared Exception COMPLETED;
	/// Set (via cas) by the single caller that wins the right to complete the promise.
	private shared bool _done = false;
	/// Set after `_cause` (and `_result`) are written; readers poll this flag.
	private shared bool _isResultAvaliable = false;
	/// COMPLETED on success, otherwise the failure / cancellation / timeout cause.
	private Exception _cause;
	private string _id;

	shared static this() {
		COMPLETED = new Exception("");
	}

	this() {
	}

	/// Returns the identifier assigned to this promise (null if none was set).
	string id() {
		return _id;
	}

	/// Assigns an identifier to this promise.
	void id(string id) {
		_id = id;
	}

static if(is(T == void)) {

	/**
	 * Completes the promise successfully.
	 *
	 * Only the first completion attempt takes effect; later attempts are
	 * ignored and a warning is logged.
	 *
	 * TODO: return a flag to indicate whether this operation was successful.
	 */
	void succeeded() {
		if (cas(&_done, false, true)) {
			publish(COMPLETED);
		} else {
			warning("This promise has been done, and can't be set again.");
		}
	}

} else {

	/**
	 * Completes the promise successfully with the given result.
	 *
	 * Only the first completion attempt takes effect; later attempts are
	 * ignored and a warning is logged.
	 *
	 * TODO: return a flag to indicate whether this operation was successful.
	 *
	 * Params:
	 *   result = the value handed to callers of get()
	 */
	void succeeded(T result) {
		if (cas(&_done, false, true)) {
			// The result must be stored before the outcome is published.
			_result = result;
			publish(COMPLETED);
		} else {
			warning("This promise has been done, and can't be set again.");
		}
	}
	private T _result;
}

	/**
	 * Completes the promise with a failure.
	 *
	 * Only the first completion attempt takes effect; later attempts are
	 * ignored and a warning is logged.
	 *
	 * TODO: return a flag to indicate whether this operation was successful.
	 *
	 * Params:
	 *   cause = the exception reported to callers of get()
	 */
	void failed(Exception cause) {
		if (cas(&_done, false, true)) {
			publish(cause);
		} else {
			warning("This promise has been done, and can't be set again.");
		}
	}

	/**
	 * Attempts to cancel the promise.
	 *
	 * Params:
	 *   mayInterruptIfRunning = not used by this implementation
	 *
	 * Returns: true if this call completed the promise as cancelled, false if
	 *          the promise had already been completed.
	 */
	bool cancel(bool mayInterruptIfRunning) {
		if (cas(&_done, false, true)) {
			static if(!is(T == void)) {
				_result = T.init;
			}
			publish(new CancellationException(""));
			return true;
		}
		return false;
	}

	/**
	 * Returns: true if the promise has been completed by cancellation.
	 */
	bool isCancelled() {
		// Gate on the published flag, not on `_done`: between the cas and the
		// publication `_cause` is still null and must not be inspected.
		if (!atomicLoad(_isResultAvaliable)) {
			return false;
		}
		// Use a cast (as get() does) so subclasses of CancellationException count too.
		return (cast(CancellationException) _cause) !is null;
	}

	/**
	 * Returns: true once the outcome has been published, i.e. once get()
	 *          can return or throw without waiting.
	 */
	bool isDone() {
		return atomicLoad(_isResultAvaliable);
	}

	/**
	 * Waits (spinning with Thread.yield) until the promise is completed.
	 *
	 * Returns: the result passed to succeeded().
	 * Throws: CancellationException if the promise was cancelled;
	 *         ExecutionException wrapping the cause for any other failure.
	 */
	T get() {
		// waiting for the result
		version (HUNT_DEBUG) info("Waiting for a promise...");
		while (!atomicLoad(_isResultAvaliable)) {
			Thread.yield();
			version(HUNT_DANGER_DEBUG) trace("Waiting for a promise");
		}

		version (HUNT_DEBUG) info("Got a promise");
		assert(_cause !is null);

		if (_cause is COMPLETED) {
			static if(is(T == void)) {
				return;
			} else {
				return _result;
			}
		}

		CancellationException c = cast(CancellationException) _cause;
		if (c !is null) {
			version(HUNT_DEBUG) info("A promise cancelled.");
			throw c;
		}

		debug warning("Get a exception in a promise: ", _cause.msg);
		version (HUNT_DEBUG) warning(_cause);
		throw new ExecutionException(_cause);
	}

	/**
	 * Waits at most `timeout` for the promise to be completed.
	 *
	 * If the timeout elapses first, this call tries to complete the promise
	 * itself with a TimeoutException (so the promise is failed from then on).
	 *
	 * Params:
	 *   timeout = maximum time to wait
	 *
	 * Returns: the result passed to succeeded().
	 * Throws: TimeoutException if the promise was completed by a timeout;
	 *         CancellationException if it was cancelled;
	 *         ExecutionException wrapping the cause for any other failure.
	 */
	T get(Duration timeout) {
		// waiting for the result
		if (!atomicLoad(_isResultAvaliable)) {
			version (HUNT_DEBUG) {
				infof("Waiting for a promise in %s...", timeout);
			}
			// Monotonic clock: immune to wall-clock adjustments. Comparing the
			// elapsed time also avoids overflowing `start + timeout`.
			immutable start = MonoTime.currTime;
			while (!atomicLoad(_isResultAvaliable) && MonoTime.currTime - start < timeout) {
				Thread.yield();
			}

			if (!atomicLoad(_isResultAvaliable)) {
				debug warningf("Timeout for a promise in %s...", timeout);
				failed(new TimeoutException("Timeout in " ~ timeout.toString()));

				// failed() may have lost the race against a concurrent completer
				// that has claimed the promise but not published its outcome yet;
				// wait for that publication so `_cause` is never read as null.
				while (!atomicLoad(_isResultAvaliable)) {
					Thread.yield();
				}
			}

			version (HUNT_DEBUG) {
				auto dur = MonoTime.currTime - start;
				if(dur > 5.seconds) {
					warningf("Got a promise in %s", dur);
				} else {
					// infof("Got a promise in %s", dur);
				}
			}
		}

		assert(_cause !is null);

		if (_cause is COMPLETED) {
			static if(is(T == void)) {
				return;
			} else {
				return _result;
			}
		}

		TimeoutException t = cast(TimeoutException) _cause;
		if (t !is null)
			throw t;

		CancellationException c = cast(CancellationException) _cause;
		if (c !is null)
			throw c;

		// Wrap the cause itself (as get() does) so it is not lost.
		throw new ExecutionException(_cause);
	}

	override string toString() {
		static if(is(T == void)) {
			return format("FutureCallback@%x{%b, %b, void}", toHash(), atomicLoad(_done), _cause is COMPLETED);
		} else {
			return format("FutureCallback@%x{%b, %b, %s}", toHash(), atomicLoad(_done), _cause is COMPLETED, _result);
		}
	}

	/**
	 * Stores the outcome and makes it visible to waiting readers.
	 *
	 * Must only be called by the thread that won the cas on `_done`, and only
	 * after `_result` (if any) has been written.
	 */
	private void publish(Exception cause) {
		_cause = cause;
		// The atomic store orders the writes above before the flag becomes visible.
		atomicStore(_isResultAvaliable, true);
	}
}