线程池(一)

概述

这几天准备深入学习有关并发的知识,所以先简单复习了一下JDK自带的并发包,其中首先比较重要的一个就是线程池了。

为什么不无限的创造线程?主要基于以下几个原因:

  1. 线程生命周期的开销非常高
  2. 资源消耗
  3. 稳定性

所谓物极必反,线程的创建和销毁和需要一定的时间,如果所创建的线程工作时间还不如创建销毁的时间长那是得不偿失的,并且当线程创建过多也会对内存造成一定的负担甚至溢出,并且对GC也是极大的消耗,由于存在一定数额的活跃线程也提高了响应性。

线程池

根据《阿里巴巴Java开发手册》中对线程创建的要求

  1. 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

由此可见,在正式生产环境中,线程池是唯一的创建线程的方法。而JDK对线程池也有强大的支持。

根据《手册》中的另一点要求

  1. 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
    的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明: Executors 返回的线程池对象的弊端如下:
    1) FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    2) CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM

虽然Executor为我们提供了很多方便的工厂方法,比如newSingleThreadExecutor(),也有Executors为我们很好的实现了这些工厂方法,但是手动实现ThreadPoolExecutor能让我们对线程池有更深的了解和控制。所以接下来让我们来介绍一下ThreadPoolExecutor这个类。

原理

一个最常见的ThreadPoolExecutor构造函数如下

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize, 
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize: 活动线程数
  • maximumPoolSize: 线程池上限
  • keepAliveTime: 当线程池中线程数超过corePoolSize后,完成工作后的线程存活时间
  • unit: 单位
    其余的几个参数我们会在后面着重介绍。

这里介绍一下ThreadPoolExecutor的核心工作原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);

workderCountOf()获得当前线程池线程总数,若小于corePoolSize,则直接将任务通过addWorker
()方法执行,否则在workQueue.offer()进入等待队列,若进入失败,则任务直接交给线程池,若线程池达到了maximumPoolSize则提交失败执行拒绝策略

任务队列

BlockingQueue:接口,阻塞队列,数据共享通道

任务队列的作用在于,当线程池中线程数达到corePoolSize的时候,接下来的任务将进入这个队列进行等待,等待执行。

简单原理

服务线程(获取队列信息并处理的线程)在队列为空时进行读等待,有新的消息进入队列后自动唤醒,反之,当队列满时进行写等待直到有消息出队。

不同于常用的offer()和poll()方法,这里我们使用take()和put()方法进行读写。我们以ArrayBlockingQueue的为例子,其中包括了这几个控制对象

1
2
3
final ReentrantLock lick;
private final Condition notEmpty;
private final Condition notFull;

就拿take()来说

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

当队列为空时,他会在notEmpty上进行等待,
在线程等待时,若有新的元素插入,线程就会被唤醒

1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

常用实现

  • SynchronousQueue(直接提交队列): 一个零容量队列,每个插入操作要对应一个删除操作。提交的任务不会被真实保存,其实就是将新任务交给了线程执行。
  • ArrayBlockingQueue(有界任务队列): 这里就会用到线程池中另一个参数maximumPoolSize, 若当前线程池中线程小于corePoolSize则直接在线程池中增加线程,若大于,则加入该任务队列,若队列满则继续加入线程池,若线程池中数目多余maximumPoolSize则执行拒绝策略。
  • LinkedBlockingQueue(无界任务队列):如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。
  • PriorityBlockingQueue(优先任务队列): 一个特殊的无界任务队列,前面两者都是按FIFO的顺序执行,而这个是可以按照优先级执行。

    拒绝策略

    JDK内置拒绝策略如下
  • AboerPolicy(默认):直接抛出异常,阻止系统正常工作。
  • CallerRunsPolicy: 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。(伪丢弃,但是任务提交线程性能大幅度下降)
  • DiscardOledestPolicy:和名字一样,丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
  • DiscardPolicy: 丢弃无法处理的任务,不给任何处理。

异常堆栈

首先给出一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main implements Runnable{
int a, b;
public Main(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
int ret = a / b;
System.out.println(ret);
}

public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
pools.submit(new Main(100, i));
}
}
}
/* 结果如下
100
33
50
25

*/

可以发现,其中一个显然的异常除数为0不见了,我们可以通过将submit方法改为execute方法来打印部分异常信息,但是我们仍然不能发现他的调用线程在哪儿。
这里我们通过扩展线程池给出一种解决办法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package first_maven;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main extends ThreadPoolExecutor{

public Main(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

private Exception clientTrace() {
return new Exception("Client stack trace");
}

private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch(Exception e) {
clientStack.printStackTrace();
System.out.println(" 1212");
throw e;
}
}
};
}
}

我们通过扩展ThreadPoolExecutor,将要执行的Runnable进行包装,通过手动创建异常,获取当前主线程的调用堆栈,从而得到线程池的调用信息,并打印相应的运行异常,这样我们就可以追踪到完整的异常信息。

总结

在使用多线程的时候,要通过ThreadPoolExecutor来手动创建,根据当前任务的需求分配相应的线程池大小和阻塞队列以及拒绝策略,这样才能知根知底。

五. References

《实战Java高并发程序设计》

《阿里巴巴Java开发手册》