本文是笔者浅析Executor框架系列第三篇,主要介绍下Exector的核心实现类ThreadPoolExecutor,该类便是我们常说的线程池的实现。至于线程池的好处有很多,比如复用,统一管理,批量处理等等,这里不再赘述。本文主要探讨ThreadPoolExecutor的具体实现,看一看一个任务放入线程池后是如何被执行的。下面具体看看它们的实现。
ThreadPoolExecutor
我们首先看一下如何创建一个线程池,ThreadPoolExecutor有四个发布的构造函数,其中三个会调用第四个,只是指定不同的默认实现。因此我们这里只看下第四个构造函数的实现,来了解一下构建一个线程池所需要的参数。它的源码如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
下面介绍一下各个参数的含义:
- corePoolSize:核心线程池大小。在线程池被创建后,池里是没有线程的,只有当有任务进来时才会创建线程去执行任务(当然也可以调用prestartAllCoreThreads或者prestartAllCoreThreads去预先创建线程)。而核心线程大小表示当池中线程少于这个值时,每有任务进来,都会创建一个新的线程去执行它;当线程数量等于或者大于这个值时,即池里没有空闲线程,此时会把任务暂存在任务队列中。此外,核心线程也表示池中常驻的线程,即当没有需要执行的任务时,这些线程不会被清除。(当然你也可以设置allowCoreThreadTimeOut(true)使核心线程在超出keepAliveTime后被清除,默认是false)
- maxinumPoolSize:池中存在的最大线程数量。当核心线程都在忙碌,并且任务队列也已经满时;此时,再有新的任务进来,线程池就会临时创建线程去执行这个任务。
- keepAliveTime:线程的保活时间,默认只有当池中线程数量超出核心线程数时该参数才会起作用,即默认该参数只对超出核心线程数目的线程起作用。
- unit:时间单位,会转换为纳秒
- workQueue:任务队列。根据情况的选择,有多种队列,这个下文再介绍。
- threadFactory:线程工厂,用于创建线程
- handler:任务拒绝策略。当线程数量达到maxinumPoolSize或者线程池被关闭时,此时线程池将无法再接收任务,线程池采用RejectedExecutionHandler来进行任务拒绝策略选择。
从上面的构造函数我们也能看到这些参数需要满足的条件,如下:
- corePoolSize不能小于0;maximumPoolSize不能小于0,且maximumPoolSize必须大于corePoolSize;keepAliveTime不能小于0
- workQueue,threadFactory,handler不能同时为null。
可以看到这个构造函数中还有一个acc变量,据注释的说明,这个一个会在调用finalizer时使用的上下文对象。
除开上面四个参数,后面三个参数都是可选的对象,这里我们再仔细介绍下这三个参数都有哪些选择。
workQueue:
- SynchronousQueue:同步队列。它不会保存提交的任务,而是直接将任务交给线程执行,若当前没有空闲线程,则会创建一个新的线程来执行任务。因此使用这个队列时,通常会将maxinumPoolSize设置的非常大。
- ArrayBlockingQueue:有界队列。基于数组,在创建时必须指定大小。
- LinkedBlockingQueue:无界队列。基于链表,如果没有指定大小,默认值为Integer.MAX_VALUE。
这些队列的具体实现由于篇幅过大,笔者打算在后续的文章中单独进行介绍。
threadFactory:
threadFactory如果没有指定的话会采用一个默认实现,这个默认实现的源码如下:
1 | public static ThreadFactory defaultThreadFactory() { |
1 | DefaultThreadFactory() { |
可以看到这个默认实现中,指定了线程的线程组和线程名的前缀。当然我们也可以通过实现ThreadFactory来定制线程的一些特性,比如线程名,线程组,优先级,守护线程状态等。这个实现类我们可以自己实现,也可以直接使用一些通用框架的实现类,比如apache-commons中的BasicThreadFactory类。
handler:
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,或者线程池被关闭,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。默认策略。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
这四个是ThreadPoolExecutor的内部类,皆实现了RejectedExecutionHandler接口,并重写了它的rejectedExecution方法,定制了不同的策略。下面看下这四个类的具体实现。
AbortPolicy的源码如下:
1 | /** |
可以看到该实现类的rejectedExecution方法中只是抛出了RejectedExecutionException。
DiscardPolicy:
1 | /** |
rejectedExecution方法中什么都没做。
1 | public static class DiscardOldestPolicy implements RejectedExecutionHandler { |
可以看到这个rejectedExecution方法中,首先判断线程池是否被关闭,如果没有,则将队列的首元素出队,并执行当前任务。
CallerRunsPolicy:
1 | /** |
可以看到在这个类的rejectedExecution方法中,如果线程池没有关闭,则直接在当前线程执行该任务。
上面我们了解了如何构造一个线程池,以及它各种构件的选择,下面我们来看一下线程池执行的过程。我们知道线程池在执行时一般调用的是submit方法或者直接调用execute方法,但事实上最终都会调用到execute方法,submit方法的三个版本如下:
1 | /** |
可以看到它们实际上都是构造了一个RunnableFuture类型变量,然后传入execute方法中。事实上传入的都是一个FutureTask对象,我们看一下其中一个newTaskFor的实现就可明白,其他的newTaskFor版本类似,只是调用了FutureTask不同的构造方法,它的源码如下:
1 | protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { |
下面我们就重点看一下execute方法的实现,它的源码如下:
1 | public void execute(Runnable command) { |
execute的执行流程大致就如上面注释所说,下面我们看一下execute所涉及的其他方法。首先看看这个ctl.get()到底会得到一个什么,我们分析下面这些源码:
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
这里又是一个用到位运算的巧妙设计,作者将32位二进制数拆成两部分,前三位表示线程池状态,后29位表示线程数目。我们来一步步分析,逐步计算出这些常量的值。
- Integer.SIZE:这是Integer中的一个常量,值为32。
- COUNT_BITS:显而易见,比Integer.SIZE少3,值为29。
- CAPACITY:1左移29位减一,即为2^29 - 1,表示为二进制即为00011111111111111111111111111111。
- RUNNING:-1的二进制表示为11111111111111111111111111111111,左移29位为11100000000000000000000000000000。
- SHUTDOWN:0无论咋移还是0哈,所以它的二进制表示为00000000000000000000000000000000
- STOP:1的二进制表示为00000000000000000000000000000001,左移29位为00100000000000000000000000000000。
- TIDYING:2的二进制表示为00000000000000000000000000000010,左移29位为01000000000000000000000000000000。
- TERMINATED:3的二进制表示为00000000000000000000000000000011,左移29位为01100000000000000000000000000000。
仔细看下上面这些常量的二进制表示我们可以发现,容量的前三位为0,而状态的后29位为0。所以我们最终可以得出是上面的结论:前三位表示线程状态,后二十九位表示线程数量。
我们再看一下后面三个静态方法,既然CAPACITY为00011111111111111111111111111111,取反则为11100000000000000000000000000000,那么在runStateOf方法中无论c的值是什么,返回的都是头三位相与的结果;而在方法workerCountOf中返回的就是后29位相与的结果;ctlOf方法返回线程状态和线程数目的并集。
现在我们就可以计算出ctl的值了,即为RUNNING的值,-1 << 29。回到上面的execute方法workerCountOf(c)的初始值便为0,事实上我们只需要知道它表示线程池中的线程数量即可。
下面看一下addWorker的实现,它的源码如下:
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
其中Worker是ThreadPoolExecutor的内部类,它包裹了Thread和FutureTask,既是任务也会任务的调用者。我们看一下它的实现。下面是它的构造函数源码:
1 | Worker(Runnable firstTask) { |
可以看到thread中放入的便是Worker本身,当调用thread的start方法时,线程启动,调用Worker的run方法。它的源码如下:
1 | /** Delegates main run loop to outer runWorker */ |
它调用了一个runWorker方法,它的源码如下:
1 | final void runWorker(Worker w) { |
这个方法中最终调用到了FutureTask的run方法,而在FutureTask的run方法中,最终会执行Callable的call方法,任务就这样被执行起来了。这里需要注意的是Worker并不是每次都持有FutureTask,如果没有持有,它会调用getTask从队列中获取。FutureTask的run方法笔者在之前的博客中介绍过了,这里不再赘述。
总结
ThreadPoolExecutor的源码就介绍到这里,由于笔者水平有限,有些地方笔者也没有完成搞清楚,但大致流程倒是理通了。其中不明之处笔者可以自行查证,到时希望在评论里留言一二,告知笔者,这里万分感谢了。这是笔者Executor源码分析的第三篇,后续笔者会继续介绍Executor中其他的成员。