Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说原 java 多线程 ThreadPoolExecutor 接收并处理数据[亲测有效],希望能够帮助你!!!。
1. 一般的互联网项目,都涉及多数据的处理,这个是再常见不过的事情了,如果是但线程去对数据做处理,明显性能上是慢了很多,那么有没有什么好的方式呐?
当然有,这就是java本身的多线程机制对应java 多线程的问题,有一大堆的demo去做参考,在jdk的的 java.util.concurrent 包下,提供了很多的可以使用的api ,不再类述了。。。
2. 主要用的到是ThreadPoolExecutor:常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
3. 最近在做一个项目,将这个使用记一下,方便发现问题和后期做修改。其他不说了,主要看代码:
public class AcceptServer implements Runnable {
public final Logger logger = Logger.getLogger(AcceptServer.class);
//服务端,用来接收来自c程序发送的字节码数据.
private ServerSocket server = null;
//线程池
public static ThreadPoolExecutor executor = null;
/**
* 初始化方法,工程启动时调用
*/
public void initRevc(){
try {
executor = new ThreadPoolExecutor(200,300,1,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(300),
new ThreadPoolExecutor.CallerRunsPolicy()
);
logger.debug("PersonaeServer start to run");
server = new ServerSocket(Integer.parseInt(getOperatePort("ServerPort","25105")));
AcceptServer recv = new AcceptServer();
recv.setServer(server);
Thread thread = new Thread(recv);
thread.start();
} catch (IOException e) {
logger.error(e.getMessage());
} catch (Exception ex) {
logger.error(ex.getMessage());
}
}
public void run() {
Socket client = null;
while(true){
try {
logger.debug("PersonaeServer befor server.accept()");
client = this.getServer().accept();
client.setSoTimeout(60000*10);
logger.debug("PersonaeServer after server.accept()");
//对流文件做处理的入口方式
QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client);
} catch (IOException e) {
logger.error(e.getMessage());
} catch (Exception ex) {
logger.error(ex.getMessage());
}
}
}
/**
* 读配置文件取接收端口
* @param property
* @param defaultValue
* @return
*/
private String getOperatePort(String property,String defaultValue){
Properties prop = new Properties();
InputStream fis = getClass().getClassLoader().getResourceAsStream("config.properties");
try {
prop.load(fis);
} catch (IOException e) {
logger.error(e.getMessage());
}
String value = prop.getProperty(property,defaultValue).trim();
logger.debug(property+" value=="+value);
return value;
}
public ServerSocket getServer() {
return server;
}
public void setServer(ServerSocket server) {
this.server = server;
}
}
通过这个框架代码,就可以用来在服务端处理客户端发过来的请求,并做出相应的相应。
QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client); 主要是对数据做处理的,并返回相应的成功代码的。
4. 部署注意点:
1). 此方法采用动态读取ip和端口的方式来获取客户端的流信息,所以要动态的配置config.properties文件中的ip和端口;
2). 采用spring的ben工程来初始化,将该线程放入beanFactory中去:
<bean class="com.AcceptServer " init-method="initRevc">
5. 在使用多线程处理问题的时候,可以根据自己的习惯,采用java.util.concurrent 包下提供的api 来做对数据,对业务的处理工作。