Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
java高并发线程池_多线程和并发,希望能够帮助你!!!。
作者:丁威
原文链接:https://mp.weixin..com/s/p_Z8q09bewQ6aESY7VcsEw
如果大家从事的是业务开发,在工作中如果涉及到并发,通常是引入线程池来实现并发。
但如果从事的是基础框架的开发,通常并不会直接使用线程池,会按需创建单个线程,并且为了职责的单一与提升性能,通常单个线程只会负责一个流程中的部分功能,多个线程紧密配合。
那线程与线程之间如何协作呢?如果子线程出现异常,如何通知主线程呢?
可以明确的告诉大家,子线程抛出异常,主线程是无法捕获到异常的。
请带着上述问题,开始本文的学习。
这里我想拿RocketMQ中的一个场景与大家来分享,通常这也是两个线程进行协作十分经典的使用示例。
在RocketMQ消费端,PUSH模式消费者在消费之前需要提前做好准备:
这里简单介绍一下相关实现:在RocketMQ消费者PUSH模式启动后,消费组中的成员发生了变化,需要进行重平衡,即进行队列重新负载,主要的依据是查询当前队列的总个数与当前消费者个数,然后使用负载均衡算法(例如平均分配),各个消费者获取分配后的队列后,依次向Broker服务端拉取消息,然后提交消费线程池消费。
从单一指责设计原则出发,根据负载均衡、消息拉取两个不通的职责来看,需要设计两个线程分别处理,并且这两个线程必须相互协作,因为负责队列负载的线程需要指引消息拉取线程具体拉取哪些队列。
从上述流程图中可以看出,RocketMQ的给出的实践要点如下:
大家有没有发现,这不就是典型的生产者-消费者模式吗?没错,这就是。
介绍了线程之间如何协作,接下来再介绍一下如何复用一个线程。
我们还是以RocketMQ中PullMessageService线程的实现为例进行阐述。
实现经典步骤:
上述线程的协作方式非常的优雅,但细想一下,其实也存在问题。
如果“生产者”线程(RebalacneService)出现异常而提停止,PullMessageServicve线程可能会由于一直没有任务执行而一直阻塞,进入假死异常。
这样的问题在上述场景中或许不那么明显,笔者举一个工作中遇到的场景。
我在负责数据同步产品时,需要增量将MySQL数据库中的数据同步到mq、es等其他目标端,就按照数据拉取、数据解析、数据传输等职责设计了多个线程,其线程模型如下图所示:
如果解析线程出现异常(数据同步场景,碰到的最常见的异常:表结构变更引起的异常),导致解析线程不再从任务队列中获取解析任务,然后解析队列将被填满,导致MySQLBinlogPullThread线程的无限阻塞,整个数据同步流程处于假死,问题很严重,如果不及时处理,数据会积压而造成生产故障,那如何处理呢?
不做任何处理的情况下,一个线程是无法感知另外一个线程的异常状态的,从名称来看,调用栈的作用域是线程。
从上面的线程协作模型来看,可以按照职责分为生产者线程、消费者线程。
如果是生产者线程出现异常,我们如何通知消费端线程呢?
笔者在实践过程中的处理方法:首先捕获生产者线程的异常,然后包装一个异常任务ErrorTask,放入到任务队列,然后消费者线程在处理任务时,首先判断任务的类型,如果是ErrorTask,则停止并退出。
给出的基本实现伪代码如下:
1、生产者线程代码
重点在线程内部需要捕获异常,然后将异常封装成一个Error任务,供下游感知。
2、消费者线程代码
消费端线程的角色主要是从任务队列中获取任务,但自身如果出现异常,只会阻塞生产端线程,要通知发送方线程异常,通常的方式是客户端需要持有发送方的对象,并通过其提供的方法进行通知。
发送者线程需要预留检测点与关闭方法,其代码如下:
关键点如下:
消费者线程的实现如下:
本文的介绍就到这里了,线程与线程之间的协作,大家是否Get。
一键三连(关注、点赞、留言)是对我最大的鼓励。
今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。