Java并发(二):队列
老样子,我们还是从一些例子开始慢慢熟悉各种并发队列。以看小说看故事的心态来学习不会显得那么枯燥而且更容易记忆深刻。
阻塞队列的等待?
阻塞队列最适合做的事情就是做为生产消费者的中间存储,以抵抗生产者消费者速率不匹配的问题,不但是在速率不匹配的时候能够有地方暂存任务,而且能在队列满或空的时候让线程进行阻塞,让出CPU的时间。这里对于阻塞两字加粗,是因为其实Java的线程在这个时候是等待(WAITING)状态而不是阻塞(BLOCKED),这个容易引起歧义。
下面我们来写一个程序比较一下阻塞和等待:
在上面的代码里,我们开启了三个线程:
- 一个是等待锁
- 一个是等待从队列获取数据
- 一个是等待加入数据到队列
运行程序之后,我们看一下线程的状态,可以看到:
- 等待锁的block线程,处于BLOCKED状态
- 还有两个被阻塞队列阻塞的线程,处于WAITING状态
我们来查看一下线程这两种状态的定义:
java感觉基础东西很多很杂
通俗一点说,BLOCKED就是线程自己想做事情,但是很无奈只能等别人先把事情干完,所以说是被阻塞,被动的,WAITING就是线程自己主动愿意放弃CPU时间进行等待,等别人在合适的时候通知自己来继续干活,所以说是等待中,主动的。Blocking Queue其实是让线程Waiting而不是Block。
生产消费
现在,我们使用阻塞队列尝试实现生产者消费者的功能。
首先,实现一个基类,通过一个开关来控制生产者消费者的执行:
然后实现生产者:
只要开关开启,生产者会无限进行数据生产,把数据加入队列,生产者每100ms生产一个数据,这里有一个计数器来提供要生产的数据。
下面实现消费者:
同样,消费者也是在开关开启或队列中有数据的时候,会不断进行数据消费。这里我们有一个计数器用来统计开关关闭之后,消费者还能消费多少数据。消费者消费速度是200ms消费一次,明显比生产者慢一半。通过这个配置我们可以想到,如果使用有界阻塞队列的话,因为消费速度比生产速度慢,所以队列会慢慢堆积一直到队列满,然后生产者线程被阻塞,我们来写一个测试程序看看是不是这样:
在这段代码里:
- 我们使用了容量为50的有界阻塞队列ArrayBlockingQueue作为容器
- 生产者10个线程
- 消费者4个线程
- 2秒后关闭生产者和消费者(这个时候生产者应该不会继续生产,但是消费者还会继续消费)
- 主线程等待所有生产者消费者执行完成
- 最后输出关闭后,消费者还能消费多少数据
部分运行结果如下:
从结果看到几个结论:
- 在队列满之前,生产者可以任意按照自己的速度生产,满了之后只能等消费者消费后才能进行生产,符合预期
- 关闭开启设置后,生产者很快就都完成了,但是最后消费者只退出了3个,有一个卡住了,线程状态如下:
- 两行靠在一起的代码就是能在一个原子操作内完成的,不是这样的,在之后的文章中我们会继续看到更有意思的一个错觉
- 既然使用了线程安全的队列,那么所有操作都是线程安全的一致的,这个说法也是一个误区,首先,我们无法确保所有操作都是线程安全以及一致的,具体需要参考JDK的文档说明,比如迭代操作,比如size()操作,很对线程安全的并发类型也无法提供一致性的保证,有的时候只是估算;其次,所谓所有操作仅限于单个操作,一般而言容器无法确保你两个操作两行代码之间不能有其它线程来继续操作这个容器
这个Bug是很容易忽略的,我们可以改一下消费者代码,利用有超时等待的poll()来解决这个问题:
- 这次Consumer3没有永远卡住,而是在等待了1秒后超时了,没有拿到数据
- 最后输出的totalConsumedAfterShutdown是60而不是最大队列50,这个也很容易想到为什么,enable=false之后,之前那10个生产者当前的循环还会继续执行,把数据加入队列,但是这个结果永远只会是60(50+10生产者)吗?你可以想想
队列各种方法执行速度比拼
前面我们也看到了,队列消费的操作可以take()可以poll(),各种操作的区别如下:
- 抛出异常就是在操作失败的时候直接抛出异常
- 特殊值就是不能执行操作的时候返回false或null
- 阻塞就是线程进行等待状态等待可以操作为止
- 超时就是等待一定时间不行的话再放弃
这些操作之间的性能是否有区别呢,我们写一个简单的程序测试一下
有几个地方值得注意:
- ConcurrentLinkedQueue以及LinkedTransferQueue的size()操作特别慢,见JDK说明:
所以我们在使用这两种队列的时候特别需要注意 - 总体上来说,add相对于offer,poll相对于remove没有什么性能差异,根据自己的需求使用对应的方法即可
下面我们稍微改下代码测试一下BlockingQueue的put()和take():
各种场景下各种队列的吞吐测试
在这次的测试中,我们模拟一下场景:
模拟一下不同的消费者生产者线程数量配比的情况下,各种队列完成一定数量元素的存取操作总共的耗时。我们定义三种模式:
- ProducerAndConsumerShareThread:也就是存取操作在一个线程中完成,先存后取
- ProducerAndThenConsumer:也就是先把队列用生产者填充完毕,然后再用消费者去全部读取出来
- ConcurrentProducerAndConsumer:也就是生产者和消费者同时操作队列,同时进行存和取操作
我们定义的所有测试场景如下:
十几种测试,覆盖这些场景:
- 同时存取模式下不同生产者和消费者线程数量的情况
- 同时存取模式下生产者和消费者数量不均衡的情况
- 先存后取模式下不同生产者和消费者线程数量的情况
- 存取操作在一个线程依次操作模式下不同线程数量的情况
主要测试三种队列,每一种队列测试之间GC一次尽量排除干扰:
生产者:
这次的测试,我们预先根据线程数量算好执行次数,而不是像之前的测试一样所有的任务统一由线程池调度,这样更容易测试出队列本身的性能,排除干扰。这里可以看到如果是存取共享模式的话,生产者直接做存取操作,其它模式的话,生产者仅仅做存的操作。
消费者:
生产者和消费者我们都用了两个CountDownLatch来做拦截,一个startCountDownLatch用来在所有线程都启动后由主线程通知一下子放开所有的线程,一个finishCountDownLatch用来让主线程等待线程的执行完毕。
主要的测试代码如下:
可以看到三种模式的处理不同:
- 对于存取共享线程的话,我们只有生产者线程
- 对于先存后取模式的话,在所有生产者线程执行完成后我们再开启消费者线程
- 对于并发存取模式的话,我们同时开启两组线程
整个测试结果汇总如下(这个测试是在12核阿里云跑出来的,元素数1000万):
说实话这个测试的结果不是我想象的那样,我想象的是随着并发的增多队列性能会急剧下降,而且各种队列之间有显著的性能差异,这个结果是这样这也可以说明这些队列性能都是很不错的,没有明显的短板。
可以大概得出几个结论:
- 随着并发的增多会降低一些吞吐,不过也都还好,并发太小吞吐也上不去
- ArrayBlockingQueue性能稳定,而且性能也几乎是最好的
- 在生产者数量大大小于消费者数量的时候,LinkedBlockingQueue表现出最好的吞吐,而且比其它两个好很多,这点我还没细究,有待研究是为什么
一般而言,阻塞队列中,无界队列可以选择LinkedBlockingQueue,有界队列可以选择ArrayBlockingQueue,后者还有公平参数可以开启公平特性,有关这个特性下面我们也会来观察。
通过同步队列观察公平特性
SynchronousQueue是没有容量的阻塞队列,只有等另一个线程移出元素后才能插入元素成功。这里我们写一段代码来测试,沿用之前的消费者和生产者类,只是修改了2秒后关闭队列的地方,这里我们加上了interrupt()操作,否则生产者是无法退出的:
延迟队列
这里给出一个延迟队列的例子,我们往队列提交10次延迟消息,每次提交2条一样的消息,消息的绝对延迟时间从1到10秒。
输出如下:
可以看到每过1秒输出2条日志,符合预期。
一个真实的队列误用的血案
之前生产上遇到过一个OOM的问题,排查下来是队列使用不当,这里我们就来看下这个问题,代码逻辑是:
- 我们有一个10个线程的线程池
- 我们使用了LinkedTransferQueue阻塞队列
- 我们通过线程池异步向这个队列提交4000个任务
- 我们通过线程池异步从这个队列获取4000个任务
比较特殊的是,使用了transfer()方法,开发的小伙伴可能觉得LinkedTransferQueue比较酷炫,所以选择了这个队列,并且认为transfer()可以直接把任务交给消费者性能较高,所以使用了这个方法。
代码如下:
于是,他没多想把线程池修改为了newCachedThreadPool,程序可以正常执行了,看看运行结果:
即使我们把代码修改为使用LinkedBlockingQueue,配合newCachedThreadPool也会创建几十个线程(如果元素数量足够多,几百个几千个也有可能)。因为一旦阻塞,newCachedThreadPool就会毫不犹豫创建新线程。
对于生产者消费者这种任务,还是建议直接使用线程来实现,生产者消费者的阻塞不相互干扰,而且线程池也是使用队列来管理任务的,用了线程池相当于两次队列,没有必要。
回顾总结
我们来看一下这次实验涉及到的一些阻塞队列:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列PriorityQueue实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
DelayQueue、SynchronousQueue和PriorityBlockingQueue是特种队列,有特殊用途根据需要选择。
LinkedTransferQueue也算是特种队列,它可以实现类似背压的效果,在特殊场景下使用。
ArrayBlockingQueue和LinkedBlockingQueue背后的数据结构不同,它们可能是我们最常用的队列了,区别如下:
- ArrayBlockingQueue有公平特性,开启公平特性会降低吞吐,次操作结果如下,前面一个是关闭公平,后面一个是开启公平
- ArrayBlockingQueue会预分配存储,但是这也意味着会一下子占用大块内存,LinkedBlockingQueue不是这样的
- 如果需要无界的话只能选择LinkedBlockingQueue(当然LinkedBlockingQueue也可以有界)
非阻塞队列ConcurrentLinkedQueue比较特殊,首先它不是阻塞队列,其次它不使用锁,而是使用CAS,在超高并发的场景下,显然它可以到达更好的性能。
这里利用之前的代码最后做了一次对比测试,这里我们没有测试并发存取模式,因为消费者不知道何时消费完毕,在消费不到数据的时候进行死循环意义不大:
所以在特殊的场景下,比如生产者生产好了数据扔到队列中,有N多个消费者需要并发消费这个时或许可以发挥ConcurrentLinkedQueue的威力(但是,之前也说过了,它的size()比较坑爹),常年处于空的队列不太适合,这个时候使用阻塞队列更合适。
好吧,看来90%的时候还是用ArrayBlockingQueue和LinkedBlockingQueue太平,有界用前者,需要无界用后者,但是认真考虑下,你真的需要无界吗。通过我们的测试可以发现这些队列在高并发下都有着百万以上的QPS性能,一般而言用哪个都不会出现瓶颈,反而是我们更应该注意因为阻塞导致的线程数量增多和队列的容量占用的内存。
本文中,我们还花式使用了各种方式来测试队列:
- 普通线程池
- ForkJoin
- 独立线程
这里想说的是,对于生产消费这样的任务最好还是使用阻塞队列配置独立的消费线程,生产者可以直接是业务线程,而不是去使用线程池,没有这个必要。
同样,代码见我的Github
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.bianchenghao6.com/h6javajc/25333.html