CountDownLatch - 原理解读

(3) 2024-05-20 10:12

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说CountDownLatch - 原理解读,希望能够帮助你!!!。

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

一、使用场景

  • 用法一:某一线程在开始运行前等待n个线程执行完毕

    将CountDownLatch的计数器初始化为n:new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1, countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行

  • 用法二:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行

    初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

二、源码解读

CountDownLatch的内部类Sync继承了AQS(AbstractQueuedSynchronizer)抽象类,该内部类实现了 tryAcquireShared()方法 和 tryReleaseShared()方法,说明了底层使用了AQS的共享锁(AQS框架的实现分两类,一类是独占锁,一类是共享锁,详细了解请参考 AQS - 原理解读)

CountDownLatch原理:

  • 内部维护了AQS的一个int类型的变量state,初始化将该变量设置为传入的int值,使用该并发工具主要使用其中的两个方法:await()和countDown()
  • 调用await()方法的线程会进行一个自旋,当调用countDown()的线程数量达到state个时,调用await()方法的线程将继续运行。

CountDownLatch - 原理解读_https://bianchenghao6.com/blog__第1张
废话不多说,开始一步步分析代码(代码解读采用注释的方式):


  1. CountDownLatch构造函数
public CountDownLatch(int count) { 
   
	if (count < 0) throw new IllegalArgumentException("count < 0");
	//实例化Sync(AbstractQueuedSynchronizer的子类),接下来看Sync构造函数
	this.sync = new Sync(count);
}

  1. Sync构造函数
Sync(int count) { 
   
	//设置状态量state,该状态量是一个volatile的int类型变量,
	//AQS框架的一个多功能状态量,在不同的AQS实现的并发工具类中有不同的含义 
    setState(count);
}

  1. await()方法
public void await() throws InterruptedException { 
   
	//AQS的方法,下边进入acquireSharedInterruptibly()方法
	sync.acquireSharedInterruptibly(1);
}

  1. acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException { 
   
	//判断是否已被中断,中断则抛出异常
	if (Thread.interrupted())
		throw new InterruptedException();
	//tryAcquireShared是调用Sync覆盖AQS的方法,以下【5】介绍该方法
	if (tryAcquireShared(arg) < 0)
		//【6】介绍该方法
		doAcquireSharedInterruptibly(arg);
}

  1. tryAcquireShared()方法【获取共享锁】
protected int tryAcquireShared(int acquires) { 
   
	//我们在构造函数中设置的state状态量一般是大于等于1的一个正数
	//比如我们new CountDownLatch(6),那么此时getState()应返回6,
	//所以当我们进入await()方法时,此处的tryAcquireShared()返回-1,
	//那么以上【4】方法中下一步会进入doAcquireSharedInterruptibly()方法
	return (getState() == 0) ? 1 : -1;
}

  1. doAcquireSharedInterruptibly()方法【最核心的方法】
private void doAcquireSharedInterruptibly(int arg)
		throws InterruptedException { 
   
	//将当前线程封装的Node节点加入到AQS的同步队列尾部,这里不做详细描述
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try { 
   
		//进入无限循环
		for (;;) { 
   
			//node的前节点
			final AbstractQueuedSynchronizer.Node p = node.predecessor();
			//当node的前节点为头节点时,这里你可能会有疑问(
			//为什么它的前节点是头节点时才执行里边的代码?)
			//在AQS同步队列数据结构中,它的头节点是一个空节点,用作占位,
			//head指向该节点。所以当node的前节点为head指向的这个节点时,
			//表明该节点已经到了同步队列的头部,可以执行了
			if (p == head) { 
   
				//获取共享锁
				int r = tryAcquireShared(arg);
				//退出循环的唯一条件,在这里是r=1时
				if (r >= 0) { 
   
					//以下2行主要作用将该node节点设为
					//队头节点(head指向的这个空节点),并唤醒该线程
					setHeadAndPropagate(node, r);//【14】
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			//shouldParkAfterFailedAcquire()判断是否应该阻塞【7】
			//parkAndCheckInterrupt()阻塞当前线程【8】
			if (shouldParkAfterFailedAcquire(p, node) &&
					parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally { 
   
		if (failed)
			//如果失败或出现异常,失败 取消该节点,以便唤醒后续节点
			cancelAcquire(node);
	}
}

  1. shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
   
	//获取前节点的状态
	int ws = pred.waitStatus;
	//表明前节点是阻塞的,那么就要直接跟在前边这个节点后阻塞,此时返回true
	if (ws == Node.SIGNAL)
		/* * This node has already set status asking a release * to signal it, so it can safely park. */
		return true;
	//表明前一个节点已经是被取消的节点,
	//它就不会跟在这个被取消节点的后边,
	//它要往链表的前边走,直到找到一个前节点状态不大于0的,跟在它后边
	if (ws > 0) { 
   
		/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
		do { 
   
			//将前节点的前节点作为node的前节点 ,并将node的prev指向新的前节点 
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		//新的前节点next指向node
		pred.next = node;
	} else { 
   
		/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
		 //waitStatus状态为0或-3,将该前节点直接设置为阻塞状态
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

  1. parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { 
   
	//阻塞该线程
	LockSupport.park(this);
	//返回是否中断
	return Thread.interrupted();
}

  1. countDown()
//触发唤醒条件的方法
public void countDown() { 
   
	sync.releaseShared(1);
}

  1. releaseShared()
public final boolean releaseShared(int arg) { 
   
	//当tryReleaseShared()方法返回true时,state状态值为0,
	//假如new CountDownLatch(6),则前五次执行countDown(),
	//会执行到这里的tryReleaseShared(),返回false;
	//最后一次执行countDown()时才会进入doReleaseShared()方法
	//去唤醒被await()阻塞的线程,返回true
	//【11】
	if (tryReleaseShared(arg)) { 
   
		//唤醒被await()阻塞的线程【12】
		doReleaseShared();
		return true;
	}
	return false;
}

  1. tryReleaseShared()
protected boolean tryReleaseShared(int releases) { 
   
	// Decrement count; signal when transition to zero
	for (;;) { 
   
	//获取state状态值
		int c = getState();
		//state为0返回false,表明被await()阻塞的线程已唤醒
		if (c == 0)
			return false;
		//state>0,将状态减1,表明被await()阻塞的线程未唤醒
		int nextc = c-1;
		//CAS设置state状态为减1后的新值
		if (compareAndSetState(c, nextc))
			//state-1后等于0,表明要去唤醒被await()阻塞的线程
			return nextc == 0;
	}
}

  1. doReleaseShared()
private void doReleaseShared() { 
   
	/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
	for (;;) { 
   
		//头节点引用赋值临时变量h
		Node h = head;
		//
		if (h != null && h != tail) { 
   
			//头节点状态
			int ws = h.waitStatus;
			//头节点状态为-1,表示在阻塞
			if (ws == Node.SIGNAL) { 
   
				//CAS将头节点状态设为0,设置失败则跳过以下操作
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				//唤醒头节点的下一个节点
				unparkSuccessor(h);
			}
			//头节点状态为0,CAS将头节点状态设为-3,设置失败则跳过
			else if (ws == 0 &&
					!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		//保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。
		if (h == head)                   // loop if head changed
			break;
	}
}

  1. unparkSuccessor()
private void unparkSuccessor(Node node) { 
   
	/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
	//node,在本文中也就是头节点的状态
	int ws = node.waitStatus;
	//将node节点状态设置为0
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
	//node节点的下一个节点,第一个真实存储线程的节点
	Node s = node.next;
	//为null表示同步队列已空, s.waitStatus > 0表示该节点是被取消的节点,
	if (s == null || s.waitStatus > 0) { 
   
		//将node节点的下一个节点置空,
		s = null;
		//从队尾开始向前找,找到node节点后的第二个节点,
		//并将该节点赋值给将要被执行唤醒操作的节点s
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		//唤醒node节点下一个阻塞节点的线程
		LockSupport.unpark(s.thread);
}

  1. setHeadAndPropagate()【唤醒线程后执行】
private void setHeadAndPropagate(Node node, int propagate) { 
   
	Node h = head; // Record old head for check below
	//将node设置为头节点,细节不再描述,类似于出队操作
	setHead(node);
	/* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
			(h = head) == null || h.waitStatus < 0) { 
   
		Node s = node.next;
		if (s == null || s.isShared())
			//唤醒下一个线程
			doReleaseShared();
	}
}

如有问题请扫描以下二维码(备注:CSDN)与我讨论
CountDownLatch - 原理解读_https://bianchenghao6.com/blog__第2张

今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

上一篇

已是最后文章

下一篇

已是最新文章

发表回复