并发编程学习与入门-5

线程池

线程池

new Thread 弊端

1.每次 new Thread 新建对象,性能差

2.线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者 OOM

3.缺少更多功能,如更多执行,定期执行,线程中断

线程池的好处

1.重用存在的线程,减少对象创建,消亡的开销,性能佳

2.可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞

3.提供定时执行,定期执行,单线程,并发数控制等功能

ThreadPoolExecutor

参数:
corePoolSize: 核心线程数量
maximumPoolSize: 线程最大线程数
workQueue: 阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响

说明:如果运行线程数少于corePoolSize的时候,直接创建新线程来处理任务(即使线程池里面的其他线程是空闲状态的).如果运行线程数量大于等于corePoolSize
并且小于maximumPoolSize的时候,只有当workQueue满的时候才会创建新的线程去处理任务.如果corePoolSize和maximumPoolSize相等的时候,则创建的线程池线程数是固定的,
这时如果有新任务提交,如果workQueue还没满的时候,就把请求放到workQueue里面等待有空闲的线程去执行任务.如果运行的线程数大于maximumPoolSize并且workQueue
也已经满的时候,那么就由拒绝策略指定的方式去处理任务

keepAliveTime: 线程没有任务执行的时候自多保持多久时间终止
unit: keepAliveTime的时间单位
threadFactory: 线程工厂,用来创建线程
rejectHandler: 当拒绝处理任务的时候的策略
  jdk默认提供了四种拒绝策略:
  CallerRunsPolicy - 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,
  如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
  AbortPolicy - 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,
  影响后续的任务执行。
  DiscardPolicy - 直接丢弃,其他啥都没有
  DiscardOldestPolicy -  当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

常用方法:
execute() : 提交任务,交给线程池执行
submit() : 提交任务,能够返回执行结果,execute+Future
shutdown(): 关闭线程池,等待任务都执行完
shutdownNow(): 关闭线程池,不等待任务执行完

getTaskCount(): 线程池已执行和未执行的任务总数
getCompletedTaskCount():已完成的任务数量
getPoolSize(): 线程池当前的线程数量
getActiveCount(): 当前线程池中正在执行任务的线程数量

线程池 - Executor 框架接口

1.Executors.newCachedThreadPool 可缓存的线程池,,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;它的线程数是最大的,无限的。但是核心线程数为 0,这没关系。这里要考虑线程的摧毁,因为不能够无限的创建新的线程,所以在一定时间内要摧毁空闲的线程

源码如下:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {

ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) { // 放入10个任务
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}

2.Executors.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待;传入一个固定的核心线程数,其核心线程数等于最大线程数,而且它们的线程数存活时间都是无限的.

源码如下:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}

3.Executors.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行,内部有一个延时的阻塞队列来维护任务的进行.

源码如下:

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {

ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}

4.Executors.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行;其默认的时间参数为 0 意味着无限的生命,就不会被摧毁了

源码如下:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  public static void main(String[] args) {

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

// executorService.schedule(new Runnable() {
// @Override
// public void run() {
// log.warn("schedule run");
// }
// }, 3, TimeUnit.SECONDS);

executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 1, 3, TimeUnit.SECONDS);
// executorService.shutdown();

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
log.warn("timer run");
}
}, new Date(), 5 * 1000);
}

5.Executors.newWorkStealingPool 是 JDK1.8 版本加入的一种线程池,stealing 翻译为抢断、窃取的意思,它实现的一个线程池和上面 4 种都不一样,用的是 ForkJoinPool 类;最明显的用意就是它是一个并行的线程池,参数中传入的是一个线程并发的数量,这里和之前就有很明显的区别,前面 4 种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题。该线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

线程池的合理配置

1.CPU 密集型任务,就需要尽量压榨 CPU,参考值可设置为 NCPU+1

2.IO 密集型任务,参考值可设置为 2 * NCPU