1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.thread.LockSupport;
13 
14 import core.thread;
15 import core.time;
16 
17 import hunt.concurrency.thread.ThreadEx;
18 import hunt.util.DateTime;
19 import hunt.Exceptions;
20 
21 import hunt.logging.ConsoleLogger;
22 
23 
24 /**
25  * Basic thread blocking primitives for creating locks and other
26  * synchronization classes.
27  *
28  * <p>This class associates, with each thread that uses it, a permit
29  * (in the sense of the {@link java.util.concurrent.Semaphore
30  * Semaphore} class). A call to {@code park} will return immediately
31  * if the permit is available, consuming it in the process; otherwise
32  * it <em>may</em> block.  A call to {@code unpark} makes the permit
33  * available, if it was not already available. (Unlike with Semaphores
34  * though, permits do not accumulate. There is at most one.)
35  * Reliable usage requires the use of volatile (or atomic) variables
36  * to control when to park or unpark.  Orderings of calls to these
37  * methods are maintained with respect to volatile variable accesses,
38  * but not necessarily non-volatile variable accesses.
39  *
40  * <p>Methods {@code park} and {@code unpark} provide efficient
41  * means of blocking and unblocking threads that do not encounter the
42  * problems that cause the deprecated methods {@code Thread.suspend}
43  * and {@code Thread.resume} to be unusable for such purposes: Races
44  * between one thread invoking {@code park} and another thread trying
45  * to {@code unpark} it will preserve liveness, due to the
46  * permit. Additionally, {@code park} will return if the caller's
47  * thread was interrupted, and timeout versions are supported. The
48  * {@code park} method may also return at any other time, for "no
49  * reason", so in general must be invoked within a loop that rechecks
50  * conditions upon return. In this sense {@code park} serves as an
51  * optimization of a "busy wait" that does not waste as much time
52  * spinning, but must be paired with an {@code unpark} to be
53  * effective.
54  *
55  * <p>The three forms of {@code park} each also support a
56  * {@code blocker} object parameter. This object is recorded while
57  * the thread is blocked to permit monitoring and diagnostic tools to
58  * identify the reasons that threads are blocked. (Such tools may
59  * access blockers using method {@link #getBlocker(Thread)}.)
60  * The use of these forms rather than the original forms without this
61  * parameter is strongly encouraged. The normal argument to supply as
62  * a {@code blocker} within a lock implementation is {@code this}.
63  *
64  * <p>These methods are designed to be used as tools for creating
65  * higher-level synchronization utilities, and are not in themselves
66  * useful for most concurrency control applications.  The {@code park}
67  * method is designed for use only in constructions of the form:
68  *
69  * <pre> {@code
70  * while (!canProceed()) {
71  *   // ensure request to unpark is visible to other threads
72  *   ...
73  *   LockSupport.park(this);
74  * }}</pre>
75  *
76  * where no actions by the thread publishing a request to unpark,
77  * prior to the call to {@code park}, entail locking or blocking.
78  * Because only one permit is associated with each thread, any
79  * intermediary uses of {@code park}, including implicitly via class
80  * loading, could lead to an unresponsive thread (a "lost unpark").
81  *
82  * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
83  * non-reentrant lock class:
84  * <pre> {@code
85  * class FIFOMutex {
86  *   private final AtomicBoolean locked = new AtomicBoolean(false);
87  *   private final Queue!(Thread) waiters
88  *     = new ConcurrentLinkedQueue<>();
89  *
90  *   void lock() {
91  *     boolean wasInterrupted = false;
92  *     // publish current thread for unparkers
93  *     waiters.add(Thread.currentThread());
94  *
95  *     // Block while not first in queue or cannot acquire lock
96  *     while (waiters.peek() != Thread.currentThread() ||
97  *            !locked.compareAndSet(false, true)) {
98  *       LockSupport.park(this);
99  *       // ignore interrupts while waiting
100  *       if (Thread.interrupted())
101  *         wasInterrupted = true;
102  *     }
103  *
104  *     waiters.remove();
105  *     // ensure correct interrupt status on return
106  *     if (wasInterrupted)
107  *       Thread.currentThread().interrupt();
108  *   }
109  *
110  *   void unlock() {
111  *     locked.set(false);
112  *     LockSupport.unpark(waiters.peek());
113  *   }
114  *
115  *   static {
116  *     // Reduce the risk of "lost unpark" due to classloading
117  *     Class<?> ensureLoaded = LockSupport.class;
118  *   }
119  * }}</pre>
120  *
121  * @since 1.5
122  */
class LockSupport {
    private this() {} // Cannot be instantiated.

    /**
     * Records on the given thread the object it is about to block on
     * (or clears the record when {@code arg} is null). The value is what
     * {@link #getBlocker} later returns for that thread.
     *
     * @param t   the thread whose {@code parkBlocker} field is written
     * @param arg the blocker object, or null to clear it
     */
    private static void setBlocker(ThreadEx t, Object arg) {
        t.parkBlocker = arg;
    }

    /**
     * Converts a positive nanosecond count into a timeout for the parker.
     *
     * <p>{@code Duration} has a resolution of 100ns (one hnsec), so a value
     * of 1..99 nanoseconds truncates to {@code Duration.zero}. Zero is the
     * value {@link #park()} passes to request an untimed wait, so a
     * truncated timeout is rounded up to one hnsec to keep the wait timed.
     *
     * @param nanos a positive number of nanoseconds
     * @return a strictly positive duration
     */
    private static Duration toTimeout(long nanos) {
        Duration timeout = dur!(TimeUnit.Nanosecond)(nanos);
        return timeout > Duration.zero ? timeout : hnsecs(1);
    }

    /**
     * Computes how long remains until an absolute wall-clock deadline.
     *
     * @param deadline the absolute time, in milliseconds from the Unix Epoch
     * @return the remaining time, or {@code Duration.zero} if the deadline
     *         has already been reached
     */
    private static Duration timeLeftUntil(long deadline) {
        import std.datetime.systime : Clock, unixTimeToStdTime;

        enum long hnsecsPerMsec = 10_000;
        // Clock.currStdTime counts hnsecs from 1 A.D.; rebase it onto the Unix Epoch.
        immutable long nowMillis =
            (Clock.currStdTime - unixTimeToStdTime(0)) / hnsecsPerMsec;
        if (deadline <= nowMillis)
            return Duration.zero;

        // deadline > nowMillis > 0 here, so the subtraction cannot overflow.
        long remaining = deadline - nowMillis;
        // Duration stores hnsecs in a long; clamp so the conversion cannot overflow.
        enum long maxMillis = long.max / hnsecsPerMsec;
        if (remaining > maxMillis)
            remaining = maxMillis;
        return msecs(remaining);
    }

    /**
     * Makes available the permit for the given thread, if it
     * was not already available, by calling {@code unpark} on the
     * thread's parker. This operation has no effect when the thread is
     * null or is not a {@code ThreadEx}.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    static void unpark(Thread thread) {
        ThreadEx tx = cast(ThreadEx)thread;
        if (tx !is null)
            tx.parker().unpark();
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available, without a timeout.
     *
     * <p>The wait is delegated to the current thread's parker with
     * {@code Duration.zero} (no timeout). The call may return because
     * another thread invoked {@link #unpark unpark} on this thread, or
     * spuriously; it does <em>not</em> report which, so callers should
     * re-check the condition that caused them to park.
     *
     * <p>If the current thread is not a {@code ThreadEx}, a warning is
     * logged and the call returns after a zero-length sleep.
     */
    static void park() {
        ThreadEx tx = cast(ThreadEx)Thread.getThis();
        if (tx !is null) {
            tx.parker().park(Duration.zero);
        } else {
            warning("The current thread is not ThreadEx!");
            // TODO: Tasks pending completion -@zxp at 11/7/2018, 10:08:21 AM
            // upgrade Thread to ThreadEx
            Thread.sleep(Duration.zero);
        }
    }

    /**
     * Disables the current thread for up to the given time unless the
     * permit is available.
     *
     * <p>A negative {@code time} returns immediately. If the current
     * thread is not a {@code ThreadEx}, a warning is logged and the thread
     * simply sleeps for {@code time}; such a sleep cannot be cut short by
     * {@link #unpark unpark}.
     *
     * @param time the maximum time to wait; {@code Duration.zero} is passed
     *        through to the parker unchanged (the same value
     *        {@link #park()} uses for an untimed wait)
     */
    static void park(Duration time) {
        // Nothing to wait for, and Thread.sleep does not accept negative durations.
        if (time.isNegative)
            return;

        ThreadEx tx = cast(ThreadEx)Thread.getThis();
        if (tx !is null) {
            tx.parker().park(time);
        } else {
            warning("The current thread is not ThreadEx!");
            Thread.sleep(time);
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available, recording {@code blocker} while blocked.
     * Equivalent to {@code park(blocker, Duration.zero)}.
     *
     * <p>This method does <em>not</em> report what caused it to return.
     * Callers should re-check the conditions which caused the thread to
     * park in the first place.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
    static void park(Object blocker) {
        park(blocker, Duration.zero);
    }

    /**
     * Disables the current thread for up to the given time unless the
     * permit is available, recording {@code blocker} on the thread for the
     * duration of the wait.
     *
     * <p>A negative {@code time} returns immediately. If the current
     * thread is not a {@code ThreadEx}, a warning is logged and the call
     * returns without waiting.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param time the maximum time to wait; {@code Duration.zero} is passed
     *        through to the parker unchanged (the value {@link #park(Object)}
     *        uses for an untimed wait)
     */
    static void park(Object blocker, Duration time) {
        if (time < Duration.zero)
            return;

        ThreadEx tx = cast(ThreadEx)Thread.getThis();
        if (tx is null) {
            warning("The current thread is not ThreadEx!");
            return;
        }

        setBlocker(tx, blocker);
        // Clear the blocker even if the parker throws, so no stale value is reported.
        scope(exit) setBlocker(tx, null);
        tx.parker().park(time);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>A non-positive {@code nanos} returns immediately. Positive values
     * below the 100ns resolution of {@code Duration} are rounded up so that
     * they are not mistaken for an untimed wait.
     *
     * <p>This method does <em>not</em> report what caused it to return.
     * Callers should re-check the conditions which caused the thread to
     * park in the first place.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param nanos the maximum number of nanoseconds to wait
     * @since 1.6
     */
    static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            park(blocker, toTimeout(nanos));
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>The time remaining until {@code deadline} is computed from the
     * system wall clock and the wait is delegated to
     * {@code park(blocker, remaining)}. If the deadline has already passed
     * the call returns immediately.
     *
     * <p>This method does <em>not</em> report what caused it to return.
     * Callers should re-check the conditions which caused the thread to
     * park in the first place.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     * @since 1.6
     */
    static void parkUntil(Object blocker, long deadline) {
        Duration remaining = timeLeftUntil(deadline);
        if (remaining > Duration.zero) {
            park(blocker, remaining);
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>A non-positive {@code nanos} returns immediately, as does a call
     * from a thread that is not a {@code ThreadEx}. Positive values below
     * the 100ns resolution of {@code Duration} are rounded up so that they
     * are not mistaken for an untimed wait.
     *
     * <p>This method does <em>not</em> report what caused it to return.
     * Callers should re-check the conditions which caused the thread to
     * park in the first place.
     *
     * @param nanos the maximum number of nanoseconds to wait
     */
    static void parkNanos(long nanos) {
        ThreadEx tx = cast(ThreadEx)Thread.getThis();
        if (nanos > 0 && tx !is null) {
            tx.parker().park(toTimeout(nanos));
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>The time remaining until {@code deadline} is computed from the
     * system wall clock and the wait is delegated to
     * {@code park(remaining)}. If the deadline has already passed the call
     * returns immediately.
     *
     * <p>This method does <em>not</em> report what caused it to return.
     * Callers should re-check the conditions which caused the thread to
     * park in the first place.
     *
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     */
    static void parkUntil(long deadline) {
        Duration remaining = timeLeftUntil(deadline);
        if (remaining > Duration.zero) {
            park(remaining);
        }
    }

    /**
     * Returns the blocker object currently recorded on the given thread,
     * i.e. the one supplied to a blocker-taking park method that has not
     * yet returned, or null if none is recorded. A thread that is not a
     * {@code ThreadEx} never has a blocker recorded, so null is returned
     * for it. The value returned is just a momentary snapshot -- the
     * thread may have since unblocked or blocked on a different blocker
     * object.
     *
     * @param t the thread
     * @return the blocker, or null
     * @throws NullPointerException if argument is null
     * @since 1.6
     */
    static Object getBlocker(Thread t) {
        if (t is null)
            throw new NullPointerException();

        ThreadEx tx = cast(ThreadEx)t;
        return tx is null ? null : tx.parkBlocker;
    }

    // NOTE: nextSecondarySeed() and getThreadId(Thread) from the Java
    // original rely on Unsafe field access and have not been ported.

}