Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说CountDownLatch - 原理解读,希望能够帮助你!!!。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
用法一:某一线程在开始运行前等待n个线程执行完毕
将CountDownLatch的计数器初始化为n:new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1, countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行
用法二:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行
初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
CountDownLatch的内部类Sync继承了AQS(AbstractQueuedSynchronizer)抽象类,该内部类实现了 tryAcquireShared()方法 和 tryReleaseShared()方法,说明了底层使用了AQS的共享锁(AQS框架的实现分两类,一类是独占锁,一类是共享锁,详细了解请参考 AQS - 原理解读)
CountDownLatch原理:
- 内部维护了AQS的一个int类型的变量state,初始化将该变量设置为传入的int值,使用该并发工具主要使用其中的两个方法:await()和countDown()
- 调用await()方法的线程会进行一个自旋,当调用countDown()的线程数量达到state个时,调用await()方法的线程将继续运行。
废话不多说,开始一步步分析代码(代码解读采用注释的方式):
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//实例化Sync(AbstractQueuedSynchronizer的子类),接下来看Sync构造函数
this.sync = new Sync(count);
}
Sync(int count) {
//设置状态量state,该状态量是一个volatile的int类型变量,
//AQS框架的一个多功能状态量,在不同的AQS实现的并发工具类中有不同的含义
setState(count);
}
public void await() throws InterruptedException {
//AQS的方法,下边进入acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断是否已被中断,中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared是调用Sync覆盖AQS的方法,以下【5】介绍该方法
if (tryAcquireShared(arg) < 0)
//【6】介绍该方法
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
//我们在构造函数中设置的state状态量一般是大于等于1的一个正数
//比如我们new CountDownLatch(6),那么此时getState()应返回6,
//所以当我们进入await()方法时,此处的tryAcquireShared()返回-1,
//那么以上【4】方法中下一步会进入doAcquireSharedInterruptibly()方法
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程封装的Node节点加入到AQS的同步队列尾部,这里不做详细描述
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//进入无限循环
for (;;) {
//node的前节点
final AbstractQueuedSynchronizer.Node p = node.predecessor();
//当node的前节点为头节点时,这里你可能会有疑问(
//为什么它的前节点是头节点时才执行里边的代码?)
//在AQS同步队列数据结构中,它的头节点是一个空节点,用作占位,
//head指向该节点。所以当node的前节点为head指向的这个节点时,
//表明该节点已经到了同步队列的头部,可以执行了
if (p == head) {
//获取共享锁
int r = tryAcquireShared(arg);
//退出循环的唯一条件,在这里是r=1时
if (r >= 0) {
//以下2行主要作用将该node节点设为
//队头节点(head指向的这个空节点),并唤醒该线程
setHeadAndPropagate(node, r);//【14】
p.next = null; // help GC
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire()判断是否应该阻塞【7】
//parkAndCheckInterrupt()阻塞当前线程【8】
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
//如果失败或出现异常,失败 取消该节点,以便唤醒后续节点
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前节点的状态
int ws = pred.waitStatus;
//表明前节点是阻塞的,那么就要直接跟在前边这个节点后阻塞,此时返回true
if (ws == Node.SIGNAL)
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
//表明前一个节点已经是被取消的节点,
//它就不会跟在这个被取消节点的后边,
//它要往链表的前边走,直到找到一个前节点状态不大于0的,跟在它后边
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
//将前节点的前节点作为node的前节点 ,并将node的prev指向新的前节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//新的前节点next指向node
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
//waitStatus状态为0或-3,将该前节点直接设置为阻塞状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞该线程
LockSupport.park(this);
//返回是否中断
return Thread.interrupted();
}
//触发唤醒条件的方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//当tryReleaseShared()方法返回true时,state状态值为0,
//假如new CountDownLatch(6),则前五次执行countDown(),
//会执行到这里的tryReleaseShared(),返回false;
//最后一次执行countDown()时才会进入doReleaseShared()方法
//去唤醒被await()阻塞的线程,返回true
//【11】
if (tryReleaseShared(arg)) {
//唤醒被await()阻塞的线程【12】
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取state状态值
int c = getState();
//state为0返回false,表明被await()阻塞的线程已唤醒
if (c == 0)
return false;
//state>0,将状态减1,表明被await()阻塞的线程未唤醒
int nextc = c-1;
//CAS设置state状态为减1后的新值
if (compareAndSetState(c, nextc))
//state-1后等于0,表明要去唤醒被await()阻塞的线程
return nextc == 0;
}
}
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
//头节点引用赋值临时变量h
Node h = head;
//
if (h != null && h != tail) {
//头节点状态
int ws = h.waitStatus;
//头节点状态为-1,表示在阻塞
if (ws == Node.SIGNAL) {
//CAS将头节点状态设为0,设置失败则跳过以下操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒头节点的下一个节点
unparkSuccessor(h);
}
//头节点状态为0,CAS将头节点状态设为-3,设置失败则跳过
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
//node,在本文中也就是头节点的状态
int ws = node.waitStatus;
//将node节点状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//node节点的下一个节点,第一个真实存储线程的节点
Node s = node.next;
//为null表示同步队列已空, s.waitStatus > 0表示该节点是被取消的节点,
if (s == null || s.waitStatus > 0) {
//将node节点的下一个节点置空,
s = null;
//从队尾开始向前找,找到node节点后的第二个节点,
//并将该节点赋值给将要被执行唤醒操作的节点s
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒node节点下一个阻塞节点的线程
LockSupport.unpark(s.thread);
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//将node设置为头节点,细节不再描述,类似于出队操作
setHead(node);
/* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒下一个线程
doReleaseShared();
}
}
如有问题请扫描以下二维码(备注:CSDN)与我讨论
今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
上一篇
已是最后文章
下一篇
已是最新文章