Chapter 6 Thread Pools and Future

How a Thread Pool Works

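The figure below shows how a thread pool works. Callers keep submitting tasks to the pool, and a group of threads inside the pool keeps taking tasks from a queue. This is a classic producer-consumer model.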

Figure: How a thread pool works

Implementing such a thread pool raises several questions:

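  1. How long should the queue be? If it is unbounded, callers can keep adding tasks until memory runs out. If it is bounded, what should a caller do when the queue is full?
  2. Is the number of threads in the pool fixed, or does it change dynamically?
  3. When a new task is submitted, should it go into the queue, or should a new thread be started?
  4. When there are no tasks, should a thread sleep for a short while, or block? If it blocks, how is it woken up?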

There are three ways to handle question 4:

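  1. Approach (1): do not use a blocking queue. Use only an ordinary thread-safe queue with no block/wake-up mechanism. When the queue is empty, pool threads can only sleep for a while, wake up, check the queue for new tasks, and repeat this polling indefinitely.
  2. Approach (2): do not use a blocking queue, but implement a block/wake-up mechanism inside the pool, outside the queue.
  3. Approach (3): use a blocking queue.

Approach (3) is clearly the most complete. It spares the pool from implementing its own block/wake-up mechanism, and it avoids the resource cost and latency of approach (1)'s sleep-and-poll loop. For this reason, ThreadPoolExecutor and ScheduledThreadPoolExecutor, covered next, are both built on blocking queues rather than ordinary queues. This is where the various blocking queues come into play.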
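Here is a deliberately tiny fixed-size pool that makes approach (3) concrete. It is an illustrative sketch and does not reflect how ThreadPoolExecutor is written. The class name MiniThreadPool, the poison-pill shutdown, and the choice to swallow RuntimeExceptions are all assumptions of this example. Submitters act as producers and block in put() when the bounded queue is full (question 1). Workers act as consumers and block in take() when the queue is empty (question 4).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of approach (3): a fixed number of consumer threads blocking on a BlockingQueue.
class MiniThreadPool {
    // Sentinel ("poison pill") that tells one worker to exit; an assumption of this sketch.
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> queue;
    private final List<Thread> workers = new ArrayList<>();

    MiniThreadPool(int nThreads, int queueCapacity) {
        // Bounded queue: submit() blocks the caller while the queue is full.
        this.queue = new LinkedBlockingQueue<>(queueCapacity);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(this::workLoop, "mini-pool-" + i);
            workers.add(t);
            t.start();
        }
    }

    // Consumer loop: take() blocks while the queue is empty, so there is no sleep-polling.
    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take();
                if (task == POISON) {
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Keep the worker alive if one task fails (a simplification).
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side.
    void submit(Runnable task) throws InterruptedException {
        queue.put(task);
    }

    // FIFO order means every task submitted earlier is dequeued before any pill.
    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < workers.size(); i++) {
            queue.put(POISON);
        }
        for (Thread t : workers) {
            t.join();
        }
    }
}
```

The poison pill keeps shutdown simple for a sketch. ThreadPoolExecutor instead relies on a state field and interrupts, as the rest of this chapter shows.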

Thread pool class hierarchy

The figure below shows the class hierarchy of the thread pools.

Figure: Class hierarchy of ThreadPoolExecutor and ScheduledThreadPoolExecutor

There are two core classes: ThreadPoolExecutor and ScheduledThreadPoolExecutor. The latter can execute a task once, and it can also execute tasks periodically.

At the lowest level, every task submitted to a pool is a Runnable. Tasks enter the pool through execute(Runnable command), defined in the top-level Executor interface.

ExecutorService then defines the shutdown method shutdown(). Through submit(), it also supports tasks that return a value, namely Callable.

ThreadPoolExecutor

Core data structures

With the implementation principle above in mind, let us look at the core data structures of ThreadPoolExecutor.

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<Worker>();
}

Each thread is represented by a Worker object. Worker is an inner class of ThreadPoolExecutor, and its core data structure is as follows:

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;
    }
}

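The definition shows that Worker extends AQS (AbstractQueuedSynchronizer), so a Worker is itself a lock. What is this lock for? That becomes clear once we analyze how the pool shuts down and how a thread executes tasks.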
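Writing a similar lock by hand helps show why a Worker doubles as a lock. The sketch below, with the hypothetical name WorkerLikeLock, follows the same idea as Worker. The AQS state is -1 before the thread starts, 0 when the thread is idle, and 1 while it runs a task. The lock is deliberately non-reentrant. According to the JDK source comments, this prevents a task that calls pool-control methods such as setCorePoolSize() from re-acquiring its own worker's lock and interrupting itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of the idea behind Worker: a non-reentrant exclusive lock built on AQS.
// state -1: not started (interrupts suppressed), 0: unlocked/idle, 1: locked/running a task.
class WorkerLikeLock extends AbstractQueuedSynchronizer {
    WorkerLikeLock() {
        setState(-1); // like Worker's constructor: tryLock() fails until the worker starts
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds, so a second acquire by the owner fails (non-reentrant).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
}
```

The unlock() call that moves the state from -1 to 0 corresponds to the w.unlock() "allow interrupts" step at the start of runWorker().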

Core configuration parameters

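To address the implementation questions raised at the beginning of this chapter, ThreadPoolExecutor's constructor exposes several core configuration parameters for building pools with different policies. Once you understand what each parameter means, you understand the pool's different policies.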

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

The parameters are explained below:

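  1. corePoolSize: the number of threads the pool keeps, even when they are idle (unless allowCoreThreadTimeOut is enabled).
  2. maximumPoolSize: once corePoolSize threads are running and the queue is also full, the pool grows up to this many threads.
  3. keepAliveTime/TimeUnit: how long a thread beyond corePoolSize may stay idle before it is destroyed, so that the total thread count shrinks back to corePoolSize.
  4. workQueue: the blocking queue (BlockingQueue) the pool uses.
  5. threadFactory: the factory that creates threads. You can supply your own, and there is also a default one.
  6. handler (RejectedExecutionHandler): the rejection policy applied last, when corePoolSize is reached, the queue is full, and maximumPoolSize is reached.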
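As a usage sketch, the pool below passes all seven constructor arguments explicitly. The sizes, the 30-second keep-alive, and the thread-name prefix "order-worker-" are arbitrary example values. namedFactory is a hypothetical helper. The factory only gives threads readable names, which helps when reading thread dumps.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigExample {
    // Hypothetical helper: names threads prefix + 1, prefix + 2, ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + seq.getAndIncrement());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                4,                                    // corePoolSize
                8,                                    // maximumPoolSize
                30, TimeUnit.SECONDS,                 // keepAliveTime + unit for threads above core
                new ArrayBlockingQueue<>(100),        // workQueue: bounded, so maximumPoolSize can matter
                namedFactory("order-worker-"),        // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler (AbortPolicy is also the default)
        );
    }
}
```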

Let us see how these six settings work together when a task is submitted. Every submission goes through the following steps:

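  1. Step 1: check whether the current thread count is at least corePoolSize. If it is smaller, create a new thread to run the task. Otherwise, go to step 2.
  2. Step 2: check whether the queue is full. If it is not, put the task in the queue. If it is full, go to step 3.
  3. Step 3: check whether the current thread count is at least maximumPoolSize. If it is smaller, create a new thread to run the task. Otherwise, go to step 4.
  4. Step 4: reject the task according to the rejection policy.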

In summary: first check corePoolSize, then check whether the queue is full, then check maximumPoolSize, and finally apply the rejection policy.

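With this flow, an unbounded queue means step 3 is never reached, so maximumPoolSize is never used. Tasks are also never rejected at step 4 because of saturation, although they can still be rejected once the pool has been shut down.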
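The four steps can be observed directly. This sketch (the class name SubmissionFlowDemo is hypothetical) uses corePoolSize = 1, maximumPoolSize = 2, and an ArrayBlockingQueue of capacity 1. Every task blocks on a CountDownLatch, so the pool cannot drain while we inspect it. Under these assumptions, the first task creates a core thread, the second is queued, the third creates a non-core thread, and the default AbortPolicy rejects the fourth.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {
    // Returns {poolSize, queueSize, rejectedCount} observed after submitting 4 blocking tasks.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep every worker busy until we have inspected the pool
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 1; i <= 4; i++) {
            try {
                // task 1: step 1 (new core thread), task 2: step 2 (queued),
                // task 3: step 3 (new non-core thread), task 4: step 4 (rejected)
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] observed = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return observed;
    }
}
```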

Graceful shutdown of a thread pool

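Shutting down a thread gracefully requires care, and shutting down a thread pool is even more complex. When a pool is shut down, some threads may still be executing tasks, some callers may be submitting new tasks, and the queue may still hold tasks that have not run. Shutdown therefore cannot be instantaneous. It needs a smooth transition, which requires managing the pool's full lifecycle.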

Thread pool lifecycle

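Starting with JDK 7, the thread count (workerCount) and the pool state (runState) are packed into a single field, the ctl variable. As the figure shows, the highest 3 bits store the pool state and the remaining 29 bits store the thread count. In JDK 6, these two values were stored separately.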

Figure: Bit layout of the ctl variable

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

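The code shows that ctl is split into two parts. The highest 3 bits represent the pool state, and the lower 29 bits represent the thread count. There are five pool states: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED.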
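The bit arithmetic can be checked in isolation. The class below copies the constants and helpers from the snippet above into a standalone class. CtlLayout is a hypothetical name. runStateAtLeast and isRunning mirror the private helpers of the same names in the JDK source and are written here as plain comparisons. RUNNING has its sign bit set, so any ctl value in the RUNNING state is negative. The five states compare in the order RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED. The 29-bit count caps workerCount at 2^29 - 1 = 536,870,911.

```java
class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set

    // runState lives in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;        // 111...
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // 000...
    static final int STOP       =  1 << COUNT_BITS;        // 001...
    static final int TIDYING    =  2 << COUNT_BITS;        // 010...
    static final int TERMINATED =  3 << COUNT_BITS;        // 011...

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // States are numerically ordered, so "at least X" is a plain comparison on ctl.
    static boolean runStateAtLeast(int c, int s) { return c >= s; }
    static boolean isRunning(int c)              { return c < SHUTDOWN; }
}
```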

The figure below shows the transitions between these states.

Figure: Thread pool state transitions

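The pool has two shutdown methods, shutdown() and shutdownNow(), and each moves the pool into a different state: shutdown() moves it to SHUTDOWN, and shutdownNow() moves it to STOP. Once the pool has no threads left (and, in the SHUTDOWN state, the queue is also empty), it enters TIDYING. It then runs the hook method terminated() and enters TERMINATED. Only at that point has the pool truly come to an end.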

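These transitions have one key property: they only go from smaller values to larger ones (-1, 0, 1, 2, 3) and never in reverse. For example, once the pool is in TIDYING = 2, it can only move to TERMINATED = 3. It can never go back to STOP = 1 or to any other state.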

Besides terminated(), the pool provides a few other hook methods, all with empty implementations. To customize a pool, you can override them.

public class ThreadPoolExecutor extends AbstractExecutorService {
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    protected void terminated() { }
}
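A small subclass shows where the hooks fire. This is an illustrative sketch: the class name CountingPool and its counters are not part of the JDK. beforeExecute() and afterExecute() run in the worker thread around every task. terminated() runs once, between TIDYING and TERMINATED.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final CountDownLatch terminatedCalled = new CountDownLatch(1);

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet(); // runs in the worker thread, just before r.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet();  // runs in the worker thread, just after r.run()
    }

    @Override
    protected void terminated() {
        try {
            terminatedCalled.countDown(); // called once, while the state is TIDYING
        } finally {
            super.terminated();
        }
    }
}
```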

Correct steps to shut down a thread pool

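As the analysis above shows, shutting down a pool is a process. After you call shutdown() or shutdownNow(), the pool does not terminate immediately, so you must then call awaitTermination() to wait for it. The correct steps are: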

ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
        1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
executor.shutdown();
// executor.shutdownNow();
try {
    boolean loop = true;
    do {
        loop = !executor.awaitTermination(2, TimeUnit.SECONDS);
    } while (loop);
} catch (InterruptedException e) {
    ...
}
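The loop above waits indefinitely. A common variant, modeled on the example in the ExecutorService Javadoc, bounds the wait and falls back to shutdownNow(). It also restores the caller's interrupt status if the caller is interrupted. The class and method names below are hypothetical. Reusing the same timeout for both waits is an assumption you would tune.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    // Returns true if the pool terminated within at most two timeout periods.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks and drop queued ones
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

A task that ignores interrupts can still keep the second wait from succeeding. In that case the method returns false, and the caller decides what to do next.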

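The implementation of awaitTermination(..) is simple, as shown below. It loops and checks whether the pool has reached the final state TERMINATED. If it has, the method returns true. If it has not, the method blocks on the termination condition variable for the remaining time, then checks again after waking up. Once the timeout is used up, it returns false.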

public class ThreadPoolExecutor extends AbstractExecutorService {
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // Has the pool reached TERMINATED?
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
}

Differences between shutdown() and shutdownNow()

The code below shows how shutdown() and shutdownNow() differ:

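(1) shutdown() does not clear the task queue and waits for all queued tasks to finish. shutdownNow() clears the task queue and returns the tasks that never ran.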

(2) shutdown() interrupts only idle threads. shutdownNow() interrupts all threads.

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Check permission to shut down the pool
            checkShutdownAccess();
            // Advance the pool state to SHUTDOWN
            advanceRunState(SHUTDOWN);
            // Interrupt only idle threads
            interruptIdleWorkers();
            // Hook method
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Check permission to shut down the pool
            checkShutdownAccess();
            // Advance the pool state to STOP
            advanceRunState(STOP);
            // Interrupt all threads
            interruptWorkers();
            // Drain the task queue
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        // Return the queued tasks that never ran
        return tasks;
    }
}
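Both differences can be observed with a single-thread pool. In this sketch (the class name ShutdownComparison is hypothetical), one task occupies the only worker by waiting on a latch, and three more tasks wait in the queue. With shutdown(), the three queued tasks run once the latch opens. With shutdownNow(), the waiting task is interrupted and the three queued tasks are returned unexecuted.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownComparison {
    // Returns {queuedTasksThatRan, tasksReturnedByShutdownNow}.
    static int[] run(boolean useShutdownNow) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(() -> {              // occupies the only worker
            try {
                gate.await();             // interrupted by shutdownNow(), not by shutdown()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(ran::incrementAndGet); // these three wait in the queue
        }
        int drained = 0;
        if (useShutdownNow) {
            List<Runnable> notRun = pool.shutdownNow(); // STOP: interrupt all, drain queue
            drained = notRun.size();
        } else {
            pool.shutdown();                            // SHUTDOWN: queued tasks still run
        }
        gate.countDown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {ran.get(), drained};
    }
}
```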

Next, let us look at how interrupting idle threads differs from interrupting all threads in the code above.

public class ThreadPoolExecutor extends AbstractExecutorService {
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // Key point: if tryLock() succeeds, the thread is idle;
                // if it fails, the thread holds its lock and is running a task
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                // Interrupt every started thread, whether or not it is running a task
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
}

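The key difference is tryLock(). A thread acquires its Worker lock before running each task (covered in detail later), so whether the lock is held tells us whether the thread is idle. If tryLock() succeeds, the thread is idle and receives an interrupt. Otherwise, the thread is left alone.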

In the code above, both shutdown() and shutdownNow() call tryTerminate(), shown below.

public class ThreadPoolExecutor extends AbstractExecutorService {
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // Reaching here means workerCount is 0 (and, in SHUTDOWN, workQueue is empty)
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Switch the pool state to TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Call the hook method
                        terminated();
                    } finally {
                        // Advance the pool state from TIDYING to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // Wake up the threads waiting in awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
}

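tryTerminate() does not forcibly terminate the pool. It only performs a check. When workerCount is 0 (and, in the SHUTDOWN state, workQueue is empty), it switches the state to TIDYING and then calls the hook method terminated(). When the hook finishes, it changes the state from TIDYING to TERMINATED and calls termination.signalAll() to wake up every caller blocked in awaitTermination. If workerCount is not yet 0, it instead interrupts one idle worker (interruptIdleWorkers(ONLY_ONE)) so that the shutdown signal keeps propagating.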

So the difference between TIDYING and TERMINATED is that the hook method terminated() runs between them. By default, this hook has an empty implementation.

Task submission

The method that submits a task is shown below:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // Fewer than corePoolSize threads: create a new thread
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // At least corePoolSize threads: put the task into the blocking queue via workQueue.offer(command)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // Queue is full: try to create a new (non-core) thread
            reject(command); // maximumPoolSize reached (or pool shut down): apply the rejection policy
    }
    
    /* 
     * Creates a new thread. If the second argument core is true, corePoolSize
     * is the upper bound; otherwise maximumPoolSize is the upper bound
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // A state of SHUTDOWN or higher means the pool has started shutting down
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // Thread count has reached the bound corePoolSize (or maximumPoolSize): return false
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    // workerCount incremented successfully: leave both loops
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    // runState changed in the meantime: restart the outer loop
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // workerCount was incremented: now actually add the thread
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create the Worker and its thread
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add the new thread to the workers set
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Added successfully: start the thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // The thread failed to start: roll back (decrement workerCount)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
}
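The recheck branch workerCountOf(recheck) == 0 in execute() matters most when corePoolSize is 0. In that case, step 1 is skipped and the task goes straight into the queue. Without this branch, no thread would ever be started to run the task. A sketch demonstrating this follows (ZeroCoreDemo is a hypothetical name):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCoreDemo {
    // With corePoolSize = 0, execute() queues the task and then, seeing zero workers,
    // calls addWorker(null, false) so that some thread will drain the queue.
    static boolean taskRuns() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);
        boolean ran = done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return ran && pool.getLargestPoolSize() == 1;
    }
}
```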

Task execution

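During submission, a new Worker may be started, with the submitted task handed to it as firstTask. A Worker does not run just one task, however. It keeps taking tasks from the queue and running them in a continuous loop.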

Here is how Worker's run() method is implemented.

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        Worker(Runnable firstTask) {
            // Initial state is -1
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            // Delegates to ThreadPoolExecutor.runWorker(Worker w)
            runWorker(this);
        }
    }
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // Keep taking tasks from the blocking queue
            while (task != null || (task = getTask()) != null) {
                // About to run the task
                // Key point: lock before running it; this pairs with tryLock() in shutdown()'s interruptIdleWorkers()
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // Having obtained a task, recheck the pool state before running it:
                    // if the pool is stopping (STOP or later), the thread interrupts itself
                    wt.interrupt();
                try {
                    // Hook before the task runs; empty by default
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Run the task code
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook after the task runs; empty by default
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // Task finished: increment completedTasks
                    w.completedTasks++;
                    w.unlock();
                }
            }
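            // Reaching here means a normal exit (getTask() returned null);
            // if a task threw, the exception skips this line and completedAbruptly stays true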
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}
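A sketch can show two consequences of runWorker's structure. First, afterExecute() receives the exception a task threw. Second, that exception then escapes the while loop with completedAbruptly still true, so the worker dies and processWorkerExit() starts a replacement. The class name FailingTaskDemo is hypothetical. The thread factory installs a no-op UncaughtExceptionHandler only to stop the dying worker from printing a stack trace. This behavior applies to execute(). With submit(), the exception is captured inside the FutureTask, so afterExecute() sees null and the worker survives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FailingTaskDemo extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    final CountDownLatch failureSeen = new CountDownLatch(1);

    FailingTaskDemo() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
              r -> {
                  Thread t = new Thread(r);
                  // Silence the default stack trace printed when the exception escapes runWorker.
                  t.setUncaughtExceptionHandler((thread, ex) -> { });
                  return t;
              });
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {          // the exception caught (and rethrown) in runWorker
            lastFailure.set(t);
            failureSeen.countDown();
        }
    }
}
```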

shutdown() combined with task execution

If we combine the execution process with the shutdown process above, calling shutdown() can lead to the following scenarios:

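  1. Scenario 1: when shutdown() is called, all threads are idle.

    This means the task queue must be empty, and all threads are blocked in getTask(). Each thread receives the interrupt sent by interruptIdleWorkers(), which makes take()/poll() throw InterruptedException. getTask() catches the exception, and on its next loop iteration it sees SHUTDOWN with an empty queue and returns null. All Workers then leave their while loops and run processWorkerExit.

  2. Scenario 2: when shutdown() is called, all threads are busy.

    The queue may or may not be empty. The tryLock() calls inside interruptIdleWorkers() fail, so nothing happens and every thread continues its current task. The threads then work through the remaining queued tasks. Once the queue is empty, getTask() returns null, and from there the threads leave the while loop just as in scenario 1.

  3. Scenario 3: when shutdown() is called, some threads are busy and some are idle.

    If some threads are idle, the queue must be empty and those threads must be blocked in getTask(). The idle threads are handled as in scenario 1, and the busy threads as in scenario 2.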

Now let us look at the internals of getTask().

public class ThreadPoolExecutor extends AbstractExecutorService {
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Key point:
            // if rs >= STOP, i.e. shutdownNow() was called, return null here;
            // if rs >= SHUTDOWN, i.e. shutdown() was called, and the queue is empty, also return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                // Returning null makes the worker leave its while loop and finish
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
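                // Key point:
                // with an empty queue the thread blocks (is idle) in poll() or take(); poll() has a timeout, take() does not.
                // An interrupt makes them throw InterruptedException, caught below, and the loop retries (scenario 1 above)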
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}
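The timed branch of getTask() is what shrinks the pool. In the sketch below (KeepAliveDemo is a hypothetical name), allowCoreThreadTimeOut(true) makes timed true for every worker. After roughly keepAliveTime of idleness, each poll() times out, getTask() returns null, and the pool shrinks to zero threads. The 50 ms keep-alive and the polling loop are example choices that avoid depending on exact timing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns the pool size observed after the workers have been idle for a while.
    static int poolSizeAfterIdle() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads now use poll(keepAliveTime) too
        pool.execute(() -> { });
        pool.execute(() -> { });
        // Poll for up to about 5 seconds instead of relying on one exact sleep.
        for (int i = 0; i < 100 && pool.getPoolSize() > 0; i++) {
            Thread.sleep(50);
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }
}
```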

shutdownNow() combined with task execution

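This is similar to shutdown(), with one extra step: clearing the task queue. As explained in Chapter 1, interrupting a thread that is running business code has no effect unless that code checks its interrupt status or blocks in an interruptible call. The thread simply runs the code to completion. In that sense, interrupting idle threads and interrupting all threads differ little, unless a thread happens to be blocked somewhere.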

When a Worker finally exits, it performs the cleanup shown below.

public class ThreadPoolExecutor extends AbstractExecutorService {
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If a Worker exits normally, workerCount was already decremented in getTask() above.
        // completedAbruptly is true only for an abnormal exit, so workerCount must be decremented here
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // Remove this worker from the workers set
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // Like shutdown() and shutdownNow(), every exiting thread calls this
        // method to check whether the whole pool can terminate
        tryTerminate();

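        // Safety net: while the state is below STOP, start a replacement worker if this one exited abruptly,
        // or if fewer workers remain than needed (corePoolSize, or 1 if core threads may time out and the queue is non-empty)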
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
}

The four rejection policies

At the end of execute(Runnable command), reject(command) is called to apply the rejection policy, as shown below.

public class ThreadPoolExecutor extends AbstractExecutorService {
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
}

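RejectedExecutionHandler is an interface. ThreadPoolExecutor provides four implementations of it, one for each rejection policy, and AbortPolicy is the default. Their code is as follows: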

public class ThreadPoolExecutor extends AbstractExecutorService {
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
}
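Of the four policies, CallerRunsPolicy is the one that provides back-pressure. The submitting thread runs the rejected task itself, so it cannot submit more tasks until that task finishes. The sketch below (CallerRunsDemo is a hypothetical name) forces a rejection by using one thread and a SynchronousQueue, which holds no tasks. It then records which thread ran the rejected task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns the thread that actually ran the second (rejected) task.
    static Thread whoRanRejectedTask() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),                     // no buffering at all
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {                                  // occupies the only worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread[] runner = new Thread[1];
        // The queue refuses the hand-off and maximumPoolSize is reached, so reject(command)
        // is called and CallerRunsPolicy runs the task synchronously in this thread.
        pool.execute(() -> runner[0] = Thread.currentThread());
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return runner[0];
    }
}
```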

Callable and Future

execute(Runnable command) returns no value. Its counterpart that returns a value is <T> Future<T> submit(Callable<T> task).

A Callable is essentially a Runnable that returns a value and may throw a checked exception. It is defined as follows.

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

It is used as follows:

ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
        1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
Callable<String> callable = new XXXCallable<String>();
Future<String> future = executor.submit(callable);
try {
    String result = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
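The snippet above uses a placeholder, XXXCallable. The self-contained variant below replaces that class with a lambda and wraps everything in a hypothetical helper, SubmitExample.compute. It shows the normal result, and it shows how an exception thrown by call() comes back from get() wrapped in an ExecutionException:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitExample {
    static String compute(Callable<String> task) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
                1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
        try {
            Future<String> future = executor.submit(task);
            return future.get(); // blocks until call() returns or throws
        } catch (ExecutionException e) {
            // An exception thrown by call() arrives wrapped; getCause() is the original.
            return "failed: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }
}
```

For example, compute(() -> "hello") returns "hello". A Callable that throws new IOException("disk") produces "failed: disk".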

submit(Callable task) is not implemented directly in ThreadPoolExecutor. It is implemented in the parent class AbstractExecutorService, whose source is shown below:

public abstract class AbstractExecutorService implements ExecutorService {
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // Wrap the Callable in a RunnableFuture (a FutureTask), which is a Runnable
        RunnableFuture<T> ftask = newTaskFor(task);
        // Call ThreadPoolExecutor's execute() method
        execute(ftask);
        return ftask;
    }
}

This code shows that a Callable is ultimately executed as a Runnable. Inside submit(), the adapter FutureTask wraps the Callable into a Runnable, which is then run through execute(). The figure below illustrates this conversion.

Figure: A Callable converted into a Runnable

FutureTask is an adapter. On one hand, it implements both Runnable and Future, through RunnableFuture. On the other hand, it holds a Callable internally, which is how it turns a Callable into a Runnable.

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}
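FutureTask is both a Runnable and a Future, so it can be used without any pool at all. This sketch (FutureTaskAdapterDemo is a hypothetical name) hands a FutureTask to a plain Thread as its Runnable. It then uses the same object as a Future to wait for the result:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskAdapterDemo {
    static int answer() throws InterruptedException, ExecutionException {
        // The FutureTask wraps a Callable...
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        // ...is itself a Runnable, so a plain Thread (or Executor.execute) can run it...
        new Thread(task, "adapter-demo").start();
        // ...and is a Future, so another thread can block in get() for the result.
        return task.get();
    }
}
```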

As the figure shows, a pool thread runs FutureTask's run() method while other threads call get() and wait for the result. A block/notify mechanism is therefore needed here.

Figure: Thread synchronization around a FutureTask object

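In JDK 6, FutureTask implemented blocking and waking on top of AQS. Since JDK 7, it uses neither AQS nor Condition's await()/signal(). Instead, it implements blocking and waking directly with CAS on the state variable plus LockSupport.park()/unpark(), and it keeps waiting threads in a stack of WaitNodes (the waiters field). This principle was covered repeatedly in the earlier discussion of AQS and Condition, so awaitDone() is not analyzed further here.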
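Here is a simplified stand-in for FutureTask that illustrates the block/notify problem without the lock-free machinery. It deliberately differs from the JDK's approach: it uses synchronized with wait()/notifyAll() instead of CAS on state plus park()/unpark(). It also omits cancellation, timed get(), and protection against run() being called twice. The name SimpleFutureTask is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

// Simplified sketch: monitor-based waiting instead of FutureTask's CAS + park/unpark.
class SimpleFutureTask<V> implements Runnable {
    private final Callable<V> callable;
    private boolean done;       // guarded by this
    private V result;           // guarded by this
    private Throwable failure;  // guarded by this

    SimpleFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V r = null;
        Throwable f = null;
        try {
            r = callable.call();  // computed outside the lock
        } catch (Throwable ex) {
            f = ex;
        }
        synchronized (this) {
            result = r;
            failure = f;
            done = true;
            notifyAll();          // wake every thread blocked in get()
        }
    }

    public synchronized V get() throws InterruptedException, ExecutionException {
        while (!done) {           // loop guards against spurious wake-ups
            wait();
        }
        if (failure != null) {
            throw new ExecutionException(failure);
        }
        return result;
    }
}
```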

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor executes tasks on a time-based schedule, in two ways:

(1) Delayed execution

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
}
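The following usage sketch exercises the Callable overload of schedule() and measures how long after scheduling the task actually ran. DelayedTaskDemo and elapsedMillis are hypothetical names. schedule() computes the trigger time from System.nanoTime() after start has been recorded, so the measured value should be at least the requested delay.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayedTaskDemo {
    // Schedules a Callable after delayMillis and returns how many ms after scheduling it ran.
    static long elapsedMillis(long delayMillis) throws InterruptedException, ExecutionException {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        long start = System.nanoTime();
        Callable<Long> measure =
                () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            ScheduledFuture<Long> future =
                    scheduler.schedule(measure, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } finally {
            scheduler.shutdown();
        }
    }
}
```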

(2) Periodic execution

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
}

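The two methods differ as follows:

AtFixedRate: runs at a fixed frequency, independent of how long the task itself takes. For example, with a period of 5s, an execution is scheduled every 5s. This assumes each execution finishes within the period. If an execution takes longer than the period, the next one starts late, right after the previous one finishes, because executions of the same task never overlap.

WithFixedDelay: keeps a fixed gap between the end of one execution and the start of the next, so the schedule depends on how long the task takes. For example, if an execution starts at 0s and takes 10s, and the delay is 2s, the next execution starts at 12s.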
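A small timeline simulation makes the two policies easier to compare. This is a hypothetical helper: the class `PeriodicTimeline` and its methods are made up for illustration and are not JDK code. It assumes one periodic task, a thread that is always free, and times in seconds. For fixed rate it models the behavior described above: the k-th run is scheduled at `initialDelay + k * period`, but it cannot start before the previous run ends.

```java
import java.util.Arrays;

// Hypothetical timeline simulator (not JDK code): one periodic task,
// one always-available thread, all times in seconds.
final class PeriodicTimeline {

    // Fixed rate: run k is scheduled at initialDelay + k * period,
    // but it cannot start until the previous run has finished.
    static long[] fixedRateStarts(long initialDelay, long period, long duration, int runs) {
        long[] starts = new long[runs];
        long prevEnd = Long.MIN_VALUE;
        for (int k = 0; k < runs; k++) {
            long scheduled = initialDelay + k * period;
            starts[k] = Math.max(scheduled, prevEnd);
            prevEnd = starts[k] + duration;
        }
        return starts;
    }

    // Fixed delay: the next run starts `delay` after the previous run ends.
    static long[] fixedDelayStarts(long initialDelay, long delay, long duration, int runs) {
        long[] starts = new long[runs];
        long next = initialDelay;
        for (int k = 0; k < runs; k++) {
            starts[k] = next;
            next = starts[k] + duration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fixedRateStarts(0, 5, 3, 3)));   // [0, 5, 10]
        System.out.println(Arrays.toString(fixedRateStarts(0, 5, 7, 3)));   // [0, 7, 14]
        System.out.println(Arrays.toString(fixedDelayStarts(0, 2, 10, 3))); // [0, 12, 24]
    }
}
```

With a 3s task and a 5s period, every run starts on schedule. With a 7s task, the runs fall behind and execute back to back. The 10s task with a 2s delay reproduces the 0s, 12s, 24s pattern from the fixed-delay example.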

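How delayed and periodic execution work

ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so its internal data structures are essentially the same as ThreadPoolExecutor's. How, then, does it run tasks after a delay and run tasks periodically?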

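Delayed execution relies on a DelayQueue. DelayQueue is a kind of BlockingQueue built on a binary heap (internally a PriorityQueue) that keeps the element whose delay expires first at the head.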
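The following minimal sketch shows the DelayQueue mechanism in isolation, using the JDK's `java.util.concurrent.DelayQueue` directly. The element class `DelayedTask` and the class `DelayQueueDemo` are hypothetical. The sketch assumes millisecond delays and compares trigger times by subtracting `System.nanoTime()` values, as ScheduledFutureTask does. `take()` blocks until the element at the head of the heap has expired, so elements come out in trigger-time order regardless of insertion order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: a named task that becomes available at triggerNanos.
final class DelayedTask implements Delayed {
    final String name;
    private final long triggerNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element can be taken.
        return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Heap order: earliest trigger time first (subtraction is overflow-safe for nanoTime).
        long diff = (other instanceof DelayedTask)
            ? triggerNanos - ((DelayedTask) other).triggerNanos
            : getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
    }
}

final class DelayQueueDemo {
    // Returns task names in the order take() hands them out.
    static String[] takeAll() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 150));
        queue.put(new DelayedTask("fast", 50));
        queue.put(new DelayedTask("medium", 100));
        String[] order = new String[3];
        try {
            for (int i = 0; i < order.length; i++) {
                order[i] = queue.take().name; // blocks until the head's delay has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```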

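Periodic execution works by putting a task back into the work queue after it finishes, so the same task can run again and again.

However, ScheduledThreadPoolExecutor does not use DelayQueue directly. Instead, it implements its own specialized delay queue, DelayedWorkQueue, as a nested class: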

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
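    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        // Binary min-heap of RunnableScheduledFuture elements ordered by
        // trigger time; implementation omitted here.
    }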
}

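It works on the same principle as DelayQueue but is optimized for task cancellation. Each ScheduledFutureTask records its own position in the heap (the heapIndex field shown later), so a cancelled task can be removed without searching the whole queue.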
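The idea behind `heapIndex` can be shown with a simplified, single-threaded binary min-heap in which every node remembers its current array slot. This is a hypothetical sketch: `IndexedMinHeap` and `Node` are illustrative names, and the sketch omits the locking and blocking that DelayedWorkQueue adds. Because a node knows its own index, `remove` can jump straight to it and re-heapify in O(log n), rather than first scanning the array in O(n).

```java
import java.util.Arrays;

// Hypothetical, simplified illustration of the heapIndex idea; not JDK code.
final class IndexedMinHeap {
    static final class Node {
        final long time;      // heap key (trigger time)
        int heapIndex = -1;   // current slot in the array, -1 when not in the heap
        Node(long time) { this.time = time; }
    }

    private Node[] heap = new Node[16];
    private int size;

    void offer(Node n) {
        if (size == heap.length) heap = Arrays.copyOf(heap, size * 2);
        siftUp(size++, n);
    }

    Node poll() {
        if (size == 0) return null;
        Node top = heap[0];
        removeAt(0);
        return top;
    }

    // No linear search: the node already knows where it is.
    boolean remove(Node n) {
        int i = n.heapIndex;
        if (i < 0 || i >= size || heap[i] != n) return false;
        removeAt(i);
        return true;
    }

    private void removeAt(int i) {
        heap[i].heapIndex = -1;
        int last = --size;
        Node moved = heap[last];  // move the last node into the hole
        heap[last] = null;
        if (i != last) {
            siftDown(i, moved);
            if (heap[i] == moved) siftUp(i, moved); // it may need to go up instead
        }
    }

    private void siftUp(int k, Node n) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Node p = heap[parent];
            if (n.time >= p.time) break;
            heap[k] = p;
            p.heapIndex = k;
            k = parent;
        }
        heap[k] = n;
        n.heapIndex = k;
    }

    private void siftDown(int k, Node n) {
        int half = size >>> 1;    // nodes below this index have at least one child
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < size && heap[right].time < heap[child].time) child = right;
            if (n.time <= heap[child].time) break;
            heap[k] = heap[child];
            heap[k].heapIndex = k;
            k = child;
        }
        heap[k] = n;
        n.heapIndex = k;
    }
}
```

When a node leaves the heap, its `heapIndex` is reset to -1. This lets a check like `heapIndex >= 0` in `cancel()` tell whether a task is still queued.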

Delayed execution

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
}

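The caller passes in a Runnable plus a delay. The Runnable is wrapped in a ScheduledFutureTask, which is then passed through decorateTask(..). decorateTask(..) is a hook that subclasses can override; by default it returns the task unchanged. DelayedWorkQueue stores exactly this kind of object, and such objects necessarily implement the Delayed interface (RunnableScheduledFuture extends ScheduledFuture, which extends Delayed).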

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
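    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);                    // pool already shut down: reject the task
        else {
            super.getQueue().add(task);      // enqueue into DelayedWorkQueue (a heap)
            // Re-check: if the pool was shut down concurrently and the task
            // may no longer run, take it back out of the queue and cancel it.
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();            // make sure a worker thread has been started
        }
    }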
}

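As the code above shows, schedule() itself is simple. It adds the delay to the submitted Runnable, turns the result into a ScheduledFutureTask, and puts that into the DelayedWorkQueue. delayedExecute() then calls ensurePrestart() to make sure a worker thread is running. Task execution reuses ThreadPoolExecutor's machinery; the delay itself is enforced inside DelayedWorkQueue.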
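The following sketch exercises both points with the real `ScheduledThreadPoolExecutor` API. The hypothetical subclass `CountingScheduler` overrides the two `decorateTask` hooks only to count calls, and `demo` schedules a Runnable and a Callable with the same delay. Assuming nothing else uses the pool, each `schedule` call should pass through `decorateTask` exactly once. `get()` should not return before the delay has elapsed, because the task sits in DelayedWorkQueue until then.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass: counts tasks passing through the decorateTask hooks
// and otherwise behaves exactly like ScheduledThreadPoolExecutor.
final class CountingScheduler extends ScheduledThreadPoolExecutor {
    final AtomicInteger decorated = new AtomicInteger();

    CountingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task; // like the default: keep the ScheduledFutureTask unchanged
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task;
    }

    // Schedules one Runnable and one Callable after delayMillis.
    // Returns {callable result, elapsed millis until both finished, decorated count}.
    static long[] demo(long delayMillis) {
        CountingScheduler scheduler = new CountingScheduler(1);
        long start = System.nanoTime();
        try {
            ScheduledFuture<?> r =
                scheduler.schedule(() -> { }, delayMillis, TimeUnit.MILLISECONDS);
            ScheduledFuture<Long> c =
                scheduler.schedule(() -> 6L * 7L, delayMillis, TimeUnit.MILLISECONDS);
            r.get();               // blocks until the delay has expired and the task has run
            long result = c.get();
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new long[] { result, elapsed, scheduler.decorated.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }
}
```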

Periodic execution

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
}

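The structure is essentially the same as schedule(..). It also wraps the task in a ScheduledFutureTask, except that there is an extra period argument in addition to the initial delay. It then puts the task into the DelayedWorkQueue and returns.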
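Here is a minimal usage sketch of periodic execution. It assumes a 10 ms fixed delay is short enough for a demo, and the class `PeriodicDemo` is hypothetical. The task counts its own executions, and once it has run the requested number of times, the caller cancels the returned future. After every run, the executor re-queues `outerTask`, which is the same object `scheduleWithFixedDelay` returned, so a single `cancel` call stops all later executions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicDemo {
    // Runs a task with a fixed 10 ms delay until it has executed `times` times,
    // then cancels it. Returns how many executions had happened by then.
    static int runThenCancel(int times) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch enough = new CountDownLatch(times);
        try {
            ScheduledFuture<?> f = scheduler.scheduleWithFixedDelay(() -> {
                runs.incrementAndGet();
                enough.countDown();
            }, 0, 10, TimeUnit.MILLISECONDS);
            if (!enough.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("periodic task did not run often enough");
            }
            // cancel(false) does not interrupt a run in progress; it only
            // prevents the task from being re-queued and run again.
            if (!f.cancel(false) || !f.isCancelled()) {
                throw new IllegalStateException("cancel failed");
            }
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }
}
```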

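The two methods differ in one detail: scheduleWithFixedDelay passes in a negative period (unit.toNanos(-delay)), while scheduleAtFixedRate passes in a positive one. Why? Let's look inside ScheduledFutureTask.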

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units */
        private long time;

        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
         */
        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;
        
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

        /**
         * Returns {@code true} if this is a periodic (not a one-shot) action.
         *
         * @return {@code true} if periodic
         */
        public boolean isPeriodic() {
            return period != 0;
        }

        /**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }
        
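        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);                    // pool state no longer allows running it
            else if (!periodic)
                ScheduledFutureTask.super.run();  // one-shot task: plain FutureTask.run()
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();                 // compute the next trigger time
                reExecutePeriodic(outerTask);     // put the task back into the queue
            }
        }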
    }
}
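Note the tie-breaking in `compareTo`. When two tasks have the same trigger `time`, the one with the smaller `sequenceNumber` goes first, that is, the one submitted earlier. This gives equal-time tasks FIFO order even though a binary heap by itself is not stable. The sketch below reproduces just that comparison on top of `java.util.PriorityQueue`. `FifoTieBreak` and `Entry` are hypothetical names, and the global `AtomicLong` is assumed to play the role of the executor's sequence counter.

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

final class FifoTieBreak {
    private static final AtomicLong SEQUENCER = new AtomicLong();

    // Hypothetical stand-in for ScheduledFutureTask: trigger time + sequence number.
    static final class Entry implements Comparable<Entry> {
        final String name;
        final long time;
        final long sequenceNumber = SEQUENCER.getAndIncrement(); // assigned at creation

        Entry(String name, long time) {
            this.name = name;
            this.time = time;
        }

        @Override
        public int compareTo(Entry other) {
            if (other == this) return 0;
            long diff = time - other.time;
            if (diff < 0) return -1;
            if (diff > 0) return 1;
            return sequenceNumber < other.sequenceNumber ? -1 : 1; // earlier submission first
        }
    }

    // Polls all entries and returns their names in heap order.
    static String pollOrder() {
        PriorityQueue<Entry> heap = new PriorityQueue<>();
        heap.add(new Entry("a", 100));
        heap.add(new Entry("late", 200));
        heap.add(new Entry("b", 100));
        heap.add(new Entry("c", 100));
        heap.add(new Entry("early", 50));
        StringBuilder sb = new StringBuilder();
        while (!heap.isEmpty()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(heap.poll().name);
        }
        return sb.toString();
    }
}
```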

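The difference between withFixedDelay and atFixedRate lies in setNextRunTime().

For atFixedRate, period > 0. The next trigger time equals the previous trigger time plus period (time += p), regardless of how long the run took.

For withFixedDelay, period < 0. The next trigger time is triggerTime(-p), that is, now + (-period), where now is the moment the previous execution finished. This holds because setNextRunTime() is called only after runAndReset() returns.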
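The sign trick can be isolated in a few lines. In this hypothetical sketch (`PeriodEncoding` is not a JDK class), the current time is passed in explicitly instead of being read from `System.nanoTime()`, which keeps the result deterministic. The real `triggerTime()` also guards against overflow; this sketch omits that.

```java
// Hypothetical stand-in for the period/time logic of ScheduledFutureTask (not JDK code).
final class PeriodEncoding {
    // Encoding as described above: fixed rate -> +period, fixed delay -> -delay, one-shot -> 0.
    static long fixedRate(long period) { return period; }
    static long fixedDelay(long delay) { return -delay; }

    static boolean isPeriodic(long period) { return period != 0; }

    // Next trigger time for a run that was scheduled at `time` and finished at `now`.
    static long nextRunTime(long time, long period, long now) {
        if (period > 0) {
            return time + period;   // fixed rate: based on the previous scheduled time
        } else if (period < 0) {
            return now + (-period); // fixed delay: based on when the previous run finished
        }
        throw new IllegalArgumentException("one-shot tasks have no next run");
    }

    public static void main(String[] args) {
        // The previous run was scheduled at t=100 and finished at t=130.
        System.out.println(nextRunTime(100, fixedRate(50), 130));  // 150
        System.out.println(nextRunTime(100, fixedDelay(50), 130)); // 180
    }
}
```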

The Executors utility class

The concurrent package provides the Executors utility class, which can create various kinds of thread pools, as shown below.

public class Executors {
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

As the code above shows, each of these thread pool types is simply a particular setting of the key configuration parameters introduced earlier.
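This can be checked at run time by reading the parameters back through ThreadPoolExecutor's getters. The sketch below assumes, as in the factory code above, that `newFixedThreadPool` and `newCachedThreadPool` return a plain `ThreadPoolExecutor`. That is an implementation detail rather than a documented guarantee, hence the explicit cast. `FactoryInspection` is a hypothetical helper name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FactoryInspection {
    // Formats core/max/keepAlive/queue type of a ThreadPoolExecutor.
    static String describe(ThreadPoolExecutor p) {
        return p.getCorePoolSize() + "/" + p.getMaximumPoolSize() + "/"
            + p.getKeepAliveTime(TimeUnit.SECONDS) + "s/"
            + p.getQueue().getClass().getSimpleName();
    }

    static String[] inspect() {
        // Assumption: these factories return ThreadPoolExecutor instances directly.
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ExecutorService cached = Executors.newCachedThreadPool();
        try {
            return new String[] {
                describe((ThreadPoolExecutor) fixed),
                describe((ThreadPoolExecutor) cached)
            };
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }
}
```

For the fixed pool this yields `4/4/0s/LinkedBlockingQueue`, and for the cached pool `0/2147483647/60s/SynchronousQueue`, matching the constructor arguments in the factory code.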

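The Alibaba Java Development Manual (《阿里巴巴Java开发手册》) explicitly forbids creating thread pools through Executors. It requires developers to create them directly with ThreadPoolExecutor or ScheduledThreadPoolExecutor. The point is to force developers to spell out the pool's runtime policy and understand every configuration parameter, avoiding resource exhaustion caused by misuse. The factory code above shows where the risk lies. newFixedThreadPool and newSingleThreadExecutor use an unbounded LinkedBlockingQueue, so queued tasks can pile up without limit. newCachedThreadPool allows up to Integer.MAX_VALUE threads.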
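The following sketch shows what explicit construction can look like. It creates a deliberately tiny pool with every parameter spelled out: one thread, a bounded `ArrayBlockingQueue` of capacity 1, a thread factory that names its threads, and `AbortPolicy`, which throws `RejectedExecutionException` when the pool is saturated. The sizes and the `demo-worker-` name prefix are illustrative choices, not recommendations, and `BoundedPoolDemo` is a hypothetical class. While the only worker is blocked, the first task runs, the second waits in the queue, and the third is rejected instead of silently piling up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPoolDemo {
    // Every parameter is explicit: 1 core thread, at most 1 thread, queue capacity 1.
    static ThreadPoolExecutor newBoundedPool() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory named = r -> new Thread(r, "demo-worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
            1, 1,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            named,
            new ThreadPoolExecutor.AbortPolicy()); // throw when saturated
    }

    // Returns how many of three submissions were rejected while the only worker is busy.
    static int countRejections() {
        ThreadPoolExecutor pool = newBoundedPool();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < 3; i++) {
                try {
                    pool.execute(blocker); // 1st runs, 2nd is queued, 3rd finds no room
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```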