1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.thread.LockSupport; 13 14 import core.thread; 15 import core.time; 16 17 import hunt.concurrency.thread.ThreadEx; 18 import hunt.util.DateTime; 19 import hunt.Exceptions; 20 21 import hunt.logging.ConsoleLogger; 22 23 24 /** 25 * Basic thread blocking primitives for creating locks and other 26 * synchronization classes. 27 * 28 * <p>This class associates, with each thread that uses it, a permit 29 * (in the sense of the {@link java.util.concurrent.Semaphore 30 * Semaphore} class). A call to {@code park} will return immediately 31 * if the permit is available, consuming it in the process; otherwise 32 * it <em>may</em> block. A call to {@code unpark} makes the permit 33 * available, if it was not already available. (Unlike with Semaphores 34 * though, permits do not accumulate. There is at most one.) 35 * Reliable usage requires the use of volatile (or atomic) variables 36 * to control when to park or unpark. Orderings of calls to these 37 * methods are maintained with respect to volatile variable accesses, 38 * but not necessarily non-volatile variable accesses. 39 * 40 * <p>Methods {@code park} and {@code unpark} provide efficient 41 * means of blocking and unblocking threads that do not encounter the 42 * problems that cause the deprecated methods {@code Thread.suspend} 43 * and {@code Thread.resume} to be unusable for such purposes: Races 44 * between one thread invoking {@code park} and another thread trying 45 * to {@code unpark} it will preserve liveness, due to the 46 * permit. Additionally, {@code park} will return if the caller's 47 * thread was interrupted, and timeout versions are supported. The 48 * {@code park} method may also return at any other time, for "no 49 * reason", so in general must be invoked within a loop that rechecks 50 * conditions upon return. In this sense {@code park} serves as an 51 * optimization of a "busy wait" that does not waste as much time 52 * spinning, but must be paired with an {@code unpark} to be 53 * effective. 54 * 55 * <p>The three forms of {@code park} each also support a 56 * {@code blocker} object parameter. This object is recorded while 57 * the thread is blocked to permit monitoring and diagnostic tools to 58 * identify the reasons that threads are blocked. (Such tools may 59 * access blockers using method {@link #getBlocker(Thread)}.) 60 * The use of these forms rather than the original forms without this 61 * parameter is strongly encouraged. The normal argument to supply as 62 * a {@code blocker} within a lock implementation is {@code this}. 63 * 64 * <p>These methods are designed to be used as tools for creating 65 * higher-level synchronization utilities, and are not in themselves 66 * useful for most concurrency control applications. The {@code park} 67 * method is designed for use only in constructions of the form: 68 * 69 * <pre> {@code 70 * while (!canProceed()) { 71 * // ensure request to unpark is visible to other threads 72 * ... 73 * LockSupport.park(this); 74 * }}</pre> 75 * 76 * where no actions by the thread publishing a request to unpark, 77 * prior to the call to {@code park}, entail locking or blocking. 78 * Because only one permit is associated with each thread, any 79 * intermediary uses of {@code park}, including implicitly via class 80 * loading, could lead to an unresponsive thread (a "lost unpark"). 81 * 82 * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out 83 * non-reentrant lock class: 84 * <pre> {@code 85 * class FIFOMutex { 86 * private final AtomicBoolean locked = new AtomicBoolean(false); 87 * private final Queue!(Thread) waiters 88 * = new ConcurrentLinkedQueue<>(); 89 * 90 * void lock() { 91 * boolean wasInterrupted = false; 92 * // publish current thread for unparkers 93 * waiters.add(Thread.currentThread()); 94 * 95 * // Block while not first in queue or cannot acquire lock 96 * while (waiters.peek() != Thread.currentThread() || 97 * !locked.compareAndSet(false, true)) { 98 * LockSupport.park(this); 99 * // ignore interrupts while waiting 100 * if (Thread.interrupted()) 101 * wasInterrupted = true; 102 * } 103 * 104 * waiters.remove(); 105 * // ensure correct interrupt status on return 106 * if (wasInterrupted) 107 * Thread.currentThread().interrupt(); 108 * } 109 * 110 * void unlock() { 111 * locked.set(false); 112 * LockSupport.unpark(waiters.peek()); 113 * } 114 * 115 * static { 116 * // Reduce the risk of "lost unpark" due to classloading 117 * Class<?> ensureLoaded = LockSupport.class; 118 * } 119 * }}</pre> 120 * 121 * @since 1.5 122 */ 123 class LockSupport { 124 private this() {} // Cannot be instantiated. 125 126 private static void setBlocker(ThreadEx t, Object arg) { 127 // Even though volatile, hotspot doesn't need a write barrier here. 128 t.parkBlocker = arg; 129 // implementationMissing(false); 130 } 131 132 /** 133 * Makes available the permit for the given thread, if it 134 * was not already available. If the thread was blocked on 135 * {@code park} then it will unblock. Otherwise, its next call 136 * to {@code park} is guaranteed not to block. This operation 137 * is not guaranteed to have any effect at all if the given 138 * thread has not been started. 139 * 140 * @param thread the thread to unpark, or {@code null}, in which case 141 * this operation has no effect 142 */ 143 static void unpark(Thread thread) { 144 ThreadEx tx = cast(ThreadEx)thread; 145 if (tx !is null) 146 tx.parker().unpark(); 147 } 148 149 150 /** 151 * Disables the current thread for thread scheduling purposes unless the 152 * permit is available. 153 * 154 * <p>If the permit is available then it is consumed and the call 155 * returns immediately; otherwise the current thread becomes disabled 156 * for thread scheduling purposes and lies dormant until one of three 157 * things happens: 158 * 159 * <ul> 160 * 161 * <li>Some other thread invokes {@link #unpark unpark} with the 162 * current thread as the target; or 163 * 164 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 165 * the current thread; or 166 * 167 * <li>The call spuriously (that is, for no reason) returns. 168 * </ul> 169 * 170 * <p>This method does <em>not</em> report which of these caused the 171 * method to return. Callers should re-check the conditions which caused 172 * the thread to park in the first place. Callers may also determine, 173 * for example, the interrupt status of the thread upon return. 174 */ 175 static void park() { 176 ThreadEx tx = cast(ThreadEx)Thread.getThis(); 177 if (tx !is null) { 178 tx.parker().park(Duration.zero); 179 } else { 180 warning("The current thread is not ThreadEx!"); 181 // TODO: Tasks pending completion -@zxp at 11/7/2018, 10:08:21 AM 182 // upgrade Thread to ThreadEx 183 Thread.sleep(Duration.zero); 184 } 185 } 186 187 static void park(Duration time) { 188 ThreadEx tx = cast(ThreadEx)Thread.getThis(); 189 if (!time.isNegative && tx !is null) { 190 tx.parker().park(time); 191 } else { 192 warning("The current thread is not ThreadEx!"); 193 Thread.sleep(time); 194 } 195 } 196 197 /** 198 * Disables the current thread for thread scheduling purposes unless the 199 * permit is available. 200 * 201 * <p>If the permit is available then it is consumed and the call returns 202 * immediately; otherwise 203 * the current thread becomes disabled for thread scheduling 204 * purposes and lies dormant until one of three things happens: 205 * 206 * <ul> 207 * <li>Some other thread invokes {@link #unpark unpark} with the 208 * current thread as the target; or 209 * 210 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 211 * the current thread; or 212 * 213 * <li>The call spuriously (that is, for no reason) returns. 214 * </ul> 215 * 216 * <p>This method does <em>not</em> report which of these caused the 217 * method to return. Callers should re-check the conditions which caused 218 * the thread to park in the first place. Callers may also determine, 219 * for example, the interrupt status of the thread upon return. 220 * 221 * @param blocker the synchronization object responsible for this 222 * thread parking 223 * @since 1.6 224 */ 225 static void park(Object blocker) { 226 park(blocker, Duration.zero); 227 } 228 229 static void park(Object blocker, Duration time) { 230 ThreadEx tx = cast(ThreadEx)Thread.getThis(); 231 if (time >= Duration.zero && tx !is null) { 232 setBlocker(tx, blocker); 233 tx.parker().park(time); 234 setBlocker(tx, null); 235 } else { 236 warning("The current thread is not ThreadEx!"); 237 } 238 } 239 240 /** 241 * Disables the current thread for thread scheduling purposes, for up to 242 * the specified waiting time, unless the permit is available. 243 * 244 * <p>If the permit is available then it is consumed and the call 245 * returns immediately; otherwise the current thread becomes disabled 246 * for thread scheduling purposes and lies dormant until one of four 247 * things happens: 248 * 249 * <ul> 250 * <li>Some other thread invokes {@link #unpark unpark} with the 251 * current thread as the target; or 252 * 253 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 254 * the current thread; or 255 * 256 * <li>The specified waiting time elapses; or 257 * 258 * <li>The call spuriously (that is, for no reason) returns. 259 * </ul> 260 * 261 * <p>This method does <em>not</em> report which of these caused the 262 * method to return. Callers should re-check the conditions which caused 263 * the thread to park in the first place. Callers may also determine, 264 * for example, the interrupt status of the thread, or the elapsed time 265 * upon return. 266 * 267 * @param blocker the synchronization object responsible for this 268 * thread parking 269 * @param nanos the maximum number of nanoseconds to wait 270 * @since 1.6 271 */ 272 static void parkNanos(Object blocker, long nanos) { 273 if(nanos > 0) { 274 park(blocker, dur!(TimeUnit.Nanosecond)(nanos)); 275 } 276 } 277 278 /** 279 * Disables the current thread for thread scheduling purposes, until 280 * the specified deadline, unless the permit is available. 281 * 282 * <p>If the permit is available then it is consumed and the call 283 * returns immediately; otherwise the current thread becomes disabled 284 * for thread scheduling purposes and lies dormant until one of four 285 * things happens: 286 * 287 * <ul> 288 * <li>Some other thread invokes {@link #unpark unpark} with the 289 * current thread as the target; or 290 * 291 * <li>Some other thread {@linkplain Thread#interrupt interrupts} the 292 * current thread; or 293 * 294 * <li>The specified deadline passes; or 295 * 296 * <li>The call spuriously (that is, for no reason) returns. 297 * </ul> 298 * 299 * <p>This method does <em>not</em> report which of these caused the 300 * method to return. Callers should re-check the conditions which caused 301 * the thread to park in the first place. Callers may also determine, 302 * for example, the interrupt status of the thread, or the current time 303 * upon return. 304 * 305 * @param blocker the synchronization object responsible for this 306 * thread parking 307 * @param deadline the absolute time, in milliseconds from the Epoch, 308 * to wait until 309 * @since 1.6 310 */ 311 static void parkUntil(Object blocker, long deadline) { 312 ThreadEx t = ThreadEx.currentThread(); 313 setBlocker(t, blocker); 314 // U.park(true, deadline); 315 implementationMissing(false); 316 setBlocker(t, null); 317 } 318 319 /** 320 * Disables the current thread for thread scheduling purposes, for up to 321 * the specified waiting time, unless the permit is available. 322 * 323 * <p>If the permit is available then it is consumed and the call 324 * returns immediately; otherwise the current thread becomes disabled 325 * for thread scheduling purposes and lies dormant until one of four 326 * things happens: 327 * 328 * <ul> 329 * <li>Some other thread invokes {@link #unpark unpark} with the 330 * current thread as the target; or 331 * 332 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 333 * the current thread; or 334 * 335 * <li>The specified waiting time elapses; or 336 * 337 * <li>The call spuriously (that is, for no reason) returns. 338 * </ul> 339 * 340 * <p>This method does <em>not</em> report which of these caused the 341 * method to return. Callers should re-check the conditions which caused 342 * the thread to park in the first place. Callers may also determine, 343 * for example, the interrupt status of the thread, or the elapsed time 344 * upon return. 345 * 346 * @param nanos the maximum number of nanoseconds to wait 347 */ 348 static void parkNanos(long nanos) { 349 ThreadEx tx = cast(ThreadEx)Thread.getThis(); 350 if (nanos > 0 && tx !is null) { 351 tx.parker().park(dur!(TimeUnit.Nanosecond)(nanos)); 352 } 353 } 354 355 /** 356 * Disables the current thread for thread scheduling purposes, until 357 * the specified deadline, unless the permit is available. 358 * 359 * <p>If the permit is available then it is consumed and the call 360 * returns immediately; otherwise the current thread becomes disabled 361 * for thread scheduling purposes and lies dormant until one of four 362 * things happens: 363 * 364 * <ul> 365 * <li>Some other thread invokes {@link #unpark unpark} with the 366 * current thread as the target; or 367 * 368 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 369 * the current thread; or 370 * 371 * <li>The specified deadline passes; or 372 * 373 * <li>The call spuriously (that is, for no reason) returns. 374 * </ul> 375 * 376 * <p>This method does <em>not</em> report which of these caused the 377 * method to return. Callers should re-check the conditions which caused 378 * the thread to park in the first place. Callers may also determine, 379 * for example, the interrupt status of the thread, or the current time 380 * upon return. 381 * 382 * @param deadline the absolute time, in milliseconds from the Epoch, 383 * to wait until 384 */ 385 static void parkUntil(long deadline) { 386 // U.park(true, deadline); 387 implementationMissing(false); 388 } 389 390 /** 391 * Returns the blocker object supplied to the most recent 392 * invocation of a park method that has not yet unblocked, or null 393 * if not blocked. The value returned is just a momentary 394 * snapshot -- the thread may have since unblocked or blocked on a 395 * different blocker object. 396 * 397 * @param t the thread 398 * @return the blocker 399 * @throws NullPointerException if argument is null 400 * @since 1.6 401 */ 402 static Object getBlocker(Thread t) { 403 ThreadEx tx = cast(ThreadEx)t; 404 if (tx is null) 405 throw new NullPointerException(); 406 return tx.parkBlocker; 407 408 } 409 410 /** 411 * Returns the pseudo-randomly initialized or updated secondary seed. 412 * Copied from ThreadLocalRandom due to package access restrictions. 413 */ 414 // static final int nextSecondarySeed() { 415 // int r; 416 // Thread t = Thread.currentThread(); 417 // if ((r = U.getInt(t, SECONDARY)) != 0) { 418 // r ^= r << 13; // xorshift 419 // r ^= r >>> 17; 420 // r ^= r << 5; 421 // } 422 // else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) 423 // r = 1; // avoid zero 424 // U.putInt(t, SECONDARY, r); 425 // return r; 426 // } 427 428 /** 429 * Returns the thread id for the given thread. We must access 430 * this directly rather than via method Thread.getId() because 431 * getId() has been known to be overridden in ways that do not 432 * preserve unique mappings. 433 */ 434 // static final long getThreadId(Thread thread) { 435 // return U.getLong(thread, TID); 436 // } 437 438 }