线程池

notion image

1. 核心参数

核心线程数(corePoolSize)

初始池大小
核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法

最大线程数(maximumPoolSize)

最大池大小(允许同时执行的最大线程数)
池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程

空闲线程存活时间(keepAliveTime)

当线程数大于核心时,多于的空闲线程最多存活时间
keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

存活时间单位(unit)

keepAliveTime 参数的时间单位

工作队列 workQueue 队列

  • SynchronousQueue 它将任务直接提交给线程而不保持它们。当运行线程小于maxPoolSize时会创建新线程,否则触发异常策略
  • LinkedBlockingQueue 默认无界队列,当运行线程大于corePoolSize时始终放入此队列,此时maxPoolSize无效。当构造LinkedBlockingQueue对象时传入参数,变为有界队列,队列满时,运行线程小于maxPoolSize时会创建新线程,否则触发异常策略
  • ArrayBlockingQueue 有界队列,相对无界队列有利于控制队列大小,队列满时,运行线程小于maxPoolSize时会创建新线程,否则触发异常策略
 
当线程数目超过核心线程数时用于保存任务的队列,此队列仅保存实现Runnable接口的任务。
主要有3种类型的BlockingQueue可供选择:
  • 无界队列
    • 队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool采用就是LinkedBlockingQueue
  • 有界队列
    • 常用的有两类:
    • FIFO原则的队列如ArrayBlockingQueue
    • 优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定
    • 使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
  • 同步移交
    • 如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。
      只有在使用无界线程池或者有饱和策略时才建议使用该队列。

线程工厂(threadFactory)

拒绝策略(rejectedHandler)

  • 直接抛出异常,这也是默认的策略。实现类为 AbortPolicy。
  • 使用调用者所在的线程来执行任务。实现类为 CallerRunsPolicy。
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为 DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为 DiscardPolicy。
阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:
  • 中止(AbortPolicy)
    • 抛出错误RejectedExecutionException
  • 抛弃当前(DiscardPolicy)
    • 抛弃当前的Runnable ,这里是一个空方法,不执行任务
  • 抛弃最旧的(DiscardOldestPolicy)
    • 在队列中弹出队首的任务,执行当前任务
  • 调用者运行(CallerRunsPolicy)
    • 直接运行任务
       

重写拒绝策略

SynchronousQueue是一个特殊的无界队列,它实际上并不会存储元素,而是在生产者把元素放入的同时必须有消费者来领取元素。因此,在大多数情况下,SynchronousQueue的行为类似于一个同步点或者屏障。使用SynchronousQueue作为线程池的队列时,通常意味着线程池会尽可能地重用现有线程,并仅在必要时创建新的线程。 以当尝试将任务放入时,如果没有空闲线程接收该任务,put方法将会阻塞。这意味着在这种情况下,这种拒绝策略实际上会使得提交任务的线程被阻塞,直到有线程可用。

2. 工作原理

数据字典

notion image
线程池的数据字典主要存储线程池工作状态工作线程数量,通过一个32位的Integer类型的原子类对象进行管理和维护,高3位存储线程池工作状态,低29位存储工作线程数量。
字段
功能
实现
ctl
线程池状态、数量控制
AtomicInteger
runState
线程池状态
Integer(32位) 高3位控制
workCount
线程池数量
Integer(32位)低29位控制

线程池生命周期(RunState LifeCycle)

状态
含义
RUNNING
接收新任务,处理工作队列任务
SHUTDOWN
不再接收新任务,但是可以处理工作队列中任务
STOP
不再接收新任务,也不处理工作队列中的任务,打断正在进行中的任务
TIDYING
所有任务已经停止,工作线程数为0,下阶段将执行terminated() 钩子方法
TERMINATED
terminated()方法执行完成

工作流程

任务提交方式

任务提交方式
返回值
exec.execute(runnable)
exec.submit(runnable)
  • 基本没有区别,在submit方法中仍然是调用的execute方法进行任务的执行或进入等待队列或拒绝。
  • submit方法比execute方法多的只是将提交的任务(不管是runnable类型还是callable类型)包装成RunnableFuture然后传递给execute方法执行。
  • submit方法和execute方法最大的不同点在于submit方法可以获取到任务返回值或任务异常信息,execute方法不能获取任务返回值和异常信息。
  • RunnableFuture从名字就可以知道,他既是一个Runnable又是一个Future,所以说submit方法提交的任务被包装成RunnableFuture后,后面执行任务的时候运行的就是RunnableFuture.run()方法,所以最根本的区别在RunnableFuture.run()方法里。所以这里才是重点关注的地方。

工作流程时序

核心方法

ThreadPoolExecutor.execute()

执行线程任务逻辑

ThreadPoolExecutor.submit()

执行线程任务,支持返回值
execute和submit方法对比
方法
特殊处理
返回值
execute
submit
封装成RunnableFuture对象
支持

ThreadPoolExecutor.addWorker()

添加Worker工作线程任务的方法

ThreadPoolExecutor.reject()

线程池的拒绝策略执行方法

ThreadPoolExecutor#Worker.runWorker()

ThreadPoolExecutor#Worker是一个内部类,它继承了AQS类,实现了Runnable接口,因此它具备线程、信号量同步等基础功能,它是ThreadPoolExecutor对线程任务内容的内部封装和加强

ThreadPoolExecutor.processWorkerExit()

Worker工作线程退出方法,处理工作线程回收

ThreadPoolExecutor.getTask()

异常处理

异常流转时序

关于线程池任务的提交分为execute、submit两种方式。
notion image
任务提交方式
异常流转大致节点
execute
①Runnable的try-catch ↓ ②ThreadPoolExecutor的afterExecute方法 ↓ ③ThreadFactory的UncaughtExceptionHandler方法
submit
①Runnable的try-catch ↓ ②FutureTask的run方法,有异常的话会通过setException(ex)收集到Future对象的outcome属性中暂存,当Future.get()方法执行时会根据线程任务执行状态进行report上报,如果有异常会进行抛出

3. 特殊扩展

异常捕获

一般来说有如下几种方式进行线程池异常的处理,根据任务提交execute、submit方式进行区分和适配。
线程池执行submit方法的底层实际也是对execute进行了调用,只是封装了入参对象FutureTask对象,支持返回对象值,也正是因为封装了FutureTask对于异常处理更为特殊,内部实现了try-catch捕获将异常对象调用setException(ex)进行封装,主要是通过Future.get()方法触发report()进行异常上报抛出。
异常一定要进行捕获处理,不要以静默吞噬异常方式忽略它,否则会失控。
任务提交方式
处理方法
注意事项
execute
① ② ③
异常信息传递层较多
submit
需要通过future.get()来触发
  • ① 直接try/catch捕获异常进行处理
  • UncaughtExceptionHandler机制
    • 线程直接重写整个方法:
      • 重写线程池UncaughtExceptionHandler机制
      • ③ 重写protected void afterExecute(Runnable r, Throwable t) {}方法

      线程复用

      线程回收

      总的来说,ThreadPoolExecutor回收线程都是等getTask()获取不到任务,返回null时,调用processWorkerExit()方法从hashSet集合中remove掉线程Worker,getTask()返回null又分为2两种场景:
      • 线程正常执行完任务,并且已经等到超过keepAliveTime时间,大于核心线程数,那么会返回null,结束外层的runWorker中的while循环
      • 当调用shutdown()方法,会将线程池状态置为SHUTDOWN,并且需要等待正在执行的任务执行完,阻塞队列中的任务执行完才能返回null

      4. 常用线程池

      newCategoryExecutor

      • corePoolSize(20):线程池中的核心线程数。即使线程处于空闲状态,也会保留这些线程在池中。
      • maximumPoolSize(20):线程池允许的最大线程数。
      • keepAliveTime(0L):当线程数大于核心线程数时,多余的空闲线程在终止之前等待新任务的最长时间。
      • unit(TimeUnit.SECONDS):keepAliveTime的时间单位,这里是秒。
      • workQueue(new ArrayBlockingQueue<>(300)):用于保存等待执行的任务的阻塞队列。这里使用的是有界的数组阻塞队列,容量为 300。
      • threadFactory(new ThreadFactoryBuilder ().setNameFormat ("newCategoryTask-pool-% d").build ()):用于创建新线程的线程工厂,可以设置线程的名称格式等属性。

      newCachedThreadPool

      参数
      核心线程数
      0
      最大线程数
      MAX
      空闲线程回收时间
      60秒
      工作队列
      SynchronousQueue
      饱和策略
      AbortPolicy

      newFixedThreadPool

      参数
      核心线程数
      nThreads 设置
      最大线程数
      nThreads 设置
      空闲线程回收时间
      0秒(不回收)
      工作队列
      LinkedBlockingQueue(无界队列)
      饱和策略
      AbortPolicy

      newScheduledThreadPool

      参数
      核心线程数
      corePoolSize设置
      最大线程数
      MAX
      空闲线程回收时间
      0秒(不回收)
      工作队列
      DelayedWorkQueue(延迟优先级队列)
      饱和策略
      AbortPolicy

      newSingleThreadExecutor

      参数
      核心线程数
      1
      最大线程数
      1
      空闲线程回收时间
      0秒(不回收)
      工作队列
      LinkedBlockingQueue(无界队列)
      饱和策略
      AbortPolicy

      5. 优点

      • 工作线程可复用,避免频繁创建线程带来的性能损耗
      • 阻塞队列可以很好地控制线程资源的收放,起到缓冲池作用

      6. 注意事项

      使用规约要求

      【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors 返回的线程池对象的弊端如下:
      1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
      2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

      参考

      Loading...
      目录
      文章列表
      王小扬博客
      产品
      Think
      Git
      软件开发
      计算机网络
      CI
      DB
      设计
      缓存
      Docker
      Node
      操作系统
      Java
      大前端
      Nestjs
      其他
      PHP