JUC线程池

JUC的线程池包含了很多并发环境下使用的工具和类,包括FutureTask、ThreadPoolExecutor、ScheduledThreadPoolExecutor等。

线程池的存在能够对线程进行统一分配、调优和监控:

  • 避免程序无限制地创建线程,降低资源消耗
  • 提高响应速度
  • 提高线程的复用率和可管理性

FutureTask

Future是JUC下的一个接口,它代表了一个任务的生命周期,提供了一种在异步计算完成后获取结果的机制。

RunnableFuture接口继承了Runnable和Future,是一个可被运行的任务,同时也能返回异步计算的结果。

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask类则是实现了RunnableFuture接口,因此既能当作一个Runnable直接被Thread执行,也可以作为Future来得到Callable的计算结果;同时提供了相应方法的实现,例如:获取任务执行结果(get)、取消任务(cancel)等。

1
public class FutureTask<V> implements RunnableFuture<V>

FutureTask的线程安全性是由CAS来保证的。


ThreadPoolExecutor

一般有两种方式去创建线程池:

  • 通过Executors类的内置方法来创建
  • 通过ThreadPoolExecutor的构造函数来创建(推荐)

但是,并不建议使用第一种方式。

为什么不建议用Executors来创建线程池

Executors有四个内置方法用来创建四种类型的线程池:

  • newCachedThreadPool:创建一个可缓存线程池,若线程池长度超过处理需要,则可回收空闲线程,没有空闲则创建新线程
  • newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
  • newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行
  • newSingleThreadExecutor:创建一个单线程化的线程池,只会用唯一的工作线程来执行任务,按照指定的调度顺序来执行

上述方法创建线程池时,无法让程序员能够显式地控制,容易出现资源耗尽的风险。例如:

  • newFixedThreadPoolnewSingleThreadExecutor会产生大量的堆积请求队列,内存消耗极大,容易OOM
  • newCachedThreadPoolnewScheduledThreadPool容纳的线程最大数是Integer.MAX_VALUE,创建的线程数过多,也会OOM

ThreadPoolExecutor数据结构

ThreadPoolExecutor的一种构造函数如下:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

由此可知与线程池相关的参数包括:

  • corePoolSize:核心线程数。这是线程池中保持活动状态的线程数量的下限,即创建新的线程直到活跃线程数达到corePoolSize;如果当前线程数等于corePoolSize,那么后续提交的任务会保存到阻塞队列中等待。
  • maximumPoolSize:最大线程数。如果阻塞队列满了,并且仍有任务提交上来,那么会创建新的线程来执行任务,直到线程数抵达maximumPoolSize,后续再出现的任务则会根据饱和策略进行处理;如果阻塞队列是无界队列,那么这个参数就没用了
  • keepAliveTime:线程空闲时的存活时间。
  • unit:存活时间的单位
  • workQueue:保存等待被执行的任务的阻塞队列。具体可看JUC集合这篇文章。
    • ArrayBlockingQueue:基于数组的有界阻塞队列,按照FIFO调度任务
    • LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO调度任务,吞吐量一般高于ArrayBlockingQueue
    • SynchronousQueue:这个阻塞队列不存储任何元素,采用「即时消费,即时生产」的方式。即:一个线程若向SynchronousQueue插入元素,则必须有一个消费者线程准备好接收该元素,否则会阻塞;反之亦然
    • PriorityBlockingQueue:基于优先级的无界阻塞队列,按照优先级调度任务
  • handler:线程池的饱和策略。
    • AbortPolicy:直接抛出一个RejectedExecutionException,默认是这个策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务
    • DiscardOldestPolicy:丢弃阻塞队列中最老(最靠前)的任务,并执行当前任务
    • DiscardPolicy:直接丢弃当前的任务

线程池处理任务全流程

线程池处理任务的全流程如图所示:

ThreadPool
  • 提交任务后,首先判断核心线程的数量是否超过corePoolSize;若没有超过,则创建线程来处理任务;否则,将任务加入到workQueue

  • 然后判断workQueue是否已满,若未满,则加入;否则,尝试创建线程来处理

  • 判断当前线程数是否已达到maximumPoolSize,若未过量,则创建新线程来处理任务;否则,按照handler指定的饱和策略来处理

配置线程池要考虑哪些因素

一般考虑任务的优先级、任务的执行时间长短、任务的性质、任务的依赖关系这四个方面的因素,尽可能使用有界的阻塞队列。

另外,如何设置线程池的大小主要由任务的性质来决定。

  • CPU密集型任务:主要消耗CPU资源,可将线程数设置为N+1(N为CPU核心数)。比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间
  • I/O密集型任务:主要处理I/O交互,可将线程数设置为2N。线程在处理I/O的时间段内不会占用CPU,这时就可以将CPU交出给其它线程使用。因此在I/O密集型任务的应用中,可以多配置一些线程

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,一般用于周期性任务或者任务延迟。与ThreadPoolExecutor不同的是:

  • 使用专门的任务类型ScheduledFutureTask来执行周期任务
  • 使用专门的存储队列DelayedWorkQueue来存储任务,这是无界阻塞队列DelayQueue的一种