java高并发线程池_多线程和并发

Java (2) 2024-09-09 20:23

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
java高并发线程池_多线程和并发,希望能够帮助你!!!。

作者:丁威

原文链接:https://mp.weixin..com/s/p_Z8q09bewQ6aESY7VcsEw

如果大家从事的是业务开发,在工作中如果涉及到并发,通常是引入线程池来实现并发。

但如果从事的是基础框架的开发,通常并不会直接使用线程池,会按需创建单个线程,并且为了职责的单一与提升性能,通常单个线程只会负责一个流程中的部分功能,多个线程紧密配合。

那线程与线程之间如何协作呢?如果子线程出现异常,如何通知主线程呢?

可以明确的告诉大家,子线程抛出异常,主线程是无法捕获到异常的。

请带着上述问题,开始本文的学习。

1、两个线程如何协作

这里我想拿RocketMQ中的一个场景与大家来分享,通常这也是两个线程进行协作十分经典的使用示例

在RocketMQ消费端,PUSH模式消费者在消费之前需要提前做好准备:

  • 队列负载均衡
  • 消息拉取

这里简单介绍一下相关实现:在RocketMQ消费者PUSH模式启动后,消费组中的成员发生了变化,需要进行重平衡,即进行队列重新负载,主要的依据是查询当前队列的总个数与当前消费者个数,然后使用负载均衡算法(例如平均分配),各个消费者获取分配后的队列后,依次向Broker服务端拉取消息,然后提交消费线程池消费。

单一指责设计原则出发,根据负载均衡、消息拉取两个不通的职责来看,需要设计两个线程分别处理,并且这两个线程必须相互协作,因为负责队列负载的线程需要指引消息拉取线程具体拉取哪些队列。

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第1张

从上述流程图中可以看出,RocketMQ的给出的实践要点如下:

  • 引入一个阻塞队列(多线程安全),充当任务队列,两个线程都可以访问。
  • PullMessageService通过调用阻塞队列的take()方法获取任务,如果阻塞队列(任务队列)中没有待处理任务,线程则阻塞,非常关键,避免线程空轮询,造成CPU飙升。
  • RebalanceService线程的职责是根据任务负载算法生成任务,放入任务队列中,从而能够唤醒PullMessageService线程。

大家有没有发现,这不就是典型的生产者-消费者模式吗?没错,这就是。

介绍了线程之间如何协作,接下来再介绍一下如何复用一个线程。

我们还是以RocketMQ中PullMessageService线程的实现为例进行阐述。

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第2张

实现经典步骤:

  • 使用 while() 进行循环,这里通常有两种实现方法:
    • while(Thread.interrupted()),即检测当前线程的中断状态,如果要停止该线程,通常是在其他线程中调用要中断线程的interrupt()方法。
    • while(!this.stoped) stoped会使用volatile关键字首饰,如果要停止该线程,并且需要暴露一个方法,用于将stoped设置为true,从而跳出循环。
  • 在while方法中从一个阻塞队列中获取待执行的任务,没有任务执行时当前线程阻塞,不消耗CPU资源。

2、如何处理异常情况

上述线程的协作方式非常的优雅,但细想一下,其实也存在问题。

如果“生产者”线程(RebalacneService)出现异常而提停止,PullMessageServicve线程可能会由于一直没有任务执行而一直阻塞,进入假死异常。

这样的问题在上述场景中或许不那么明显,笔者举一个工作中遇到的场景。

我在负责数据同步产品时,需要增量将MySQL数据库中的数据同步到mq、es等其他目标端,就按照数据拉取、数据解析、数据传输等职责设计了多个线程,其线程模型如下图所示:

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第3张

如果解析线程出现异常(数据同步场景,碰到的最常见的异常:表结构变更引起的异常),导致解析线程不再从任务队列中获取解析任务,然后解析队列将被填满,导致MySQLBinlogPullThread线程的无限阻塞,整个数据同步流程处于假死,问题很严重,如果不及时处理,数据会积压而造成生产故障,那如何处理呢?

不做任何处理的情况下,一个线程是无法感知另外一个线程的异常状态的,从名称来看,调用栈的作用域是线程。

从上面的线程协作模型来看,可以按照职责分为生产者线程、消费者线程。

  • 生产者线程,向任务队列中添加任务
  • 消费者线程,从任务队列中获取任务并处理

2.1 生产者线程异常如何通知消费线程

如果是生产者线程出现异常,我们如何通知消费端线程呢?

笔者在实践过程中的处理方法:首先捕获生产者线程的异常,然后包装一个异常任务ErrorTask,放入到任务队列,然后消费者线程在处理任务时,首先判断任务的类型,如果是ErrorTask,则停止并退出。

给出的基本实现伪代码如下:

1、生产者线程代码

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第4张

重点在线程内部需要捕获异常,然后将异常封装成一个Error任务,供下游感知。

2、消费者线程代码

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第5张

2.2 消费端线程异常如何通知生产线程

消费端线程的角色主要是从任务队列中获取任务,但自身如果出现异常,只会阻塞生产端线程,要通知发送方线程异常,通常的方式是客户端需要持有发送方的对象,并通过其提供的方法进行通知

发送者线程需要预留检测点与关闭方法,其代码如下:

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第6张

关键点如下:

  • run方法中使用while(!stoped)方式进行循环,内嵌一个阻塞队列。
  • 线程提供一个改变线程状态运行的方法,shutdown,并传人是否是因为异常退出,供其他线程调用。

消费者线程的实现如下:

java高并发线程池_多线程和并发_https://bianchenghao6.com/blog_Java_第7张

本文的介绍就到这里了,线程与线程之间的协作,大家是否Get。

一键三连(关注、点赞、留言)是对我最大的鼓励。

今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

发表回复