| AbstractQueuedSynchronizer.java |
1 /*
2 * %W% %E%
3 *
4 * Copyright (c) 2006,2010 Oracle and/or its affiliates. All rights reserved.
5 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6 */
7
8 package java.util.concurrent.locks;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13
14 /**
15 * Provides a framework for implementing blocking locks and related
16 * synchronizers (semaphores, events, etc) that rely on
17 * first-in-first-out (FIFO) wait queues. This class is designed to
18 * be a useful basis for most kinds of synchronizers that rely on a
19 * single atomic <tt>int</tt> value to represent state. Subclasses
20 * must define the protected methods that change this state, and which
21 * define what that state means in terms of this object being acquired
22 * or released. Given these, the other methods in this class carry
23 * out all queuing and blocking mechanics. Subclasses can maintain
24 * other state fields, but only the atomically updated <tt>int</tt>
25 * value manipulated using methods {@link #getState}, {@link
26 * #setState} and {@link #compareAndSetState} is tracked with respect
27 * to synchronization.
28 *
29 * <p>Subclasses should be defined as non-public internal helper
30 * classes that are used to implement the synchronization properties
31 * of their enclosing class. Class
32 * <tt>AbstractQueuedSynchronizer</tt> does not implement any
33 * synchronization interface. Instead it defines methods such as
34 * {@link #acquireInterruptibly} that can be invoked as
35 * appropriate by concrete locks and related synchronizers to
36 * implement their public methods.
37 *
38 * <p>This class supports either or both a default <em>exclusive</em>
39 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
40 * attempted acquires by other threads cannot succeed. Shared mode
41 * acquires by multiple threads may (but need not) succeed. This class
42 * does not "understand" these differences except in the
43 * mechanical sense that when a shared mode acquire succeeds, the next
44 * waiting thread (if one exists) must also determine whether it can
45 * acquire as well. Threads waiting in the different modes share the
46 * same FIFO queue. Usually, implementation subclasses support only
47 * one of these modes, but both can come into play for example in a
48 * {@link ReadWriteLock}. Subclasses that support only exclusive or
49 * only shared modes need not define the methods supporting the unused mode.
50 *
51 * <p>This class defines a nested {@link ConditionObject} class that
52 * can be used as a {@link Condition} implementation by subclasses
53 * supporting exclusive mode for which method {@link
54 * #isHeldExclusively} reports whether synchronization is exclusively
55 * held with respect to the current thread, method {@link #release}
56 * invoked with the current {@link #getState} value fully releases
57 * this object, and {@link #acquire}, given this saved state value,
58 * eventually restores this object to its previous acquired state. No
59 * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
60 * condition, so if this constraint cannot be met, do not use it. The
61 * behavior of {@link ConditionObject} depends of course on the
62 * semantics of its synchronizer implementation.
63 *
64 * <p>This class provides inspection, instrumentation, and monitoring
65 * methods for the internal queue, as well as similar methods for
66 * condition objects. These can be exported as desired into classes
67 * using an <tt>AbstractQueuedSynchronizer</tt> for their
68 * synchronization mechanics.
69 *
70 * <p>Serialization of this class stores only the underlying atomic
71 * integer maintaining state, so deserialized objects have empty
72 * thread queues. Typical subclasses requiring serializability will
73 * define a <tt>readObject</tt> method that restores this to a known
74 * initial state upon deserialization.
75 *
76 * <h3>Usage</h3>
77 *
78 * <p>To use this class as the basis of a synchronizer, redefine the
79 * following methods, as applicable, by inspecting and/or modifying
80 * the synchronization state using {@link #getState}, {@link
81 * #setState} and/or {@link #compareAndSetState}:
82 *
83 * <ul>
84 * <li> {@link #tryAcquire}
85 * <li> {@link #tryRelease}
86 * <li> {@link #tryAcquireShared}
87 * <li> {@link #tryReleaseShared}
88 * <li> {@link #isHeldExclusively}
89 *</ul>
90 *
91 * Each of these methods by default throws {@link
92 * UnsupportedOperationException}. Implementations of these methods
93 * must be internally thread-safe, and should in general be short and
94 * not block. Defining these methods is the <em>only</em> supported
95 * means of using this class. All other methods are declared
96 * <tt>final</tt> because they cannot be independently varied.
97 *
98 * <p>You may also find the inherited methods from {@link
99 * AbstractOwnableSynchronizer} useful to keep track of the thread
100 * owning an exclusive synchronizer. You are encouraged to use them
101 * -- this enables monitoring and diagnostic tools to assist users in
102 * determining which threads hold locks.
103 *
104 * <p>Even though this class is based on an internal FIFO queue, it
105 * does not automatically enforce FIFO acquisition policies. The core
106 * of exclusive synchronization takes the form:
107 *
108 * <pre>
109 * Acquire:
110 * while (!tryAcquire(arg)) {
111 * <em>enqueue thread if it is not already queued</em>;
112 * <em>possibly block current thread</em>;
113 * }
114 *
115 * Release:
116 * if (tryRelease(arg))
117 * <em>unblock the first queued thread</em>;
118 * </pre>
119 *
120 * (Shared mode is similar but may involve cascading signals.)
121 *
122 * <p>Because checks in acquire are invoked before enqueuing, a newly
123 * acquiring thread may <em>barge</em> ahead of others that are
124 * blocked and queued. However, you can, if desired, define
125 * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
126 * barging by internally invoking one or more of the inspection
127 * methods. In particular, a strict FIFO lock can define
128 * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
129 * #getFirstQueuedThread} does not return the current thread. A
130 * normally preferable non-strict fair version can immediately return
131 * <tt>false</tt> only if {@link #hasQueuedThreads} returns
132 * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
133 * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
134 * non-null and not the current thread. Further variations are
135 * possible.
136 *
137 * <p>Throughput and scalability are generally highest for the
138 * default barging (also known as <em>greedy</em>,
139 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
140 * While this is not guaranteed to be fair or starvation-free, earlier
141 * queued threads are allowed to recontend before later queued
142 * threads, and each recontention has an unbiased chance to succeed
143 * against incoming threads. Also, while acquires do not
144 * "spin" in the usual sense, they may perform multiple
145 * invocations of <tt>tryAcquire</tt> interspersed with other
146 * computations before blocking. This gives most of the benefits of
147 * spins when exclusive synchronization is only briefly held, without
148 * most of the liabilities when it isn't. If so desired, you can
149 * augment this by preceding calls to acquire methods with
150 * "fast-path" checks, possibly prechecking {@link #hasContended}
151 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
152 * is likely not to be contended.
153 *
154 * <p>This class provides an efficient and scalable basis for
155 * synchronization in part by specializing its range of use to
156 * synchronizers that can rely on <tt>int</tt> state, acquire, and
157 * release parameters, and an internal FIFO wait queue. When this does
158 * not suffice, you can build synchronizers from a lower level using
159 * {@link java.util.concurrent.atomic atomic} classes, your own custom
160 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
161 * support.
162 *
163 * <h3>Usage Examples</h3>
164 *
165 * <p>Here is a non-reentrant mutual exclusion lock class that uses
166 * the value zero to represent the unlocked state, and one to
167 * represent the locked state. While a non-reentrant lock
168 * does not strictly require recording of the current owner
169 * thread, this class does so anyway to make usage easier to monitor.
170 * It also supports conditions and exposes
171 * one of the instrumentation methods:
172 *
173 * <pre>
174 * class Mutex implements Lock, java.io.Serializable {
175 *
176 * // Our internal helper class
177 * private static class Sync extends AbstractQueuedSynchronizer {
178 * // Report whether in locked state
179 * protected boolean isHeldExclusively() {
180 * return getState() == 1;
181 * }
182 *
183 * // Acquire the lock if state is zero
184 * public boolean tryAcquire(int acquires) {
185 * assert acquires == 1; // Otherwise unused
186 * if (compareAndSetState(0, 1)) {
187 * setExclusiveOwnerThread(Thread.currentThread());
188 * return true;
189 * }
190 * return false;
191 * }
192 *
193 * // Release the lock by setting state to zero
194 * protected boolean tryRelease(int releases) {
195 * assert releases == 1; // Otherwise unused
196 * if (getState() == 0) throw new IllegalMonitorStateException();
197 * setExclusiveOwnerThread(null);
198 * setState(0);
199 * return true;
200 * }
201 *
202 * // Provide a Condition
203 * Condition newCondition() { return new ConditionObject(); }
204 *
205 * // Deserialize properly
206 * private void readObject(ObjectInputStream s)
207 * throws IOException, ClassNotFoundException {
208 * s.defaultReadObject();
209 * setState(0); // reset to unlocked state
210 * }
211 * }
212 *
213 * // The sync object does all the hard work. We just forward to it.
214 * private final Sync sync = new Sync();
215 *
216 * public void lock() { sync.acquire(1); }
217 * public boolean tryLock() { return sync.tryAcquire(1); }
218 * public void unlock() { sync.release(1); }
219 * public Condition newCondition() { return sync.newCondition(); }
220 * public boolean isLocked() { return sync.isHeldExclusively(); }
221 * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
222 * public void lockInterruptibly() throws InterruptedException {
223 * sync.acquireInterruptibly(1);
224 * }
225 * public boolean tryLock(long timeout, TimeUnit unit)
226 * throws InterruptedException {
227 * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
228 * }
229 * }
230 * </pre>
231 *
232 * <p>Here is a latch class that is like a {@link CountDownLatch}
233 * except that it only requires a single <tt>signal</tt> to
234 * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
235 * acquire and release methods.
236 *
237 * <pre>
238 * class BooleanLatch {
239 *
240 * private static class Sync extends AbstractQueuedSynchronizer {
241 * boolean isSignalled() { return getState() != 0; }
242 *
243 * protected int tryAcquireShared(int ignore) {
244 * return isSignalled()? 1 : -1;
245 * }
246 *
247 * protected boolean tryReleaseShared(int ignore) {
248 * setState(1);
249 * return true;
250 * }
251 * }
252 *
253 * private final Sync sync = new Sync();
254 * public boolean isSignalled() { return sync.isSignalled(); }
255 * public void signal() { sync.releaseShared(1); }
256 * public void await() throws InterruptedException {
257 * sync.acquireSharedInterruptibly(1);
258 * }
259 * }
260 * </pre>
261 *
262 * @since 1.5
263 * @author Doug Lea
264 */
265 public abstract class AbstractQueuedSynchronizer
266 extends AbstractOwnableSynchronizer
267 implements java.io.Serializable {
268
269 private static final long serialVersionUID = 7373984972572414691L;
270
271 /**
272 * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
273 * with initial synchronization state of zero.
274 */
protected AbstractQueuedSynchronizer() {
    // Intentionally empty: the int state field keeps its default value of
    // zero, and the wait queue (head/tail) is only built lazily by enq.
}
276
277 /**
278 * Wait queue node class.
279 *
280 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
281 * Hagersten) lock queue. CLH locks are normally used for
282 * spinlocks. We instead use them for blocking synchronizers, but
283 * use the same basic tactic of holding some of the control
284 * information about a thread in the predecessor of its node. A
285 * "status" field in each node keeps track of whether a thread
286 * should block. A node is signalled when its predecessor
287 * releases. Each node of the queue otherwise serves as a
288 * specific-notification-style monitor holding a single waiting
289 * thread. The status field does NOT control whether threads are
290 * granted locks etc though. A thread may try to acquire if it is
291 * first in the queue. But being first does not guarantee success;
292 * it only gives the right to contend. So the currently released
293 * contender thread may need to rewait.
294 *
295 * <p>To enqueue into a CLH lock, you atomically splice it in as new
296 * tail. To dequeue, you just set the head field.
297 * <pre>
298 * +------+ prev +-----+ +-----+
299 * head | | <---- | | <---- | | tail
300 * +------+ +-----+ +-----+
301 * </pre>
302 *
303 * <p>Insertion into a CLH queue requires only a single atomic
304 * operation on "tail", so there is a simple atomic point of
305 * demarcation from unqueued to queued. Similarly, dequeuing
306 * involves only updating the "head". However, it takes a bit
307 * more work for nodes to determine who their successors are,
308 * in part to deal with possible cancellation due to timeouts
309 * and interrupts.
310 *
311 * <p>The "prev" links (not used in original CLH locks), are mainly
312 * needed to handle cancellation. If a node is cancelled, its
313 * successor is (normally) relinked to a non-cancelled
314 * predecessor. For explanation of similar mechanics in the case
315 * of spin locks, see the papers by Scott and Scherer at
316 * http://www.cs.rochester.edu/u/scott/synchronization/
317 *
318 * <p>We also use "next" links to implement blocking mechanics.
319 * The thread id for each node is kept in its own node, so a
320 * predecessor signals the next node to wake up by traversing
321 * next link to determine which thread it is. Determination of
322 * successor must avoid races with newly queued nodes to set
323 * the "next" fields of their predecessors. This is solved
324 * when necessary by checking backwards from the atomically
325 * updated "tail" when a node's successor appears to be null.
326 * (Or, said differently, the next-links are an optimization
327 * so that we don't usually need a backward scan.)
328 *
329 * <p>Cancellation introduces some conservatism to the basic
330 * algorithms. Since we must poll for cancellation of other
331 * nodes, we can miss noticing whether a cancelled node is
332 * ahead or behind us. This is dealt with by always unparking
333 * successors upon cancellation, allowing them to stabilize on
334 * a new predecessor.
335 *
336 * <p>CLH queues need a dummy header node to get started. But
337 * we don't create them on construction, because it would be wasted
338 * effort if there is never contention. Instead, the node
339 * is constructed and head and tail pointers are set upon first
340 * contention.
341 *
342 * <p>Threads waiting on Conditions use the same nodes, but
343 * use an additional link. Conditions only need to link nodes
344 * in simple (non-concurrent) linked queues because they are
345 * only accessed when exclusively held. Upon await, a node is
346 * inserted into a condition queue. Upon signal, the node is
347 * transferred to the main queue. A special value of status
348 * field is used to mark which queue a node is on.
349 *
350 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
351 * Scherer and Michael Scott, along with members of JSR-166
352 * expert group, for helpful ideas, discussions, and critiques
353 * on the design of this class.
354 */
355 static final class Node {
356 /** waitStatus value to indicate thread has cancelled */
357 static final int CANCELLED = 1;
358 /** waitStatus value to indicate successor's thread needs unparking */
359 static final int SIGNAL = -1;
360 /** waitStatus value to indicate thread is waiting on condition */
361 static final int CONDITION = -2;
362 /**
363 * waitStatus value to indicate the next acquireShared should
364 * unconditionally propagate
365 */
366 static final int PROPAGATE = -3;
367 /** Marker to indicate a node is waiting in shared mode */
368 static final Node SHARED = new Node();
369 /** Marker to indicate a node is waiting in exclusive mode */
370 static final Node EXCLUSIVE = null;
371
372 /**
373 * Status field, taking on only the values:
374 * SIGNAL: The successor of this node is (or will soon be)
375 * blocked (via park), so the current node must
376 * unpark its successor when it releases or
377 * cancels. To avoid races, acquire methods must
378 * first indicate they need a signal,
379 * then retry the atomic acquire, and then,
380 * on failure, block.
381 * CANCELLED: This node is cancelled due to timeout or interrupt.
382 * Nodes never leave this state. In particular,
383 * a thread with cancelled node never again blocks.
384 * CONDITION: This node is currently on a condition queue.
385 * It will not be used as a sync queue node
386 * until transferred, at which time the status
387 * will be set to 0. (Use of this value here has
388 * nothing to do with the other uses of the
389 * field, but simplifies mechanics.)
390 * PROPAGATE: A releaseShared should be propagated to other
391 * nodes. This is set (for head node only) in
392 * doReleaseShared to ensure propagation
393 * continues, even if other operations have
394 * since intervened.
395 * 0: None of the above
396 *
397 * The values are arranged numerically to simplify use.
398 * Non-negative values mean that a node doesn't need to
399 * signal. So, most code doesn't need to check for particular
400 * values, just for sign.
401 *
402 * The field is initialized to 0 for normal sync nodes, and
403 * CONDITION for condition nodes. It is modified only using
404 * CAS.
405 */
406 volatile int waitStatus;
407
408 /**
409 * Link to predecessor node that current node/thread relies on
410 * for checking waitStatus. Assigned during enqueing, and nulled
411 * out (for sake of GC) only upon dequeuing. Also, upon
412 * cancellation of a predecessor, we short-circuit while
413 * finding a non-cancelled one, which will always exist
414 * because the head node is never cancelled: A node becomes
415 * head only as a result of successful acquire. A
416 * cancelled thread never succeeds in acquiring, and a thread only
417 * cancels itself, not any other node.
418 */
419 volatile Node prev;
420
421 /**
422 * Link to the successor node that the current node/thread
423 * unparks upon release. Assigned once during enqueuing, and
424 * nulled out (for sake of GC) when no longer needed. Upon
425 * cancellation, we cannot adjust this field, but can notice
426 * status and bypass the node if cancelled. The enq operation
427 * does not assign next field of a predecessor until after
428 * attachment, so seeing a null next field does not
429 * necessarily mean that node is at end of queue. However, if
430 * a next field appears to be null, we can scan prev's from
431 * the tail to double-check.
432 */
433 volatile Node next;
434
435 /**
436 * The thread that enqueued this node. Initialized on
437 * construction and nulled out after use.
438 */
439 volatile Thread thread;
440
441 /**
442 * Link to next node waiting on condition, or the special
443 * value SHARED. Because condition queues are accessed only
444 * when holding in exclusive mode, we just need a simple
445 * linked queue to hold nodes while they are waiting on
446 * conditions. They are then transferred to the queue to
447 * re-acquire. And because conditions can only be exclusive,
448 * we save a field by using special value to indicate shared
449 * mode.
450 */
451 Node nextWaiter;
452
453 /**
454 * Returns true if node is waiting in shared mode
455 */
456 final boolean isShared() {
457 return nextWaiter == SHARED;
458 }
459
460 /**
461 * Returns previous node, or throws NullPointerException if
462 * null. Use when predecessor cannot be null.
463 * @return the predecessor of this node
464 */
465 final Node predecessor() throws NullPointerException {
466 Node p = prev;
467 if (p == null)
468 throw new NullPointerException();
469 else
470 return p;
471 }
472
473 Node() { // Used to establish initial head or SHARED marker
474 }
475
476 Node(Thread thread, Node mode) { // Used by addWaiter
477 this.nextWaiter = mode;
478 this.thread = thread;
479 }
480
481 Node(Thread thread, int waitStatus) { // Used by Condition
482 this.waitStatus = waitStatus;
483 this.thread = thread;
484 }
485 }
486
487 /**
488 * Head of the wait queue, lazily initialized. Except for
489 * initialization, it is modified only via method setHead. Note:
490 * If head exists, its waitStatus is guaranteed not to be
491 * CANCELLED.
492 */
493 private transient volatile Node head;
494
495 /**
496 * Tail of the wait queue, lazily initialized. Extended by method enq
497 * (and the addWaiter fast path) to add a new wait node; reset by cancelAcquire to drop a cancelled tail.
498 */
499 private transient volatile Node tail;
500
501 /**
502 * The synchronization state.
503 */
504 private volatile int state;
505
506 /**
507 * Returns the current value of synchronization state.
508 * This operation has memory semantics of a <tt>volatile</tt> read.
509 * @return current state value
510 */
511 protected final int getState() {
512 return state;
513 }
514
515 /**
516 * Sets the value of synchronization state.
517 * This operation has memory semantics of a <tt>volatile</tt> write.
518 * @param newState the new state value
519 */
520 protected final void setState(int newState) {
521 state = newState;
522 }
523
524 /**
525 * Atomically sets synchronization state to the given updated
526 * value if the current state value equals the expected value.
527 * This operation has memory semantics of a <tt>volatile</tt> read
528 * and write.
529 *
530 * @param expect the expected value
531 * @param update the new value
532 * @return true if successful. False return indicates that the actual
533 * value was not equal to the expected value.
534 */
535 protected final boolean compareAndSetState(int expect, int update) {
536 // See below for intrinsics setup to support this
537 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
538 }
539
540 // Queuing utilities
541
542 /**
543 * The number of nanoseconds for which it is faster to spin
544 * rather than to use timed park. A rough estimate suffices
545 * to improve responsiveness with very short timeouts.
546 */
547 static final long spinForTimeoutThreshold = 1000L;
548
549 /**
550 * Inserts node into queue, initializing if necessary. See picture above.
551 * @param node the node to insert
552 * @return node's predecessor
553 */
554 private Node enq(final Node node) {
555 for (;;) {
556 Node t = tail;
557 if (t == null) { // Must initialize
558 Node h = new Node(); // Dummy header
559 h.next = node;
560 node.prev = h;
561 if (compareAndSetHead(h)) {
562 tail = node;
563 return h;
564 }
565 }
566 else {
567 node.prev = t;
568 if (compareAndSetTail(t, node)) {
569 t.next = node;
570 return t;
571 }
572 }
573 }
574 }
575
576 /**
577 * Creates and enqueues node for given thread and mode.
578 *
579 * <p>The node is created for the calling thread ({@code Thread.currentThread()}).
580 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
581 * @return the new node
582 */
583 private Node addWaiter(Node mode) {
584 Node node = new Node(Thread.currentThread(), mode);
585 // Try the fast path of enq; backup to full enq on failure
586 Node pred = tail;
587 if (pred != null) {
588 node.prev = pred;
589 if (compareAndSetTail(pred, node)) {
590 pred.next = node;
591 return node;
592 }
593 }
594 enq(node);
595 return node;
596 }
597
598 /**
599 * Sets head of queue to be node, thus dequeuing. Called only by
600 * acquire methods. Also nulls out unused fields for sake of GC
601 * and to suppress unnecessary signals and traversals.
602 *
603 * @param node the node
604 */
605 private void setHead(Node node) {
606 head = node;
607 node.thread = null;
608 node.prev = null;
609 }
610
611 /**
612 * Wakes up node's successor, if one exists.
613 *
614 * @param node the node
615 */
616 private void unparkSuccessor(Node node) {
617 /*
618 * If status is negative (i.e., possibly needing signal) try
619 * to clear in anticipation of signalling. It is OK if this
620 * fails or if status is changed by waiting thread.
621 */
622 int ws = node.waitStatus;
623 if (ws < 0)
624 compareAndSetWaitStatus(node, ws, 0);
625
626 /*
627 * Thread to unpark is held in successor, which is normally
628 * just the next node. But if cancelled or apparently null,
629 * traverse backwards from tail to find the actual
630 * non-cancelled successor.
631 */
632 Node s = node.next;
633 if (s == null || s.waitStatus > 0) {
634 s = null;
635 for (Node t = tail; t != null && t != node; t = t.prev)
636 if (t.waitStatus <= 0)
637 s = t;
638 }
639 if (s != null)
640 LockSupport.unpark(s.thread);
641 }
642
643 /**
644 * Release action for shared mode -- signal successor and ensure
645 * propagation. (Note: For exclusive mode, release just amounts
646 * to calling unparkSuccessor of head if it needs signal.)
647 */
648 private void doReleaseShared() {
649 /*
650 * Ensure that a release propagates, even if there are other
651 * in-progress acquires/releases. This proceeds in the usual
652 * way of trying to unparkSuccessor of head if it needs
653 * signal. But if it does not, status is set to PROPAGATE to
654 * ensure that upon release, propagation continues.
655 * Additionally, we must loop in case a new node is added
656 * while we are doing this. Also, unlike other uses of
657 * unparkSuccessor, we need to know if CAS to reset status
658 * fails, if so rechecking.
659 */
660 for (;;) {
661 Node h = head;
662 if (h != null && h != tail) {
663 int ws = h.waitStatus;
664 if (ws == Node.SIGNAL) {
665 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
666 continue; // loop to recheck cases
667 unparkSuccessor(h);
668 }
669 else if (ws == 0 &&
670 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
671 continue; // loop on failed CAS
672 }
673 if (h == head) // loop if head changed
674 break;
675 }
676 }
677
678 /**
679 * Sets head of queue and checks whether the successor may be waiting in
680 * shared mode; if so, propagates when either propagate > 0 or the
681 *
682 * @param node the node
683 * @param propagate the return value from a tryAcquireShared
684 */
685 private void setHeadAndPropagate(Node node, int propagate) {
686 Node h = head; // Record old head for check below
687 setHead(node);
688 /*
689 * Try to signal next queued node if:
690 * Propagation was indicated by caller,
691 * or was recorded (as h.waitStatus) by a previous operation
692 * (note: this uses sign-check of waitStatus because
693 * PROPAGATE status may transition to SIGNAL.)
694 * and
695 * The next node is waiting in shared mode,
696 * or we don't know, because it appears null
697 *
698 * The conservatism in both of these checks may cause
699 * unnecessary wake-ups, but only when there are multiple
700 * racing acquires/releases, so most need signals now or soon
701 * anyway.
702 */
703 if (propagate > 0 || h == null || h.waitStatus < 0) {
704 Node s = node.next;
705 if (s == null || s.isShared())
706 doReleaseShared();
707 }
708 }
709
710 // Utilities for various versions of acquire
711
712 /**
713 * Cancels an ongoing attempt to acquire.
714 *
715 * @param node the node
716 */
717 private void cancelAcquire(Node node) {
718 // Ignore if node doesn't exist
719 if (node == null)
720 return;
721
722 node.thread = null;
723
724 // Skip cancelled predecessors
725 Node pred = node.prev;
726 while (pred.waitStatus > 0)
727 node.prev = pred = pred.prev;
728
729 // predNext is the apparent node to unsplice. CASes below will
730 // fail if not, in which case, we lost race vs another cancel
731 // or signal, so no further action is necessary.
732 Node predNext = pred.next;
733
734 // Can use unconditional write instead of CAS here.
735 // After this atomic step, other Nodes can skip past us.
736 // Before, we are free of interference from other threads.
737 node.waitStatus = Node.CANCELLED;
738
739 // If we are the tail, remove ourselves.
740 if (node == tail && compareAndSetTail(node, pred)) {
741 compareAndSetNext(pred, predNext, null);
742 } else {
743 // If successor needs signal, try to set pred's next-link
744 // so it will get one. Otherwise wake it up to propagate.
745 int ws;
746 if (pred != head &&
747 ((ws = pred.waitStatus) == Node.SIGNAL ||
748 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
749 pred.thread != null) {
750 Node next = node.next;
751 if (next != null && next.waitStatus <= 0)
752 compareAndSetNext(pred, predNext, next);
753 } else {
754 unparkSuccessor(node);
755 }
756
757 node.next = node; // help GC
758 }
759 }
760
761 /**
762 * Checks and updates status for a node that failed to acquire.
763 * Returns true if thread should block. This is the main signal
764 * control in all acquire loops. Requires that pred == node.prev.
765 *
766 * @param pred node's predecessor holding status
767 * @param node the node
768 * @return {@code true} if thread should block
769 */
770 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
771 int ws = pred.waitStatus;
772 if (ws == Node.SIGNAL)
773 /*
774 * This node has already set status asking a release
775 * to signal it, so it can safely park
776 */
777 return true;
778 if (ws > 0) {
779 /*
780 * Predecessor was cancelled. Skip over predecessors and
781 * indicate retry.
782 */
783 do {
784 node.prev = pred = pred.prev;
785 } while (pred.waitStatus > 0);
786 pred.next = node;
787 } else {
788 /*
789 * waitStatus must be 0 or PROPAGATE. Indicate that we
790 * need a signal, but don't park yet. Caller will need to
791 * retry to make sure it cannot acquire before parking.
792 */
793 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
794 }
795 return false;
796 }
797
798 /**
799 * Convenience method to interrupt current thread.
800 */
801 private static void selfInterrupt() {
802 Thread.currentThread().interrupt();
803 }
804
805 /**
806 * Convenience method to park and then check if interrupted
807 *
808 * @return {@code true} if interrupted
809 */
810 private final boolean parkAndCheckInterrupt() {
811 LockSupport.park(this);
812 return Thread.interrupted();
813 }
814
815 /*
816 * Various flavors of acquire, varying in exclusive/shared and
817 * control modes. Each is mostly the same, but annoyingly
818 * different. Only a little bit of factoring is possible due to
819 * interactions of exception mechanics (including ensuring that we
820 * cancel if tryAcquire throws exception) and other control, at
821 * least not without hurting performance too much.
822 */
823
824 /**
825 * Acquires in exclusive uninterruptible mode for thread already in
826 * queue. Used by condition wait methods as well as acquire.
827 *
828 * @param node the node
829 * @param arg the acquire argument
830 * @return {@code true} if interrupted while waiting
831 */
832 final boolean acquireQueued(final Node node, int arg) {
833 try {
834 boolean interrupted = false;
835 for (;;) {
836 final Node p = node.predecessor();
837 if (p == head && tryAcquire(arg)) {
838 setHead(node);
839 p.next = null; // help GC
840 return interrupted;
841 }
842 if (shouldParkAfterFailedAcquire(p, node) &&
843 parkAndCheckInterrupt())
844 interrupted = true;
845 }
846 } catch (RuntimeException ex) {
847 cancelAcquire(node);
848 throw ex;
849 }
850 }
851
852 /**
853 * Acquires in exclusive interruptible mode.
854 * @param arg the acquire argument
855 */
856 private void doAcquireInterruptibly(int arg)
857 throws InterruptedException {
858 final Node node = addWaiter(Node.EXCLUSIVE);
859 try {
860 for (;;) {
861 final Node p = node.predecessor();
862 if (p == head && tryAcquire(arg)) {
863 setHead(node);
864 p.next = null; // help GC
865 return;
866 }
867 if (shouldParkAfterFailedAcquire(p, node) &&
868 parkAndCheckInterrupt())
869 break;
870 }
871 } catch (RuntimeException ex) {
872 cancelAcquire(node);
873 throw ex;
874 }
875 // Arrive here only if interrupted
876 cancelAcquire(node);
877 throw new InterruptedException();
878 }
879
880 /**
881 * Acquires in exclusive timed mode.
882 *
883 * @param arg the acquire argument
884 * @param nanosTimeout max wait time
885 * @return {@code true} if acquired
886 */
887 private boolean doAcquireNanos(int arg, long nanosTimeout)
888 throws InterruptedException {
889 long lastTime = System.nanoTime();
890 final Node node = addWaiter(Node.EXCLUSIVE);
891 try {
892 for (;;) {
893 final Node p = node.predecessor();
894 if (p == head && tryAcquire(arg)) {
895 setHead(node);
896 p.next = null; // help GC
897 return true;
898 }
899 if (nanosTimeout <= 0) {
900 cancelAcquire(node);
901 return false;
902 }
903 if (nanosTimeout > spinForTimeoutThreshold &&
904 shouldParkAfterFailedAcquire(p, node))
905 LockSupport.parkNanos(this, nanosTimeout);
906 long now = System.nanoTime();
907 nanosTimeout -= now - lastTime;
908 lastTime = now;
909 if (Thread.interrupted())
910 break;
911 }
912 } catch (RuntimeException ex) {
913 cancelAcquire(node);
914 throw ex;
915 }
916 // Arrive here only if interrupted
917 cancelAcquire(node);
918 throw new InterruptedException();
919 }
920
921 /**
922 * Acquires in shared uninterruptible mode.
923 * @param arg the acquire argument
924 */
925 private void doAcquireShared(int arg) {
926 final Node node = addWaiter(Node.SHARED);
927 try {
928 boolean interrupted = false;
929 for (;;) {
930 final Node p = node.predecessor();
931 if (p == head) {
932 int r = tryAcquireShared(arg);
933 if (r >= 0) {
934 setHeadAndPropagate(node, r);
935 p.next = null; // help GC
936 if (interrupted)
937 selfInterrupt();
938 return;
939 }
940 }
941 if (shouldParkAfterFailedAcquire(p, node) &&
942 parkAndCheckInterrupt())
943 interrupted = true;
944 }
945 } catch (RuntimeException ex) {
946 cancelAcquire(node);
947 throw ex;
948 }
949 }
950
951 /**
952 * Acquires in shared interruptible mode.
953 * @param arg the acquire argument
954 */
955 private void doAcquireSharedInterruptibly(int arg)
956 throws InterruptedException {
957 final Node node = addWaiter(Node.SHARED);
958 try {
959 for (;;) {
960 final Node p = node.predecessor();
961 if (p == head) {
962 int r = tryAcquireShared(arg);
963 if (r >= 0) {
964 setHeadAndPropagate(node, r);
965 p.next = null; // help GC
966 return;
967 }
968 }
969 if (shouldParkAfterFailedAcquire(p, node) &&
970 parkAndCheckInterrupt())
971 break;
972 }
973 } catch (RuntimeException ex) {
974 cancelAcquire(node);
975 throw ex;
976 }
977 // Arrive here only if interrupted
978 cancelAcquire(node);
979 throw new InterruptedException();
980 }
981
982 /**
983 * Acquires in shared timed mode.
984 *
985 * @param arg the acquire argument
986 * @param nanosTimeout max wait time
987 * @return {@code true} if acquired
988 */
989 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
990 throws InterruptedException {
991
992 long lastTime = System.nanoTime();
993 final Node node = addWaiter(Node.SHARED);
994 try {
995 for (;;) {
996 final Node p = node.predecessor();
997 if (p == head) {
998 int r = tryAcquireShared(arg);
999 if (r >= 0) {
1000 setHeadAndPropagate(node, r);
1001 p.next = null; // help GC
1002 return true;
1003 }
1004 }
1005 if (nanosTimeout <= 0) {
1006 cancelAcquire(node);
1007 return false;
1008 }
1009 if (nanosTimeout > spinForTimeoutThreshold &&
1010 shouldParkAfterFailedAcquire(p, node))
1011 LockSupport.parkNanos(this, nanosTimeout);
1012 long now = System.nanoTime();
1013 nanosTimeout -= now - lastTime;
1014 lastTime = now;
1015 if (Thread.interrupted())
1016 break;
1017 }
1018 } catch (RuntimeException ex) {
1019 cancelAcquire(node);
1020 throw ex;
1021 }
1022 // Arrive here only if interrupted
1023 cancelAcquire(node);
1024 throw new InterruptedException();
1025 }
1026
1027 // Main exported methods
1028
1029 /**
1030 * Attempts to acquire in exclusive mode. This method should query
1031 * if the state of the object permits it to be acquired in the
1032 * exclusive mode, and if so to acquire it.
1033 *
1034 * <p>This method is always invoked by the thread performing
1035 * acquire. If this method reports failure, the acquire method
1036 * may queue the thread, if it is not already queued, until it is
1037 * signalled by a release from some other thread. This can be used
1038 * to implement method {@link Lock#tryLock()}.
1039 *
1040 * <p>The default
1041 * implementation throws {@link UnsupportedOperationException}.
1042 *
1043 * @param arg the acquire argument. This value is always the one
1044 * passed to an acquire method, or is the value saved on entry
1045 * to a condition wait. The value is otherwise uninterpreted
1046 * and can represent anything you like.
1047 * @return {@code true} if successful. Upon success, this object has
1048 * been acquired.
1049 * @throws IllegalMonitorStateException if acquiring would place this
1050 * synchronizer in an illegal state. This exception must be
1051 * thrown in a consistent fashion for synchronization to work
1052 * correctly.
1053 * @throws UnsupportedOperationException if exclusive mode is not supported
1054 */
1055 protected boolean tryAcquire(int arg) {
1056 throw new UnsupportedOperationException();
1057 }
1058
1059 /**
1060 * Attempts to set the state to reflect a release in exclusive
1061 * mode.
1062 *
1063 * <p>This method is always invoked by the thread performing release.
1064 *
1065 * <p>The default implementation throws
1066 * {@link UnsupportedOperationException}.
1067 *
1068 * @param arg the release argument. This value is always the one
1069 * passed to a release method, or the current state value upon
1070 * entry to a condition wait. The value is otherwise
1071 * uninterpreted and can represent anything you like.
1072 * @return {@code true} if this object is now in a fully released
1073 * state, so that any waiting threads may attempt to acquire;
1074 * and {@code false} otherwise.
1075 * @throws IllegalMonitorStateException if releasing would place this
1076 * synchronizer in an illegal state. This exception must be
1077 * thrown in a consistent fashion for synchronization to work
1078 * correctly.
1079 * @throws UnsupportedOperationException if exclusive mode is not supported
1080 */
1081 protected boolean tryRelease(int arg) {
1082 throw new UnsupportedOperationException();
1083 }
1084
1085 /**
1086 * Attempts to acquire in shared mode. This method should query if
1087 * the state of the object permits it to be acquired in the shared
1088 * mode, and if so to acquire it.
1089 *
1090 * <p>This method is always invoked by the thread performing
1091 * acquire. If this method reports failure, the acquire method
1092 * may queue the thread, if it is not already queued, until it is
1093 * signalled by a release from some other thread.
1094 *
1095 * <p>The default implementation throws {@link
1096 * UnsupportedOperationException}.
1097 *
1098 * @param arg the acquire argument. This value is always the one
1099 * passed to an acquire method, or is the value saved on entry
1100 * to a condition wait. The value is otherwise uninterpreted
1101 * and can represent anything you like.
1102 * @return a negative value on failure; zero if acquisition in shared
1103 * mode succeeded but no subsequent shared-mode acquire can
1104 * succeed; and a positive value if acquisition in shared
1105 * mode succeeded and subsequent shared-mode acquires might
1106 * also succeed, in which case a subsequent waiting thread
1107 * must check availability. (Support for three different
1108 * return values enables this method to be used in contexts
1109 * where acquires only sometimes act exclusively.) Upon
1110 * success, this object has been acquired.
1111 * @throws IllegalMonitorStateException if acquiring would place this
1112 * synchronizer in an illegal state. This exception must be
1113 * thrown in a consistent fashion for synchronization to work
1114 * correctly.
1115 * @throws UnsupportedOperationException if shared mode is not supported
1116 */
1117 protected int tryAcquireShared(int arg) {
1118 throw new UnsupportedOperationException();
1119 }
1120
1121 /**
1122 * Attempts to set the state to reflect a release in shared mode.
1123 *
1124 * <p>This method is always invoked by the thread performing release.
1125 *
1126 * <p>The default implementation throws
1127 * {@link UnsupportedOperationException}.
1128 *
1129 * @param arg the release argument. This value is always the one
1130 * passed to a release method, or the current state value upon
1131 * entry to a condition wait. The value is otherwise
1132 * uninterpreted and can represent anything you like.
1133 * @return {@code true} if this release of shared mode may permit a
1134 * waiting acquire (shared or exclusive) to succeed; and
1135 * {@code false} otherwise
1136 * @throws IllegalMonitorStateException if releasing would place this
1137 * synchronizer in an illegal state. This exception must be
1138 * thrown in a consistent fashion for synchronization to work
1139 * correctly.
1140 * @throws UnsupportedOperationException if shared mode is not supported
1141 */
1142 protected boolean tryReleaseShared(int arg) {
1143 throw new UnsupportedOperationException();
1144 }
1145
1146 /**
1147 * Returns {@code true} if synchronization is held exclusively with
1148 * respect to the current (calling) thread. This method is invoked
1149 * upon each call to a non-waiting {@link ConditionObject} method.
1150 * (Waiting methods instead invoke {@link #release}.)
1151 *
1152 * <p>The default implementation throws {@link
1153 * UnsupportedOperationException}. This method is invoked
1154 * internally only within {@link ConditionObject} methods, so need
1155 * not be defined if conditions are not used.
1156 *
1157 * @return {@code true} if synchronization is held exclusively;
1158 * {@code false} otherwise
1159 * @throws UnsupportedOperationException if conditions are not supported
1160 */
1161 protected boolean isHeldExclusively() {
1162 throw new UnsupportedOperationException();
1163 }
1164
1165 /**
1166 * Acquires in exclusive mode, ignoring interrupts. Implemented
1167 * by invoking at least once {@link #tryAcquire},
1168 * returning on success. Otherwise the thread is queued, possibly
1169 * repeatedly blocking and unblocking, invoking {@link
1170 * #tryAcquire} until success. This method can be used
1171 * to implement method {@link Lock#lock}.
1172 *
1173 * @param arg the acquire argument. This value is conveyed to
1174 * {@link #tryAcquire} but is otherwise uninterpreted and
1175 * can represent anything you like.
1176 */
1177 public final void acquire(int arg) {
1178 if (!tryAcquire(arg) &&
1179 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1180 selfInterrupt();
1181 }
1182
1183 /**
1184 * Acquires in exclusive mode, aborting if interrupted.
1185 * Implemented by first checking interrupt status, then invoking
1186 * at least once {@link #tryAcquire}, returning on
1187 * success. Otherwise the thread is queued, possibly repeatedly
1188 * blocking and unblocking, invoking {@link #tryAcquire}
1189 * until success or the thread is interrupted. This method can be
1190 * used to implement method {@link Lock#lockInterruptibly}.
1191 *
1192 * @param arg the acquire argument. This value is conveyed to
1193 * {@link #tryAcquire} but is otherwise uninterpreted and
1194 * can represent anything you like.
1195 * @throws InterruptedException if the current thread is interrupted
1196 */
1197 public final void acquireInterruptibly(int arg) throws InterruptedException {
1198 if (Thread.interrupted())
1199 throw new InterruptedException();
1200 if (!tryAcquire(arg))
1201 doAcquireInterruptibly(arg);
1202 }
1203
1204 /**
1205 * Attempts to acquire in exclusive mode, aborting if interrupted,
1206 * and failing if the given timeout elapses. Implemented by first
1207 * checking interrupt status, then invoking at least once {@link
1208 * #tryAcquire}, returning on success. Otherwise, the thread is
1209 * queued, possibly repeatedly blocking and unblocking, invoking
1210 * {@link #tryAcquire} until success or the thread is interrupted
1211 * or the timeout elapses. This method can be used to implement
1212 * method {@link Lock#tryLock(long, TimeUnit)}.
1213 *
1214 * @param arg the acquire argument. This value is conveyed to
1215 * {@link #tryAcquire} but is otherwise uninterpreted and
1216 * can represent anything you like.
1217 * @param nanosTimeout the maximum number of nanoseconds to wait
1218 * @return {@code true} if acquired; {@code false} if timed out
1219 * @throws InterruptedException if the current thread is interrupted
1220 */
1221 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
1222 if (Thread.interrupted())
1223 throw new InterruptedException();
1224 return tryAcquire(arg) ||
1225 doAcquireNanos(arg, nanosTimeout);
1226 }
1227
1228 /**
1229 * Releases in exclusive mode. Implemented by unblocking one or
1230 * more threads if {@link #tryRelease} returns true.
1231 * This method can be used to implement method {@link Lock#unlock}.
1232 *
1233 * @param arg the release argument. This value is conveyed to
1234 * {@link #tryRelease} but is otherwise uninterpreted and
1235 * can represent anything you like.
1236 * @return the value returned from {@link #tryRelease}
1237 */
1238 public final boolean release(int arg) {
1239 if (tryRelease(arg)) {
1240 Node h = head;
1241 if (h != null && h.waitStatus != 0)
1242 unparkSuccessor(h);
1243 return true;
1244 }
1245 return false;
1246 }
1247
1248 /**
1249 * Acquires in shared mode, ignoring interrupts. Implemented by
1250 * first invoking at least once {@link #tryAcquireShared},
1251 * returning on success. Otherwise the thread is queued, possibly
1252 * repeatedly blocking and unblocking, invoking {@link
1253 * #tryAcquireShared} until success.
1254 *
1255 * @param arg the acquire argument. This value is conveyed to
1256 * {@link #tryAcquireShared} but is otherwise uninterpreted
1257 * and can represent anything you like.
1258 */
1259 public final void acquireShared(int arg) {
1260 if (tryAcquireShared(arg) < 0)
1261 doAcquireShared(arg);
1262 }
1263
1264 /**
1265 * Acquires in shared mode, aborting if interrupted. Implemented
1266 * by first checking interrupt status, then invoking at least once
1267 * {@link #tryAcquireShared}, returning on success. Otherwise the
1268 * thread is queued, possibly repeatedly blocking and unblocking,
1269 * invoking {@link #tryAcquireShared} until success or the thread
1270 * is interrupted.
1271 * @param arg the acquire argument.
1272 * This value is conveyed to {@link #tryAcquireShared} but is
1273 * otherwise uninterpreted and can represent anything
1274 * you like.
1275 * @throws InterruptedException if the current thread is interrupted
1276 */
1277 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
1278 if (Thread.interrupted())
1279 throw new InterruptedException();
1280 if (tryAcquireShared(arg) < 0)
1281 doAcquireSharedInterruptibly(arg);
1282 }
1283
1284 /**
1285 * Attempts to acquire in shared mode, aborting if interrupted, and
1286 * failing if the given timeout elapses. Implemented by first
1287 * checking interrupt status, then invoking at least once {@link
1288 * #tryAcquireShared}, returning on success. Otherwise, the
1289 * thread is queued, possibly repeatedly blocking and unblocking,
1290 * invoking {@link #tryAcquireShared} until success or the thread
1291 * is interrupted or the timeout elapses.
1292 *
1293 * @param arg the acquire argument. This value is conveyed to
1294 * {@link #tryAcquireShared} but is otherwise uninterpreted
1295 * and can represent anything you like.
1296 * @param nanosTimeout the maximum number of nanoseconds to wait
1297 * @return {@code true} if acquired; {@code false} if timed out
1298 * @throws InterruptedException if the current thread is interrupted
1299 */
1300 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
1301 if (Thread.interrupted())
1302 throw new InterruptedException();
1303 return tryAcquireShared(arg) >= 0 ||
1304 doAcquireSharedNanos(arg, nanosTimeout);
1305 }
1306
1307 /**
1308 * Releases in shared mode. Implemented by unblocking one or more
1309 * threads if {@link #tryReleaseShared} returns true.
1310 *
1311 * @param arg the release argument. This value is conveyed to
1312 * {@link #tryReleaseShared} but is otherwise uninterpreted
1313 * and can represent anything you like.
1314 * @return the value returned from {@link #tryReleaseShared}
1315 */
1316 public final boolean releaseShared(int arg) {
1317 if (tryReleaseShared(arg)) {
1318 doReleaseShared();
1319 return true;
1320 }
1321 return false;
1322 }
1323
1324 // Queue inspection methods
1325
1326 /**
1327 * Queries whether any threads are waiting to acquire. Note that
1328 * because cancellations due to interrupts and timeouts may occur
1329 * at any time, a {@code true} return does not guarantee that any
1330 * other thread will ever acquire.
1331 *
1332 * <p>In this implementation, this operation returns in
1333 * constant time.
1334 *
1335 * @return {@code true} if there may be other threads waiting to acquire
1336 */
1337 public final boolean hasQueuedThreads() {
1338 return head != tail;
1339 }
1340
1341 /**
1342 * Queries whether any threads have ever contended to acquire this
1343 * synchronizer; that is if an acquire method has ever blocked.
1344 *
1345 * <p>In this implementation, this operation returns in
1346 * constant time.
1347 *
1348 * @return {@code true} if there has ever been contention
1349 */
1350 public final boolean hasContended() {
1351 return head != null;
1352 }
1353
1354 /**
1355 * Returns the first (longest-waiting) thread in the queue, or
1356 * {@code null} if no threads are currently queued.
1357 *
1358 * <p>In this implementation, this operation normally returns in
1359 * constant time, but may iterate upon contention if other threads are
1360 * concurrently modifying the queue.
1361 *
1362 * @return the first (longest-waiting) thread in the queue, or
1363 * {@code null} if no threads are currently queued
1364 */
1365 public final Thread getFirstQueuedThread() {
1366 // handle only fast path, else relay
1367 return (head == tail)? null : fullGetFirstQueuedThread();
1368 }
1369
1370 /**
1371 * Version of getFirstQueuedThread called when fastpath fails
1372 */
1373 private Thread fullGetFirstQueuedThread() {
1374 /*
1375 * The first node is normally h.next. Try to get its
1376 * thread field, ensuring consistent reads: If thread
1377 * field is nulled out or s.prev is no longer head, then
1378 * some other thread(s) concurrently performed setHead in
1379 * between some of our reads. We try this twice before
1380 * resorting to traversal.
1381 */
1382 Node h, s;
1383 Thread st;
1384 if (((h = head) != null && (s = h.next) != null &&
1385 s.prev == head && (st = s.thread) != null) ||
1386 ((h = head) != null && (s = h.next) != null &&
1387 s.prev == head && (st = s.thread) != null))
1388 return st;
1389
1390 /*
1391 * Head's next field might not have been set yet, or may have
1392 * been unset after setHead. So we must check to see if tail
1393 * is actually first node. If not, we continue on, safely
1394 * traversing from tail back to head to find first,
1395 * guaranteeing termination.
1396 */
1397
1398 Node t = tail;
1399 Thread firstThread = null;
1400 while (t != null && t != head) {
1401 Thread tt = t.thread;
1402 if (tt != null)
1403 firstThread = tt;
1404 t = t.prev;
1405 }
1406 return firstThread;
1407 }
1408
1409 /**
1410 * Returns true if the given thread is currently queued.
1411 *
1412 * <p>This implementation traverses the queue to determine
1413 * presence of the given thread.
1414 *
1415 * @param thread the thread
1416 * @return {@code true} if the given thread is on the queue
1417 * @throws NullPointerException if the thread is null
1418 */
1419 public final boolean isQueued(Thread thread) {
1420 if (thread == null)
1421 throw new NullPointerException();
1422 for (Node p = tail; p != null; p = p.prev)
1423 if (p.thread == thread)
1424 return true;
1425 return false;
1426 }
1427
1428 /**
1429 * Return {@code true} if the apparent first queued thread, if one
1430 * exists, is not waiting in exclusive mode. Used only as a heuristic
1431 * in ReentrantReadWriteLock.
1432 */
1433 final boolean apparentlyFirstQueuedIsExclusive() {
1434 Node h, s;
1435 return ((h = head) != null && (s = h.next) != null &&
1436 s.nextWaiter != Node.SHARED);
1437 }
1438
1439 /**
1440 * Return {@code true} if the queue is empty or if the given thread
1441 * is at the head of the queue. This is reliable only if
1442 * <tt>current</tt> is actually Thread.currentThread() of caller.
1443 */
1444 final boolean isFirst(Thread current) {
1445 Node h, s;
1446 return ((h = head) == null ||
1447 ((s = h.next) != null && s.thread == current) ||
1448 fullIsFirst(current));
1449 }
1450
1451 final boolean fullIsFirst(Thread current) {
1452 // same idea as fullGetFirstQueuedThread
1453 Node h, s;
1454 Thread firstThread = null;
1455 if (((h = head) != null && (s = h.next) != null &&
1456 s.prev == head && (firstThread = s.thread) != null))
1457 return firstThread == current;
1458 Node t = tail;
1459 while (t != null && t != head) {
1460 Thread tt = t.thread;
1461 if (tt != null)
1462 firstThread = tt;
1463 t = t.prev;
1464 }
1465 return firstThread == current || firstThread == null;
1466 }
1467
1468
1469 // Instrumentation and monitoring methods
1470
1471 /**
1472 * Returns an estimate of the number of threads waiting to
1473 * acquire. The value is only an estimate because the number of
1474 * threads may change dynamically while this method traverses
1475 * internal data structures. This method is designed for use in
1476 * monitoring system state, not for synchronization
1477 * control.
1478 *
1479 * @return the estimated number of threads waiting to acquire
1480 */
1481 public final int getQueueLength() {
1482 int n = 0;
1483 for (Node p = tail; p != null; p = p.prev) {
1484 if (p.thread != null)
1485 ++n;
1486 }
1487 return n;
1488 }
1489
1490 /**
1491 * Returns a collection containing threads that may be waiting to
1492 * acquire. Because the actual set of threads may change
1493 * dynamically while constructing this result, the returned
1494 * collection is only a best-effort estimate. The elements of the
1495 * returned collection are in no particular order. This method is
1496 * designed to facilitate construction of subclasses that provide
1497 * more extensive monitoring facilities.
1498 *
1499 * @return the collection of threads
1500 */
1501 public final Collection<Thread> getQueuedThreads() {
1502 ArrayList<Thread> list = new ArrayList<Thread>();
1503 for (Node p = tail; p != null; p = p.prev) {
1504 Thread t = p.thread;
1505 if (t != null)
1506 list.add(t);
1507 }
1508 return list;
1509 }
1510
1511 /**
1512 * Returns a collection containing threads that may be waiting to
1513 * acquire in exclusive mode. This has the same properties
1514 * as {@link #getQueuedThreads} except that it only returns
1515 * those threads waiting due to an exclusive acquire.
1516 *
1517 * @return the collection of threads
1518 */
1519 public final Collection<Thread> getExclusiveQueuedThreads() {
1520 ArrayList<Thread> list = new ArrayList<Thread>();
1521 for (Node p = tail; p != null; p = p.prev) {
1522 if (!p.isShared()) {
1523 Thread t = p.thread;
1524 if (t != null)
1525 list.add(t);
1526 }
1527 }
1528 return list;
1529 }
1530
1531 /**
1532 * Returns a collection containing threads that may be waiting to
1533 * acquire in shared mode. This has the same properties
1534 * as {@link #getQueuedThreads} except that it only returns
1535 * those threads waiting due to a shared acquire.
1536 *
1537 * @return the collection of threads
1538 */
1539 public final Collection<Thread> getSharedQueuedThreads() {
1540 ArrayList<Thread> list = new ArrayList<Thread>();
1541 for (Node p = tail; p != null; p = p.prev) {
1542 if (p.isShared()) {
1543 Thread t = p.thread;
1544 if (t != null)
1545 list.add(t);
1546 }
1547 }
1548 return list;
1549 }
1550
1551 /**
1552 * Returns a string identifying this synchronizer, as well as its state.
1553 * The state, in brackets, includes the String {@code "State ="}
1554 * followed by the current value of {@link #getState}, and either
1555 * {@code "nonempty"} or {@code "empty"} depending on whether the
1556 * queue is empty.
1557 *
1558 * @return a string identifying this synchronizer, as well as its state
1559 */
1560 public String toString() {
1561 int s = getState();
1562 String q = hasQueuedThreads()? "non" : "";
1563 return super.toString() +
1564 "[State = " + s + ", " + q + "empty queue]";
1565 }
1566
1567
1568 // Internal support methods for Conditions
1569
1570 /**
1571 * Returns true if a node, always one that was initially placed on
1572 * a condition queue, is now waiting to reacquire on sync queue.
1573 * @param node the node
1574 * @return true if is reacquiring
1575 */
1576 final boolean isOnSyncQueue(Node node) {
1577 if (node.waitStatus == Node.CONDITION || node.prev == null)
1578 return false;
1579 if (node.next != null) // If has successor, it must be on queue
1580 return true;
1581 /*
1582 * node.prev can be non-null, but not yet on queue because
1583 * the CAS to place it on queue can fail. So we have to
1584 * traverse from tail to make sure it actually made it. It
1585 * will always be near the tail in calls to this method, and
1586 * unless the CAS failed (which is unlikely), it will be
1587 * there, so we hardly ever traverse much.
1588 */
1589 return findNodeFromTail(node);
1590 }
1591
1592 /**
1593 * Returns true if node is on sync queue by searching backwards from tail.
1594 * Called only when needed by isOnSyncQueue.
1595 * @return true if present
1596 */
1597 private boolean findNodeFromTail(Node node) {
1598 Node t = tail;
1599 for (;;) {
1600 if (t == node)
1601 return true;
1602 if (t == null)
1603 return false;
1604 t = t.prev;
1605 }
1606 }
1607
1608 /**
1609 * Transfers a node from a condition queue onto sync queue.
1610 * Returns true if successful.
1611 * @param node the node
1612 * @return true if successfully transferred (else the node was
1613 * cancelled before signal).
1614 */
1615 final boolean transferForSignal(Node node) {
1616 /*
1617 * If cannot change waitStatus, the node has been cancelled.
1618 */
1619 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1620 return false;
1621
1622 /*
1623 * Splice onto queue and try to set waitStatus of predecessor to
1624 * indicate that thread is (probably) waiting. If cancelled or
1625 * attempt to set waitStatus fails, wake up to resync (in which
1626 * case the waitStatus can be transiently and harmlessly wrong).
1627 */
1628 Node p = enq(node);
1629 int ws = p.waitStatus;
1630 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
1631 LockSupport.unpark(node.thread);
1632 return true;
1633 }
1634
1635 /**
1636 * Transfers node, if necessary, to sync queue after a cancelled
1637 * wait. Returns true if thread was cancelled before being
1638 * signalled.
1639 * @param current the waiting thread
1640 * @param node its node
1641 * @return true if cancelled before the node was signalled.
1642 */
1643 final boolean transferAfterCancelledWait(Node node) {
1644 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1645 enq(node);
1646 return true;
1647 }
1648 /*
1649 * If we lost out to a signal(), then we can't proceed
1650 * until it finishes its enq(). Cancelling during an
1651 * incomplete transfer is both rare and transient, so just
1652 * spin.
1653 */
1654 while (!isOnSyncQueue(node))
1655 Thread.yield();
1656 return false;
1657 }
1658
1659 /**
1660 * Invokes release with current state value; returns saved state.
1661 * Cancels node and throws exception on failure.
1662 * @param node the condition node for this wait
1663 * @return previous sync state
1664 */
1665 final int fullyRelease(Node node) {
1666 try {
1667 int savedState = getState();
1668 if (release(savedState))
1669 return savedState;
1670 } catch (RuntimeException ex) {
1671 node.waitStatus = Node.CANCELLED;
1672 throw ex;
1673 }
1674 // reach here if release fails
1675 node.waitStatus = Node.CANCELLED;
1676 throw new IllegalMonitorStateException();
1677 }
1678
1679 // Instrumentation methods for conditions
1680
1681 /**
1682 * Queries whether the given ConditionObject
1683 * uses this synchronizer as its lock.
1684 *
1685 * @param condition the condition
1686 * @return <tt>true</tt> if owned
1687 * @throws NullPointerException if the condition is null
1688 */
1689 public final boolean owns(ConditionObject condition) {
1690 if (condition == null)
1691 throw new NullPointerException();
1692 return condition.isOwnedBy(this);
1693 }
1694
1695 /**
1696 * Queries whether any threads are waiting on the given condition
1697 * associated with this synchronizer. Note that because timeouts
1698 * and interrupts may occur at any time, a <tt>true</tt> return
1699 * does not guarantee that a future <tt>signal</tt> will awaken
1700 * any threads. This method is designed primarily for use in
1701 * monitoring of the system state.
1702 *
1703 * @param condition the condition
1704 * @return <tt>true</tt> if there are any waiting threads
1705 * @throws IllegalMonitorStateException if exclusive synchronization
1706 * is not held
1707 * @throws IllegalArgumentException if the given condition is
1708 * not associated with this synchronizer
1709 * @throws NullPointerException if the condition is null
1710 */
1711 public final boolean hasWaiters(ConditionObject condition) {
1712 if (!owns(condition))
1713 throw new IllegalArgumentException("Not owner");
1714 return condition.hasWaiters();
1715 }
1716
1717 /**
1718 * Returns an estimate of the number of threads waiting on the
1719 * given condition associated with this synchronizer. Note that
1720 * because timeouts and interrupts may occur at any time, the
1721 * estimate serves only as an upper bound on the actual number of
1722 * waiters. This method is designed for use in monitoring of the
1723 * system state, not for synchronization control.
1724 *
1725 * @param condition the condition
1726 * @return the estimated number of waiting threads
1727 * @throws IllegalMonitorStateException if exclusive synchronization
1728 * is not held
1729 * @throws IllegalArgumentException if the given condition is
1730 * not associated with this synchronizer
1731 * @throws NullPointerException if the condition is null
1732 */
1733 public final int getWaitQueueLength(ConditionObject condition) {
1734 if (!owns(condition))
1735 throw new IllegalArgumentException("Not owner");
1736 return condition.getWaitQueueLength();
1737 }
1738
1739 /**
1740 * Returns a collection containing those threads that may be
1741 * waiting on the given condition associated with this
1742 * synchronizer. Because the actual set of threads may change
1743 * dynamically while constructing this result, the returned
1744 * collection is only a best-effort estimate. The elements of the
1745 * returned collection are in no particular order.
1746 *
1747 * @param condition the condition
1748 * @return the collection of threads
1749 * @throws IllegalMonitorStateException if exclusive synchronization
1750 * is not held
1751 * @throws IllegalArgumentException if the given condition is
1752 * not associated with this synchronizer
1753 * @throws NullPointerException if the condition is null
1754 */
1755 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1756 if (!owns(condition))
1757 throw new IllegalArgumentException("Not owner");
1758 return condition.getWaitingThreads();
1759 }
1760
1761 /**
1762 * Condition implementation for a {@link
1763 * AbstractQueuedSynchronizer} serving as the basis of a {@link
1764 * Lock} implementation.
1765 *
1766 * <p>Method documentation for this class describes mechanics,
1767 * not behavioral specifications from the point of view of Lock
1768 * and Condition users. Exported versions of this class will in
1769 * general need to be accompanied by documentation describing
1770 * condition semantics that rely on those of the associated
1771 * <tt>AbstractQueuedSynchronizer</tt>.
1772 *
1773 * <p>This class is Serializable, but all fields are transient,
1774 * so deserialized conditions have no waiters.
1775 */
1776 public class ConditionObject implements Condition, java.io.Serializable {
1777 private static final long serialVersionUID = 1173984872572414699L;
1778 /** First node of condition queue. */
1779 private transient Node firstWaiter;
1780 /** Last node of condition queue. */
1781 private transient Node lastWaiter;
1782
1783 /**
1784 * Creates a new <tt>ConditionObject</tt> instance.
1785 */
1786 public ConditionObject() { }
1787
1788 // Internal methods
1789
1790 /**
1791 * Adds a new waiter to wait queue.
1792 * @return its new wait node
1793 */
1794 private Node addConditionWaiter() {
1795 Node t = lastWaiter;
1796 // If lastWaiter is cancelled, clean out.
1797 if (t != null && t.waitStatus != Node.CONDITION) {
1798 unlinkCancelledWaiters();
1799 t = lastWaiter;
1800 }
1801 Node node = new Node(Thread.currentThread(), Node.CONDITION);
1802 if (t == null)
1803 firstWaiter = node;
1804 else
1805 t.nextWaiter = node;
1806 lastWaiter = node;
1807 return node;
1808 }
1809
1810 /**
1811 * Removes and transfers nodes until hit non-cancelled one or
1812 * null. Split out from signal in part to encourage compilers
1813 * to inline the case of no waiters.
1814 * @param first (non-null) the first node on condition queue
1815 */
1816 private void doSignal(Node first) {
1817 do {
1818 if ( (firstWaiter = first.nextWaiter) == null)
1819 lastWaiter = null;
1820 first.nextWaiter = null;
1821 } while (!transferForSignal(first) &&
1822 (first = firstWaiter) != null);
1823 }
1824
1825 /**
1826 * Removes and transfers all nodes.
1827 * @param first (non-null) the first node on condition queue
1828 */
1829 private void doSignalAll(Node first) {
1830 lastWaiter = firstWaiter = null;
1831 do {
1832 Node next = first.nextWaiter;
1833 first.nextWaiter = null;
1834 transferForSignal(first);
1835 first = next;
1836 } while (first != null);
1837 }
1838
1839 /**
1840 * Unlinks cancelled waiter nodes from condition queue.
1841 * Called only while holding lock. This is called when
1842 * cancellation occurred during condition wait, and upon
1843 * insertion of a new waiter when lastWaiter is seen to have
1844 * been cancelled. This method is needed to avoid garbage
1845 * retention in the absence of signals. So even though it may
1846 * require a full traversal, it comes into play only when
1847 * timeouts or cancellations occur in the absence of
1848 * signals. It traverses all nodes rather than stopping at a
1849 * particular target to unlink all pointers to garbage nodes
1850 * without requiring many re-traversals during cancellation
1851 * storms.
1852 */
1853 private void unlinkCancelledWaiters() {
1854 Node t = firstWaiter;
1855 Node trail = null;
1856 while (t != null) {
1857 Node next = t.nextWaiter;
1858 if (t.waitStatus != Node.CONDITION) {
1859 t.nextWaiter = null;
1860 if (trail == null)
1861 firstWaiter = next;
1862 else
1863 trail.nextWaiter = next;
1864 if (next == null)
1865 lastWaiter = trail;
1866 }
1867 else
1868 trail = t;
1869 t = next;
1870 }
1871 }
1872
1873 // public methods
1874
1875 /**
1876 * Moves the longest-waiting thread, if one exists, from the
1877 * wait queue for this condition to the wait queue for the
1878 * owning lock.
1879 *
1880 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1881 * returns {@code false}
1882 */
1883 public final void signal() {
1884 if (!isHeldExclusively())
1885 throw new IllegalMonitorStateException();
1886 Node first = firstWaiter;
1887 if (first != null)
1888 doSignal(first);
1889 }
1890
1891 /**
1892 * Moves all threads from the wait queue for this condition to
1893 * the wait queue for the owning lock.
1894 *
1895 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1896 * returns {@code false}
1897 */
1898 public final void signalAll() {
1899 if (!isHeldExclusively())
1900 throw new IllegalMonitorStateException();
1901 Node first = firstWaiter;
1902 if (first != null)
1903 doSignalAll(first);
1904 }
1905
1906 /**
1907 * Implements uninterruptible condition wait.
1908 * <ol>
1909 * <li> Save lock state returned by {@link #getState}
1910 * <li> Invoke {@link #release} with
1911 * saved state as argument, throwing
1912 * IllegalMonitorStateException if it fails.
1913 * <li> Block until signalled
1914 * <li> Reacquire by invoking specialized version of
1915 * {@link #acquire} with saved state as argument.
1916 * </ol>
1917 */
1918 public final void awaitUninterruptibly() {
1919 Node node = addConditionWaiter();
1920 int savedState = fullyRelease(node);
1921 boolean interrupted = false;
1922 while (!isOnSyncQueue(node)) {
1923 LockSupport.park(this);
1924 if (Thread.interrupted())
1925 interrupted = true;
1926 }
1927 if (acquireQueued(node, savedState) || interrupted)
1928 selfInterrupt();
1929 }
1930
1931 /*
1932 * For interruptible waits, we need to track whether to throw
1933 * InterruptedException, if interrupted while blocked on
1934 * condition, versus reinterrupt current thread, if
1935 * interrupted while blocked waiting to re-acquire.
1936 */
1937
1938 /** Mode meaning to reinterrupt on exit from wait */
1939 private static final int REINTERRUPT = 1;
1940 /** Mode meaning to throw InterruptedException on exit from wait */
1941 private static final int THROW_IE = -1;
1942
1943 /**
1944 * Checks for interrupt, returning THROW_IE if interrupted
1945 * before signalled, REINTERRUPT if after signalled, or
1946 * 0 if not interrupted.
1947 */
1948 private int checkInterruptWhileWaiting(Node node) {
1949 return (Thread.interrupted()) ?
1950 ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
1951 0;
1952 }
1953
1954 /**
1955 * Throws InterruptedException, reinterrupts current thread, or
1956 * does nothing, depending on mode.
1957 */
1958 private void reportInterruptAfterWait(int interruptMode)
1959 throws InterruptedException {
1960 if (interruptMode == THROW_IE)
1961 throw new InterruptedException();
1962 else if (interruptMode == REINTERRUPT)
1963 selfInterrupt();
1964 }
1965
1966 /**
1967 * Implements interruptible condition wait.
1968 * <ol>
1969 * <li> If current thread is interrupted, throw InterruptedException
1970 * <li> Save lock state returned by {@link #getState}
1971 * <li> Invoke {@link #release} with
1972 * saved state as argument, throwing
1973 * IllegalMonitorStateException if it fails.
1974 * <li> Block until signalled or interrupted
1975 * <li> Reacquire by invoking specialized version of
1976 * {@link #acquire} with saved state as argument.
1977 * <li> If interrupted while blocked in step 4, throw exception
1978 * </ol>
1979 */
1980 public final void await() throws InterruptedException {
1981 if (Thread.interrupted())
1982 throw new InterruptedException();
1983 Node node = addConditionWaiter();
1984 int savedState = fullyRelease(node);
1985 int interruptMode = 0;
1986 while (!isOnSyncQueue(node)) {
1987 LockSupport.park(this);
1988 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1989 break;
1990 }
1991 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1992 interruptMode = REINTERRUPT;
1993 if (node.nextWaiter != null)
1994 unlinkCancelledWaiters();
1995 if (interruptMode != 0)
1996 reportInterruptAfterWait(interruptMode);
1997 }
1998
1999 /**
2000 * Implements timed condition wait.
2001 * <ol>
2002 * <li> If current thread is interrupted, throw InterruptedException
2003 * <li> Save lock state returned by {@link #getState}
2004 * <li> Invoke {@link #release} with
2005 * saved state as argument, throwing
2006 * IllegalMonitorStateException if it fails.
2007 * <li> Block until signalled, interrupted, or timed out
2008 * <li> Reacquire by invoking specialized version of
2009 * {@link #acquire} with saved state as argument.
2010 * <li> If interrupted while blocked in step 4, throw InterruptedException
2011 * </ol>
2012 */
2013 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
2014 if (Thread.interrupted())
2015 throw new InterruptedException();
2016 Node node = addConditionWaiter();
2017 int savedState = fullyRelease(node);
2018 long lastTime = System.nanoTime();
2019 int interruptMode = 0;
2020 while (!isOnSyncQueue(node)) {
2021 if (nanosTimeout <= 0L) {
2022 transferAfterCancelledWait(node);
2023 break;
2024 }
2025 LockSupport.parkNanos(this, nanosTimeout);
2026 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2027 break;
2028
2029 long now = System.nanoTime();
2030 nanosTimeout -= now - lastTime;
2031 lastTime = now;
2032 }
2033 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2034 interruptMode = REINTERRUPT;
2035 if (node.nextWaiter != null)
2036 unlinkCancelledWaiters();
2037 if (interruptMode != 0)
2038 reportInterruptAfterWait(interruptMode);
2039 return nanosTimeout - (System.nanoTime() - lastTime);
2040 }
2041
2042 /**
2043 * Implements absolute timed condition wait.
2044 * <ol>
2045 * <li> If current thread is interrupted, throw InterruptedException
2046 * <li> Save lock state returned by {@link #getState}
2047 * <li> Invoke {@link #release} with
2048 * saved state as argument, throwing
2049 * IllegalMonitorStateException if it fails.
2050 * <li> Block until signalled, interrupted, or timed out
2051 * <li> Reacquire by invoking specialized version of
2052 * {@link #acquire} with saved state as argument.
2053 * <li> If interrupted while blocked in step 4, throw InterruptedException
2054 * <li> If timed out while blocked in step 4, return false, else true
2055 * </ol>
2056 */
2057 public final boolean awaitUntil(Date deadline) throws InterruptedException {
2058 if (deadline == null)
2059 throw new NullPointerException();
2060 long abstime = deadline.getTime();
2061 if (Thread.interrupted())
2062 throw new InterruptedException();
2063 Node node = addConditionWaiter();
2064 int savedState = fullyRelease(node);
2065 boolean timedout = false;
2066 int interruptMode = 0;
2067 while (!isOnSyncQueue(node)) {
2068 if (System.currentTimeMillis() > abstime) {
2069 timedout = transferAfterCancelledWait(node);
2070 break;
2071 }
2072 LockSupport.parkUntil(this, abstime);
2073 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2074 break;
2075 }
2076 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2077 interruptMode = REINTERRUPT;
2078 if (node.nextWaiter != null)
2079 unlinkCancelledWaiters();
2080 if (interruptMode != 0)
2081 reportInterruptAfterWait(interruptMode);
2082 return !timedout;
2083 }
2084
2085 /**
2086 * Implements timed condition wait.
2087 * <ol>
2088 * <li> If current thread is interrupted, throw InterruptedException
2089 * <li> Save lock state returned by {@link #getState}
2090 * <li> Invoke {@link #release} with
2091 * saved state as argument, throwing
2092 * IllegalMonitorStateException if it fails.
2093 * <li> Block until signalled, interrupted, or timed out
2094 * <li> Reacquire by invoking specialized version of
2095 * {@link #acquire} with saved state as argument.
2096 * <li> If interrupted while blocked in step 4, throw InterruptedException
2097 * <li> If timed out while blocked in step 4, return false, else true
2098 * </ol>
2099 */
2100 public final boolean await(long time, TimeUnit unit) throws InterruptedException {
2101 if (unit == null)
2102 throw new NullPointerException();
2103 long nanosTimeout = unit.toNanos(time);
2104 if (Thread.interrupted())
2105 throw new InterruptedException();
2106 Node node = addConditionWaiter();
2107 int savedState = fullyRelease(node);
2108 long lastTime = System.nanoTime();
2109 boolean timedout = false;
2110 int interruptMode = 0;
2111 while (!isOnSyncQueue(node)) {
2112 if (nanosTimeout <= 0L) {
2113 timedout = transferAfterCancelledWait(node);
2114 break;
2115 }
2116 LockSupport.parkNanos(this, nanosTimeout);
2117 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2118 break;
2119 long now = System.nanoTime();
2120 nanosTimeout -= now - lastTime;
2121 lastTime = now;
2122 }
2123 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2124 interruptMode = REINTERRUPT;
2125 if (node.nextWaiter != null)
2126 unlinkCancelledWaiters();
2127 if (interruptMode != 0)
2128 reportInterruptAfterWait(interruptMode);
2129 return !timedout;
2130 }
2131
2132 // support for instrumentation
2133
2134 /**
2135 * Returns true if this condition was created by the given
2136 * synchronization object.
2137 *
2138 * @return {@code true} if owned
2139 */
2140 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
2141 return sync == AbstractQueuedSynchronizer.this;
2142 }
2143
2144 /**
2145 * Queries whether any threads are waiting on this condition.
2146 * Implements {@link AbstractQueuedSynchronizer#hasWaiters}.
2147 *
2148 * @return {@code true} if there are any waiting threads
2149 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2150 * returns {@code false}
2151 */
2152 protected final boolean hasWaiters() {
2153 if (!isHeldExclusively())
2154 throw new IllegalMonitorStateException();
2155 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2156 if (w.waitStatus == Node.CONDITION)
2157 return true;
2158 }
2159 return false;
2160 }
2161
2162 /**
2163 * Returns an estimate of the number of threads waiting on
2164 * this condition.
2165 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}.
2166 *
2167 * @return the estimated number of waiting threads
2168 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2169 * returns {@code false}
2170 */
2171 protected final int getWaitQueueLength() {
2172 if (!isHeldExclusively())
2173 throw new IllegalMonitorStateException();
2174 int n = 0;
2175 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2176 if (w.waitStatus == Node.CONDITION)
2177 ++n;
2178 }
2179 return n;
2180 }
2181
2182 /**
2183 * Returns a collection containing those threads that may be
2184 * waiting on this Condition.
2185 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}.
2186 *
2187 * @return the collection of threads
2188 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2189 * returns {@code false}
2190 */
2191 protected final Collection<Thread> getWaitingThreads() {
2192 if (!isHeldExclusively())
2193 throw new IllegalMonitorStateException();
2194 ArrayList<Thread> list = new ArrayList<Thread>();
2195 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2196 if (w.waitStatus == Node.CONDITION) {
2197 Thread t = w.thread;
2198 if (t != null)
2199 list.add(t);
2200 }
2201 }
2202 return list;
2203 }
2204 }
2205
2206 /**
2207 * Setup to support compareAndSet. We need to natively implement
2208 * this here: For the sake of permitting future enhancements, we
2209 * cannot explicitly subclass AtomicInteger, which would be
2210 * efficient and useful otherwise. So, as the lesser of evils, we
2211 * natively implement using hotspot intrinsics API. And while we
2212 * are at it, we do the same for other CASable fields (which could
2213 * otherwise be done with atomic field updaters).
2214 */
2215 private static final Unsafe unsafe = Unsafe.getUnsafe();
2216 private static final long stateOffset;
2217 private static final long headOffset;
2218 private static final long tailOffset;
2219 private static final long waitStatusOffset;
2220 private static final long nextOffset;
2221
2222 static {
2223 try {
2224 stateOffset = unsafe.objectFieldOffset
2225 (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
2226 headOffset = unsafe.objectFieldOffset
2227 (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
2228 tailOffset = unsafe.objectFieldOffset
2229 (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
2230 waitStatusOffset = unsafe.objectFieldOffset
2231 (Node.class.getDeclaredField("waitStatus"));
2232 nextOffset = unsafe.objectFieldOffset
2233 (Node.class.getDeclaredField("next"));
2234
2235 } catch (Exception ex) { throw new Error(ex); }
2236 }
2237
2238 /**
2239 * CAS head field. Used only by enq
2240 */
2241 private final boolean compareAndSetHead(Node update) {
2242 return unsafe.compareAndSwapObject(this, headOffset, null, update);
2243 }
2244
2245 /**
2246 * CAS tail field. Used only by enq
2247 */
2248 private final boolean compareAndSetTail(Node expect, Node update) {
2249 return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
2250 }
2251
2252 /**
2253 * CAS waitStatus field of a node.
2254 */
2255 private final static boolean compareAndSetWaitStatus(Node node,
2256 int expect,
2257 int update) {
2258 return unsafe.compareAndSwapInt(node, waitStatusOffset,
2259 expect, update);
2260 }
2261
2262 /**
2263 * CAS next field of a node.
2264 */
2265 private final static boolean compareAndSetNext(Node node,
2266 Node expect,
2267 Node update) {
2268 return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
2269 }
2270}
2271