原 java 多线程 ThreadPoolExecutor 接收并处理数据[亲测有效]

Java (54) 2023-07-22 11:12

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说原 java 多线程 ThreadPoolExecutor 接收并处理数据[亲测有效],希望能够帮助你!!!。

1. 一般的互联网项目,都涉及多数据的处理,这个是再常见不过的事情了,如果是但线程去对数据做处理,明显性能上是慢了很多,那么有没有什么好的方式呐?

当然有,这就是java本身的多线程机制对应java 多线程的问题,有一大堆的demo去做参考,在jdk的的 java.util.concurrent 包下,提供了很多的可以使用的api ,不再类述了。。。

2. 主要用的到是ThreadPoolExecutor:常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime: 线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue: 线程池所使用的缓冲队列

handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

也就是:处理任务的优先级为:

核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:

NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四个选择:

ThreadPoolExecutor.AbortPolicy()

抛出java.util.concurrent.RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

重试添加当前的任务,他会自动重复调用execute()方法

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃旧的任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

3. 最近在做一个项目,将这个使用记一下,方便发现问题和后期做修改。其他不说了,主要看代码:

public class AcceptServer implements Runnable {

public final Logger logger = Logger.getLogger(AcceptServer.class);

//服务端,用来接收来自c程序发送的字节码数据.

private ServerSocket server = null;

//线程池

public static ThreadPoolExecutor executor = null;

/**

* 初始化方法,工程启动时调用

*/

public void initRevc(){

try {

executor = new ThreadPoolExecutor(200,300,1,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(300),

new ThreadPoolExecutor.CallerRunsPolicy()

);

logger.debug("PersonaeServer start to run");

server = new ServerSocket(Integer.parseInt(getOperatePort("ServerPort","25105")));

AcceptServer recv = new AcceptServer();

recv.setServer(server);

Thread thread = new Thread(recv);

thread.start();

} catch (IOException e) {

logger.error(e.getMessage());

} catch (Exception ex) {

logger.error(ex.getMessage());

}

}

public void run() {

Socket client = null;

while(true){

try {

logger.debug("PersonaeServer befor server.accept()");

client = this.getServer().accept();

client.setSoTimeout(60000*10);

logger.debug("PersonaeServer after server.accept()");

//对流文件做处理的入口方式

QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client);

} catch (IOException e) {

logger.error(e.getMessage());

} catch (Exception ex) {

logger.error(ex.getMessage());

}

}

}

/**

* 读配置文件取接收端口

* @param property

* @param defaultValue

* @return

*/

private String getOperatePort(String property,String defaultValue){

Properties prop = new Properties();

InputStream fis = getClass().getClassLoader().getResourceAsStream("config.properties");

try {

prop.load(fis);

} catch (IOException e) {

logger.error(e.getMessage());

}

String value = prop.getProperty(property,defaultValue).trim();

logger.debug(property+" value=="+value);

return value;

}

public ServerSocket getServer() {

return server;

}

public void setServer(ServerSocket server) {

this.server = server;

}

}

通过这个框架代码,就可以用来在服务端处理客户端发过来的请求,并做出相应的相应。

QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client); 主要是对数据做处理的,并返回相应的成功代码的。

4. 部署注意点:

1). 此方法采用动态读取ip和端口的方式来获取客户端的流信息,所以要动态的配置config.properties文件中的ip和端口;

2). 采用spring的ben工程来初始化,将该线程放入beanFactory中去:

<bean class="com.AcceptServer " init-method="initRevc">

5. 在使用多线程处理问题的时候,可以根据自己的习惯,采用java.util.concurrent 包下提供的api 来做对数据,对业务的处理工作。

原 java 多线程 ThreadPoolExecutor 接收并处理数据[亲测有效]_https://bianchenghao6.com/blog_Java_第1张

发表回复