// 自旋 // 前驱为头节点时尝试获取锁 // 检查是否应该 park // 返回值为中断状态 finalbooleanacquireQueued(final Node node, int arg) { booleaninterrupted=false; try { for (;;) { // 前驱 finalNodep= node.predecessor(); // 如果前驱是头节点,尝试获取锁 if (p == head && tryAcquire(arg)) { // 当前线程持有锁,为新的头节点 setHead(node); p.next = null; // help GC return interrupted; } // 检查是否应该阻塞 (park) if (shouldParkAfterFailedAcquire(p, node)) // 阻塞并且检测终端状态 interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
...
// 检查并更新获取失败的节点的状态。如果线程应该阻塞则返回 true。这是轮询中的主要信号控制 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 前驱已经设置状态为 SIGNAL,因此当前节点可以安全地停放。 returntrue; if (ws > 0) { /* * 前驱被取消了,不断往前直到找到第一个未被取消的节点(头节点必然未被取消) */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 尝试将前驱设置状态为 SIGNAL // 前驱节点的 waitStatus=-1 依赖于后继节点设置 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } returnfalse; }
// unpark 后继节点,如果 后继节点被取消了或者为空 就从尾节点向前找第一个未被取消的 privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) // 清除 node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // unpark 后继节点。但如果后继被取消或者 **后继为null** // 则从 **尾部向前** 遍历以找到实际的未取消后继者。 Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodep= tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) // 唤醒后继节点 LockSupport.unpark(s.thread); }
非公平锁
在调用 lock 时,两种锁都会先通过一次 CAS 也就是 tryAcquire 来尝试获取锁。
在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,返回 CAS 结果;但是公平锁会判断等待队列是否有线程处于等待状态,如果没有也是返回 CAS 结果,如果有则不去抢锁,直接返回失败。
// 返回节点是否已经被转移到同步队列。由其他线程通过 signal 将该节点转移到同步队列 finalbooleanisOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) // 如果 waitStatus 为 CONDITION // 或者 prev 为空的话(因为如果在同步队列的话后继可能为空, // 但是 prev 一定不为空,因为有 head) // 就必然还在条件队列中 returnfalse; if (node.next != null) // If has successor, it must be on queue // 如果 next 不为空,就必然在同步队列 // 条件队列的后继用的是 Node#nextWaiter 字段,不会用 next returntrue; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 从尾部开始遍历同步队列并比较,确保确实成功了 // 这是因为当 node.prev 不为空时,可能由于 CAS 失败导致暂时还未加入同步队列成功 return findNodeFromTail(node); }
这么做的原因其实是同步队列还需要原先的节点 (其实就是 head) ,我们回想前文同步队列的相关知识,当线程释放锁时,唤醒后继节点,相当于后继节点获得了 CAS 争用锁的资格,但实际上争用的这个动作不是发生在当前线程,并且是否成功或者哪个线程成功,哪个节点作为新的 head ,其实都是未定且未发生的,也就是说异步发生的,所以在这个时候还是需要保留先前的 head 节点。
publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignal(first); }
...
// 尝试将条件队列的第一个节点进行转移,如果失败的话就向后面的节点尝试 privatevoiddoSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
...
// 将节点从条件队列转移到同步队列。如果成功则返回 true。 finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 清除条件状态 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 加入同步队列,返回其前驱节点 Nodep= enq(node); intws= p.waitStatus; // 将前驱的状态设置为 SIGNAL ,以让其唤醒该节点的线程 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 如果前驱被取消了或者状态设置失败,由当前线程唤醒该节点的线程 LockSupport.unpark(node.thread); returntrue; }
publicstaticvoidmain(String[] args)throws InterruptedException { intN=10; CountDownLatchstartSignal=newCountDownLatch(1); CountDownLatchdoneSignal=newCountDownLatch(N); for (inti=0; i < N-2; ++i){ // create and start threads newThread(newWorker(startSignal, doneSignal)).start(); }
doSomethingElse(); //don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); //wait for all to finish }