线程池

本文最后更新于:星期一, 九月 12日 2022, 1:25 凌晨

概念

一般情况下,客户端传入任务,需要服务端快速处理并返回结果。如果服务端每接受到一个任务,就创建一个线程,这种方式将会创建数以万记的线程。使操作系统频繁的进行线程上下文切换,无故增加系统负载,而线程的创建和消亡都是需要耗费系统资源,也无疑浪费了系统资源。
线程池能很好地解决这个问题,它预先创建了若干数量的线程,并且不由用户直接对线程的创建进行控制,在此前提下重复使用固定或较为固定数目的线程来完成任务的执行。

目的

  • 提高线程复用能力
  • 消除了频繁创建和销毁线程的系统资源开销
  • 避免创建过多的线程耗尽进程内存空间,同时减少线程上下文切换次数
  • 充分利用CPU多核资源,最大限度的利用多核提升应用程序性能

java线程池详解

java5中增加了内置线程池实现ThreadPoolExecutor,构造方法如下

public ThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler) {
      // 。。。
}

参数

  • corePoolSize: 最大核心线程数
  • maximumPoolSize: 最大线程数
  • keepAliveTime: 非核心线程空闲存活时间
  • unit: 空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 创建新线程的工厂,所有线程都通过该工厂创建,有默认实现。可自定义线程名字,但是默认线程名称格式为**pool-<线程池编号>-thread-<线程编号>**,这对于监控和日志输出并不明显,所以最好自定义线程名(见参考资料2)
  • handler: 拒绝执行任务策略

流程

创建完ThreadPoolExecutor,当有任务提交时,使用execute或submit方法来执行相关线程池操作

执行流程图如下

线程池执行流程

  1. 如果线程池中存活的核心线程数小于最大核心线程数corePoolSize,线程池创建一个核心线程去处理提交的任务
  2. 如果线程池中核心线程数已满,即线程数已等于corePoolSize,当有一个新提交的任务时,会被放进任务队列workQueue排队等待执行
  3. 当线程池里面存活的线程数已等于corePoolSize,且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即判断最大线程数是否已满,如果没满,创建一个非核心线程执行提交的任务
  4. 如果当前线程数达到了maximumPoolSize,还有新的任务要执行,直接采用拒绝策略处理

execute与submit的区别

  • execute适用于不需要关注返回值的场景
  • submit方法适用于需要关注返回值的场景

下面有几个注意点,非常重要,无论你是面试官还是候选人都必须很清楚下列这些概念和原理

线程池类型

可通过自定义ThreadPoolExecutor或用jdk内置的Executors来创建一系列的线程池(不推荐使用内置的Executors创建,建议自定义ThreadPoolExecutor,后面会详细说明这一点)

  • newFixedThreadPool
    创建固定线程数量的线程池,用于已知并发压力情况下,对线程数做限制的场景,比较适合执行时间长的任务
  • newSingleThreadExecutor
    创建只有一个线程的线程池,用于需要保证顺序执行的场景,并只有一个线程在执行,比较适合一个任务接一个任务执行的场景
  • newCachedThreadPool
    创建线程数量会自动扩容, 自动销毁的线程池。可无限扩大,比较适合处理执行时间较短的任务
  • newScheduledThreadPool
    创建支持定时任务的线程池。可延时启动,定时启动,用于需要多个后台线程执行周期任务的场景
  • newWorkStealingPool
    Java8开始才有的,内部会构建ForkJoinPool并行处理任务,不保证处理顺序。拥有多个任务队列,可减少连接数,创建当前可用cpu数量的线程来并行执行。适合使用在很耗时的任务中

相应的源码实现在Executors类中

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
     
public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(parallelism,
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null, true);
}

ScheduledThreadPoolExecutor类

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
}

ForkJoinPool类

public ForkJoinPool(int parallelism,
       ForkJoinWorkerThreadFactory factory,
       UncaughtExceptionHandler handler,
       boolean asyncMode) {
    this(parallelism, factory, handler, asyncMode,0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}

为啥不建议使用Executors创建线程池

弊端如下

  • FixedThreadPool和SingleThreadExecutor
    看上述源码,使用的队列为LinkedBlockingQueue,该队列允许的队列最大长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
  • CachedThreadPool和ScheduledThreadPool
    还是看上述源码,允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

所以在有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来创建线程池

核心线程

默认情况

  • 核心线程不会预先创建,只有在有任务时才会创建。
  • 核心线程不会因为空闲而被终止,keepAliveTime参数不适用于它

但是ThreadPoolExecutor类中有如下方法,可以改变这些默认情况

//预先创建所有核心线程
public int prestartAllCoreThreads()
//创建一个核心线程,若所有核心线程都已创建,则返回false
public boolean prestartCoreThread()
//方法参数设定为true,则keepAliveTime参数也适用于核心线程
public void allowCoreThreadTimeOut(boolean value)

keepAliveTime

非核心线程执行任务完毕,并不是马上被销毁,而是等待一段时间,再被销毁
目的有两方面

  • 如果执行任务完毕,线程不被销毁,也没有keepAliveTime,那么此线程会永远堆积在线程池中,一旦这样的线程数量达到maximumPoolSize上限,这样任务队列workQueue中正在排队等待的任务永远不会进入线程池被执行,而是会被拒绝策略直接处理掉。所以要及时销毁,让线程池中的线程数不会达到maximumPoolSize,方便任务队列workQueue中正在排队等待的任务进线程池,被新创建的非核心线程执行,而不是让这些正在等待的任务被拒绝策略处理
  • 见前述,线程池目的是减少频繁创建和销毁线程开销。空闲等待的线程如果还没到keepAliveTime,此时任务队列workQueue中正在排队等待的任务可以进入线程池被此线程执行,这样就最大化利用已有线程进行任务操作。如果马上销毁,任务进线程池后还需要重新创建一个非核心线程执行提交的任务,增加了创建和销毁线程开销。任务队列workQueue中如果没有正在排队等待的任务,则在达到keepAliveTime后,马上被销毁

常见任务队列workQueue

先描述线程池类型源码中出现的几个队列

  • LinkedBlockingQueue
    可设置容量队列,基于链表结构的阻塞队列,按FIFO排序,容量可自行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。默认无界。newFixedThreadPool线程池使用此队列

  • SynchronousQueue
    一个不存储元素,没有实际存储空间的同步阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene。newCachedThreadPool线程池使用此队列

  • DelayedWorkQueue
    延迟队列,是一个任务定时周期的延迟执行队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用此队列

ThreadPoolExecutor还支持自定义队列来实现,主要会用到下面这两个

  • ArrayBlockingQueue
    有界队列,是一个用数组实现的有界阻塞队列,按FIFO排序量
  • PriorityBlockingQueue
    优先级队列,是具有优先级的,基于堆的无界阻塞队列

注意: 由前文为啥不建议使用Executors创建线程池可知,无界队列允许的队列最大长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM,因此不建议自定义队列使用无界队列,怕没设置队列长度引发OOM。
这也是不建议使用Executors的最重要原因

拒绝策略

JDK内置4种线程池拒绝策略,但是最好还是自定义拒绝策略

  1. CallerRunsPolicy(调用者运行策略)
  • 概念
    只要线程池没有关闭,就由提交任务的当前线程处理。
  • 使用场景
    一般在不允许失败、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但由于是调用者线程自己执行,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就低了
  1. AbortPolicy(中止策略)
  • 概念
    直接抛出拒绝执行的异常,意思也就是打断当前执行流程
  • 使用场景
    没有特殊场景,但要正确处理抛出的异常

ThreadPoolExecutor中默认的策略就是AbortPolicy

  1. DiscardPolicy(丢弃策略)
  • 概念
    直接丢弃任务,不触发任何动作
  • 使用场景
    没有。如果提交的任务无关紧要,可以使用这个策略。毫无声息的丢弃任务。但是基本上提交的任务都是有用的,所以这个策略基本不会被用到
  1. DiscardOldestPolicy(丢弃最老策略)
  • 概念
    如果线程池未关闭,就弹出任务队列workQueue头部的任务元素,然后尝试执行
  • 使用场景
    这个策略也会丢弃任务,而且也是毫无声息的丢弃任务,但特点是丢弃的是任务是排队等待的任务中最老的那个任务,而且会等待执行优先级较高的任务
  1. 自定义拒绝策略
    实现RejectedExecutionHandler接口,编写自定义的RejectHandler 。来实现自己的拒绝策略

建议自定义拒绝策略原因

  1. 使用默认AbortPolicy时,抛出的拒绝执行的异常是RejectedExecutionException。这是个运行时异常,对于运行时异常编译器并不强制catch它,所以默认拒绝策略要慎重使用。所以在线程池处理的任务非常重要时,建议自定义拒绝策略
  2. 执行execute方法时,如果任务在执行过程中出现运行时异常,会导致当前执行任务的线程自动终止;但最致命的是任务虽然异常了,但是却获取不到任何通知,这会让人误以为任务都执行正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理。详见《Java 并发编程实战》7.3节“处理非正常的线程终止”,详细介绍了异常处理的方案

线程池关闭

调用shutdownNow和shutdown两个方法来实现

  • shutdownNow
    对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表
  • shutdown
    调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务

关闭线程池之后,可用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,isShutdown只是返回是否调用过shutdown的结果

线程池大小选择策略

线程池大小不合适,太多或太少,都会导致麻烦,需要去考虑一个合适的线程池大小。

一般思路

  • CPU密集型
    如果任务主要是进行计算,那就意味着CPU的处理能力是稀缺的资源,不能通过大量增加线程数提高计算能力,因为如果线程太多,反而会导致大量的上下文切换开销。所以,通常建议线程池大小按照CPU核的数目N或者N+1设置

  • IO密集型
    如果是需要较多等待的任务,比如I/O操作较多,可考虑的计算方法

    线程数 = CPU核数 × 目标CPU利用率 ×(1 + 平均等待时间/平均工作时间)

但是这些都不是精准预计,需要根据测试或者分析进行计算,在实际中验证和调整

参考资料

  1. Java线程池必知的8大拒绝策略

  2. 创建线程以及线程池时候要指定与业务相关的名字,以便于追溯问题


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!