您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

JUC源码学习笔记5——线程池,FutureTask,Executor框架源码解析

JUC源码学习笔记5——线程池,FutureTask,Executor框架源码解析

源码基于JDK8
参考了美团技术博客
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

一丶Executor框架概览

刚接触Java线程池的时候,常常被ThreadPoolExecutor,Executor,ExecutorService,Future,FutureTask搞得一头雾水,下面我们先来来理一理这些类的关系。

1.Executor框架的主要结构

  • 任务,表示被执行任务,实现Runnable 或者Callable接口来描述任务需要做什么事情
  • 任务的执行,包括任务的执行核心接口Executor,和实现了ExecutorExecutorService接口,其中包括两个重要的实现ThreadPoolExecutorScheduledThreadPoolExecutor
  • 异步计算的结果,Future表示未来可期,FutureTask是其重要实现

2.Executor框架重要类简介

2.1 Executor 接口

public interface Executor {
    void execute(Runnable command);
}

Executor的作用的是把任务提交与每个任务将如何运行的机制进行解耦,其中只有一个方法execute,但是其实现类,可能是同步的直接调用Runnable#run,也可能是异步开启线程执行,并不要求Executor的实现必须是异步,所以没有一个叫submit(Callable)返回异步计算结果的Future方法

2.2 ExecutorService 接口

ExecutorService 实现了Executor 接口,提供管理和终止的方法,并且可以生成 Future 来跟踪一个或多个异步任务的进度的方法,还提供了批量提交任务的方法。

方法 描述
void shutdown() 关闭执行器,如果还有任务没有执行完,那么这些任务还会执行,但是不会接受新的任务,如果已经处于关闭状态还去调用此方法,不会有任何效果
List<Runnable> shutdownNow() 尝试停止所有正在执行的任务,返回等待执行但未执行的任务,停止执行任务通常是通过调用对应线程的interrupt方法,如果线程自己不响应中断,那么无济于事,任务还是会继续执行
boolean isShutdown() 如果已经被关闭那么返回true,通常调用shutdownshutdownNow后可能存在线程在执行任务,但是还是返回true
boolean isTerminated() 如果所有任务在关闭后都已完成,则返回 true。请注意,除非调用了 shutdown 或 shutdownNow,且所以任务都结束了,否则 isTerminated 永远不会为真
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 关闭当前ExcutorServuce后阻塞当前线程,直到所有任务都完成执行,或者发生超时,或者当前线程被中断。
<T> Future<T> submit(Callable<T> task) 提供一个具备返回值的任务,返回一个Future表示是此任务的异步执行结果。
<T> Future<T> submit(Runnable task, T result) submit(Callable)类似,但是其异步返回结果在执行完后是参数result
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException 批量提交一批任务,阻塞直到所有任务都完成或者任务执行失败或者当前线程被中断
List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; 批量提交一批任务,阻塞直到所有任务都完成或者任务执行失败或者当前线程被中断,指定的时间超时
T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException,ExecutionException; 提交一批任务,等待其中一个执行完成,或者直到当前线程被中断,返回时会取消没有执行完的任务
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 提交一批任务,等待其中一个执行完成,或者直到当前线程被中断,或者等待时间超时,返回时会取消没有执行完的任务

2.3 Future 接口

Future 表示异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法

方法 描述
boolean cancel(boolean mayInterruptIfRunning) 尝试取消此任务的执行。如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。如果成功,并且在调用取消时此任务尚未启动,则此任务不应该运行。如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应该中断执行该任务的线程以尝试停止该任务。此方法返回后,对 isDone 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。
boolean isCancelled() 如果此任务在正常完成之前被取消,则返回 true。返回:如果此任务在完成之前被取消,则为 true
boolean isDone() 如果此任务完成,则返回 true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回 true。
V get() throws InterruptedException, ExecutionException 如果任务没有执行完,那么一直阻塞直到任务完成(执行出现异常也是一种完成),或者直到当前线程被阻断
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException V get() throws InterruptedException, ExecutionException的超时等待版本,如果超时还没有结束任务,那么抛出TimeoutException

二丶FutureTask源码解析

1.FutureTask 是什么

根据源码上的注释,FutureTask是一个可取消的异步计算,它提供了启动和取消计算,查询计算是否完成,以及获取计算结果(只有计算结束了才能获取到计算结果,否则被阻塞),一旦计算完成那么不可以重新或者取消计算,除非调用runAndReset

2.FutureTask 的继承关系

Future 表示异步计算的结果,Runnale表示一个任务,RunnableFuture实现了二者也是一个接口表示的是具备异步计算结果的任务(没有结果的Runnable可以视作结果是null)。

3.FutureTask 内部结构

//任务执行的状态,使用volatile修饰保证线程可见性
private volatile int state;

//任务处于新建
private static final int NEW          = 0;
//任务正在完成,意味着异步计算结束,但是结果没有写回到 outcome
private static final int COMPLETING   = 1;
//任务正常结束,调用callable的call没有抛出异常
private static final int NORMAL       = 2;
//调用callable的call抛出异常
private static final int EXCEPTIONAL  = 3;
//任务被取消
private static final int CANCELLED    = 4;
//任务正在被中断,一般是一个瞬态
private static final int INTERRUPTING = 5;
//任务已经被中断
private static final int INTERRUPTED  = 6;

//一般是我们的业务逻辑代码封装而成的,(下面会解释runnable怎么半)
private Callable<V> callable;
//记录异步计算的结果,如果成功callable的结果,如果抛出异常那么记录的是异常对象
//(下面会解释为啥这里不需要用volatile修饰)
private Object outcome; // non-volatile, protected by state reads/writes
//执行当前任务的线程
private volatile Thread runner;
//一个FutureTask可以被多个线程调用获取异步计算结果的方法
//如果异步计算没有结束那么这些方法会被阻塞
//WaitNode这种数据结果用于保存阻塞的线程们
private volatile WaitNode waiters;

3.1 WaitNode

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

WaitNode 包含等待的线程和,指向其他线程的next引用

4.FutureTask 的构造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

4.1 通过 Executors.callableRunnable适配为Callable

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

   static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

这部分就是一个简单的适配器模式,没什么好说的

4.2 使用this.state = NEW保证 this.callable对其他线程的可见性

试想如果执行FutureTask的构造方法的时候,发生重排序,this.callable 的赋值重排序到外部获取到构造方法生成的FutureTask的后面,并且立马有另外一个线程调用了FutureTask的任务执行方法,这时候this.callable还来不及赋值,调用执行方法抛出空指针异常。那么为什么不用volatile修饰callable还能保证其可见性昵,能让源码写上// ensure visibility of callable这行注释昵?

在《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》的学习笔记中,我们说过volatile变量写具备如下内存屏障

img

这里的store store屏障防止了this.callable 的赋值重排序到 this.state = NEW之后,且后续的store屏障会保证当前线程(构造FutureTask的线程)工作内存会立马写回到主内存,并让其他线程关于此FutureTask的缓存无效,从而保证了callable的线程可见性。

5.FutureTask#run 运行任务

5.1带着问题看源码

  • 前面我们说过FutureTask是一个可取消的异步计算,也就是说一个线程在运行FutureTask的时候可能存在其他线程并发的调用其取消,注意只有没有完成的任务可以取消,已经完成的任务不可以取消,那么doug lea是如何实现FutureTask完成前后的取消昵
  • 上面所说的取消是cancel方法,这个方法有一个布尔类型入参mayInterruptIfRunning为true的时候会尝试中断运行FutureTask的线程,那么如果我们业务中逻辑中响应了中断,FutureTask是怎么维护任务状态从newINTERRUPTING然后到INTERRUPTED的昵,
  • FutureTask提供获取计算结果的get方法,如果FutureTask正在运行,调用get方法的线程将被阻塞,这就意味着,FutureTask#run成功或者失败都要唤醒阻塞的线程(多个线程都可以调get)

5.2 源码分析

public void run() {
    //如果不是初始,说明有其他线程启动了,或者说其他线程取消了任务 那么不需要运行
    //如果是new 但是cas runner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
	
        //再次校验下是否为初始状态 如果不是 说明在当前线从第一个if到此存在其他线程取消任务
        //任务启动之前可以取消任务的运行
        //====代码点1 源码解析用=====
        if (c != null && state == NEW) {
            V result;
	        //记录当前任务是否成功执行,如果Callable代码写错了,
            //或者说Callable响应中断,执行的途中被中断那么为false
            boolean ran;
            try {
                //业务逻辑执行
                result = c.call();
                //成功执行
                ran = true;
            } catch (Throwable ex) {
                //这里可能是Callable本身代码逻辑错误异常 也可能是响应中断抛出异常
                result = null;
                ran = false;
                //但是setExcption只会处理代码逻辑异常
                setException(ex);
            }
            if (ran)
                //设置任务正常执行结果
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        //处理中断
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

5.2.1只有初始状态的任务,且只能允许单个线程允许当前任务

 if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
  • 如果任务当前状态不是new,说明有其他线程运行了,或者说其他线程取消了任务
  • 如果是new 但是cas runner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,这里使用CAS确保任务不会被其他线程再次执行

5.2.2 任务运行(调用callalbe#call前)取消任务那么任务不会允许

代码点1进行了 if (c != null && state == NEW)的判断,其中c!=null属于是空指针PTSD,但是state == NEW是为了确保,当前线程从5.2.1中cas到代码点1这一段代码执行时间内,没有其他线程取消任务。如果存在其他线程取消了任务,那么state == NEW就不成立——任务执行前可以取消任务

5.2.3 记录任务执行结果setException set方法

  • 如果运行出现异常

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
  • 正常运行结束

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    

这两个方法都差不多,都是上来一个CAS将state从new转变为COMPLETING,然后用outcome记录异常或者记录成功返回值,然后使用UNSAFE.putOrderedInt改变state,如果是出现异常,那么设置状态为EXCEPTIONAL,如果正常结束设置为NORMAL

5.2.3.1 为什么使用UNSAFE.putOrderedInt 为什么outcome没有使用volatile修饰

UNSAFE.putOrderedInt这个方法我在《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》中关于AtomicInteger lazySet中说过,Store load屏障可以让后续的load指令对其他处理器可见,但是需要将其他处理器的缓存设置成无效让它们重新从主内存读取,putOrderedInt提供一个store store屏障,然后写数据,store store是保证putOrderedInt之前的普通写入和putOrderedInt的写入不会重排序,但是不保证下面的volatile读写不被重排序,省去了store load内存屏障,提高了性能,但是后续的读可能存在可见性的问题。putOrderedInt的store store屏障保证了outcome回立即刷新回主存

//也就是说
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
		//outcome是刷新回主存的,且不会重排序到putOrderedInt后面
        //这也是outcome没有使用volatile修饰的原因之一,
        //有后续调用putOrderedInt方法保证其对其他线程的可见性
        outcome = v;
        
        //state字段使用putOrderedInt写入 其他线程存在可见性的问题
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        
        finishCompletion();
    }
}

state的线程可见性问题是如何解决的——请接着看下去

5.2.3.2 finishCompletion 完成对等待线程的唤醒
private void finishCompletion() {
    // 等待的节点
    for (WaitNode q; (q = waiters) != null;) {
        
        //==代码点1 后续解析使用==
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //直到一个为null的节点,意味着遍历结束
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            //结束
            break;
        }
    }
	//钩子方法 留给我们自己扩展
    done();
	
    //将任务置为null
    callable = null;        // to reduce footprint
}

这里拿到waiters然后进行自旋遍历所有等待的节点线程然后唤醒它们,有意思的点在代码点1为何这里要使用CAS更新waiters为null昵,因为这里存在线程A执行完FutureTask调用finishCompletion 的同时线程B调用get进行等待,调用get方法进行排队(排队时也是CAS设置自己为waiters)这两个CAS必定有一个成功,有一个失败

  • 如果A失败,说明B在A唤醒之前进行排队,挂起自己,那么A在自旋唤醒的时候会唤醒B
  • 如果B失败,那么说明B在A唤醒之后进行排队,那么这时候不需要排队了,因为任务已经完成了,B只需要进行自旋获取返回结果即可
5.2.3.3 处理因为取消任务造成的中断#handlePossibleCancellationInterrupt

在run方法的finally块中存在

//运行完设置runner为空
runner = null;
//重新获取状态
int s = state;
 //如果是INTERRUPTING 或者INTERRUPTED
if (s >= INTERRUPTING)
    //handlePossibleCancellationInterrupt
    handlePossibleCancellationInterrupt(s);
private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        //如果是打断中 那么等待直到结束打断
        while (state == INTERRUPTING)
            Thread.yield(); 

cancel方法可以选择传入true表示,如果任务还在运行那么调用运行任务线程的interrupt方法进行中断,如果是调用cancel的线程还没有结束中断那么当前运行的线程会让步,为什么这么做,后续讲解cancel的时候我们再说

5.3.获取任务执行结果

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //任务为NEW 和 COMPLETING 那么调用那么会调用awaitDone
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //此方法如果发现Future调用异常那么排除异常
    return report(s);
}
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    //如果状态小于COMPLETING 则为 NEW 和 COMPLETING 那么会调用awaitDone
    //如果awaitDone结束的时候返回的状态还是 NEW or COMPLETING 抛出中断异常
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    //此方法如果发现Future调用异常那么排除异常
    return report(s);
}

get()表示无限期等待,直到当前线程被中断,get(long timeout, TimeUnit unit)则是超时等待,如果等待时间内任务没有完成那么抛出TimeoutException,如果等待过程被中断那么抛出中断异常。二者都调用了awaitDone(是否超时等待,等待时长)此方法返回值是此FutureTask状态。

5.3.1 COMPLETING

FutureTask在调用run方法正常结束调用set 运行出现异常调用 setException,这两个方法首先都会使用CAS将任务状态从NEW转变为COMPLETING,然后设置outcome的值,然后再设置任务状态为NORMAL(正常完成),EXCEPTIONAL出现异常,最后调用finishCompletion唤醒所有等待当前FutureTask完成的线程。

所有说COMPLETING 意味着当前任务(程序员自己定义的业务逻辑)的run方法调用完成,但是任务返回还没写到outcome上,以及还没唤醒等待的线程。但是只要outcome结果赋值后,立马改变任务状态为NORMAL或者EXCEPTIONAL,而不是调用完成finishCompletion后改变。为什么昵

  • 线程A调用get()无限期等待改变任务状态为NORMAL或者EXCEPTIONAL之前

    那么这时候线程A会进入awaitDone如果当前FutureTask状态为new那么会挂起自己等待任务执行线程调用finishCompletion,如果当前FutureTask状态为COMPLETING那么调用yield让出cpu,而不是挂起自己,因为后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,也许修改状态为NORMAL或者EXCEPTIONAL会导致线程Ayield使用的是UNSAFE.putOrderedInt存在线程可见性问题),但是无关紧要

  • 线程A调用get()无限期等待改变任务状态为NORMAL或者EXCEPTIONAL之后

    说明当前线程A"踩点很准"任务一执行完成立马来了,那么当前线程A或许会yield,也许是直接获取到执行结果

  • 线程A调用get(long,timeunit)超时等待

    这种方式多一个超时机制,如果等待结束还是发现任务状态为new,说明程序员自己定义的任务业务逻辑还没有走完,那么直接超时异常,如果为COMPLETING,线程会进行让步,而不是视作任务执行超时

5.3.2 等待直到任务完成awaitDone方法

此方法还还支持超时等待

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    //等待结束时间,如果非超时等待,那么为0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
	//当前线程如果进入等待任务完成队列,此变量记录等待节点
    WaitNode q = null;
    //是否入队(等待任务完成队列)
    boolean queued = false;
    for (;;) {
        //如果等待的过程中被中断,
        //那么把自己从等待waiters中删除
        //并且抛出中断异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        //读取state,volatile保证可见性
        int s = state;
        //如果当前大于COMPLETING 说明任务执行完成,
        //或者取消了,或者由于取消而被中断 直接返回当前状态,不需要再等了
        if (s > COMPLETING) {
            //节点线程置为null,后续执行任务线程唤醒等待线程的时候不会唤醒到此线程
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任务正在完成,进行线程让步
        //后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,
        //也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE.putOrderedInt存在线程可见性问题),但是无关紧要
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //当前线程的节点
        else if (q == null)
            q = new WaitNode();
        //如果没有入队(等待任务完成的队列)那么入队(等待任务完成的队列)
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果是超时等待
        else if (timed) {
            nanos = deadline - System.nanoTime();
			//等待超时 把自己从等待任务完成队列中移除
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //等待指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            //无限期等待
            LockSupport.park(this);
    }
}

这个方法分支超多,我们慢慢分析

  1. 如果线程执行awaitDone之前就被中断,那么会直接抛出中断异常

  2. 如果线程执行awaitDone并且成功被使用LockSupport.parkNanos(this, nanos)或者LockSupport.park(this)挂起,这时候被中断那么会从这两个方法中返回,继续自旋,Thread.interrupted()为true,会重置中断标识并且抛出中断异常

  3. 如果自旋的时候发现任务状态大于COMPLETING

    说明当前任务执行完成了,或者说任务被取消,或者由于取消已经中断了,那么直接返回即可,从这里返回有三种情况第一次自旋发现任务完成了超时等待指定时间结束发现任务完成了,任务完成的时候被任务执行线程唤醒,继续自旋发现任务完成了

  4. 如果自旋的时候发现任务状态等于COMPLETING

    那么调用yield让出cpu,而不是挂起自己,因为后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE.putOrderedInt存在线程可见性问题),但是无关紧要

    注意如果超时等待指定时间结束,继续自旋,如果进入此分支,那么让出cpu,再次获得时间片后,继续执行,下一次自旋,而不会进入到下面的超时异常分支,也就是说COMPLETING意味着任务执行完了,但是在做一些善后工作(写入任务返回值,唤醒等待线程)不会由于此状态导致超时

  5. q == null

    意味着当前线程没有被包装成WaitNode,当前线程也没有被中断,任务没有完成也不是Completing状态,这时候调用构造方法然后继续自旋

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    
  6. !queued

    这是在5的基础上,当前q已经有了节点,但是还没有进入等待任务完成队列,下面通过CAS让当前线程入队

    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    

    这里首先q.next = waiters让当前节点的next指向waiters,然后CAS设置waiters为当前节点,也就是说最后入队的节点使用waiters记录,使用next串联每一个等待的线程节点,q.next = waiters不需要考虑线程安全,和AQS中的入队类似,这是改变当前节点的next引用指向,但是修改waiters需要考虑线程安全问题,如果这里CAS失败了,那么queued为false 继续自旋尝试CAS自己为waiters

  7. 挂起当前线程

    //超时等待
    else if (timed) {
        //需要等待的时间
        nanos = deadline - System.nanoTime();
        //已经超时
        if (nanos <= 0L) {
            //把自己从waiters等待任务完成队列中移除
            removeWaiter(q);
            return state;
        }
        //挂起指定时间
        LockSupport.parkNanos(this, nanos);
    }
    //等待直到被中断或者唤醒
    else
        LockSupport.park(this);
    

    这里超时部分多一个removeWaiter,将自己从等待任务完成队列中移除,这个方法的执行需要考虑线程安全问题,同样使用自旋+CAS保证线程安全,这里不做过多分析。

5.3.3 report
private V report(int s) throws ExecutionException {
    Object x = outcome;
    //如果任务正常结束
    if (s == NORMAL)
        //强转
        return (V)x;
    //如果任务取消了 或者由于取消被中断了,抛出取消异常
    if (s >= CANCELLED)
        throw new CancellationException();
    //反之抛出ExecutionException 包装 原始的异常
    throw new ExecutionException((Throwable)x);
}

只有任务正常执行的时候,才会返回结果,如果被取消那么抛出取消异常。

5.4 取消任务

取消有一个比较有趣的点,如果取消在任务执行完之前,那么说明取消成功,后续任务完成调用set或者setException应该是什么都不做。如果取消在任务执行完之后,那么取消的这个动作应该失败,下面我们看下doug lea如果处理这个细节。

//mayInterruptIfRunning 表示需要中断任务执行线程
public boolean cancel(boolean mayInterruptIfRunning) {
    //任务不是初始,或者CAS修改状态从new 到INTERRUPTING 或者CANCELLED 失败
    //直接返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        //如果需要中断
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //执行中断
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //修改状态为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //唤醒所有等待任务执行的线程
        finishCompletion();
    }
    return true;
}
  1. 第一个if

    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    
    • 如果 state == NEW不成立,说明任务在执行此判断之前已经结束了(Completing,或者已经到了NORMAL,或者EXCEPTIONAL)说明取消在任务结束之前,那么直接返回false。或者说当前线程A对任务的取消在其他线程B取消任务之后,这时候就A线程取消返回false

    • 如果UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))不成立

      意味着state == NEW判读的时候成立,但是执行这句CAS的时候之前有老六线程抢先一步,或者说存在并发当前线程没有抢过,那么也直接返回false,这里保证了cancel的执行是串行的,不存在线程安全问题。注意这里如果需要中断任务执行线程那么CAS修改状态到INTERRUPTING ,反之直接修改到CANCELLED

  2. 尝试中断任务执行线程

    if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
    

    调用interrupt具体如何响应中断,得看程序员定义的业务逻辑是啥样的。调用putOrderedInt修改状态为INTERRUPTED,表示已经完成了中断

  3. 唤醒等待任务结束的线程

    直接调用finishCompletion,这个方法前面分析过。这里假如线程A取消了任务,那么线程B任务执行完后调用set或者setException 会如何昵——什么都不做

    protected void set(V v) {
        //此时state 是INTERRUPTING 或者INTERRUPTED if为false
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
  4. run方法对中断的处理

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            //省略任务的执行
        } finally {
            runner = null;
            int s = state;
            //进入这个if 必须是INTERRUPTING,或者INTERRUPTED
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 
    }
    

​ 这里为INTERRUPTING ,可能是取消任务的线程还没来得及执行 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED),也可能是可见性导致运行任务的线程没有读取到最新的state,handlePossibleCancellationInterrupt会让运行任务的线程等待,这点我不是很理解其必要性在哪里,也许回了确保任务执行线程在执行完FutureTask的run方法之后,一定已经被中断了。

三丶ThreadPoolExecutor 源码解析

1.线程池概览

1.1线程池是什么

一个ExecutorService的实现,会使用池中的一个线程处理提交的任务。池中存在n个线程,当前任务提交时,可能会new一个线程执行执行任务,也可能让当前任务进行排队。线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中。

1.2 线程池的结构

线程池可以看作两部分,执行工作的工人和在队列中等待被执行的任务

图2 ThreadPoolExecutor运行流程

2丶为什么要使用线程池,线程池解决了什么问题

2.1.使用线程池的好处

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。java线程和操作系统时一对一的映射关系,新建或者销毁一个线程都存在资源的消耗
  • 提高响应速度:任务到达时,一定情况下无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。使用过多的线程会导致线程上下文切换更多,从而导致在保存“现场”和恢复“现场”的开销激增
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行

2.2.线程池解决了什么问题

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。

    所以使用线程池,保存线程,避免每到一个任务需要新建一个线程,也避免执行完任务之后线程立马消亡

  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。

    线程池可以设置最大线程数,线程池持有的线程不可超过此值

  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

3.线程池ThreadPoolExecutor的继承关系

Executor,ExecutorService两个接口前面分析了,下面看下AbstractExecutorService

3.1AbstractExecutorService

AbstractExecutorService提供了RunnableCallable适配成RunnableFuture(一般适配成FutureTask),还实现了ExecutorServicesubmitinvokeAny,以及invokeAll。是对ExecutorService的抽象实现,有点模板方法的意思

3.1.1 RunnableCallable适配
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
rotected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

这里都new了一个FutureTask,其中适配Runnable调用了Executors#callable方法,最后new了一个RunnableAdapter,典型的适配器模式

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
3.1.2 submit
  public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
//省略了submit(Runnable)和 submit(Runnable,T)

最后都是调用的execute(ftask)方法,此方法有具体子类实现,

3.1.3 执行任意一个 invokeAny

执行任意一个的难点在于,只要有任何一个执行完成了那么就需要返回,而不是阻塞等待所有任务完成,但是如果一直遍历所有任务执行状态是否是完成,那么会消耗CPU,那么douglea是如何实现的昵?

invokeAny方法最终调用doInvokeAny进行实现,具体的做法是将提交的任务,包装成ExecutorCompletionService的内部类QueueingFuture

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    //任务完成之后会回调 done方法 这时候,			  	
    //执行线程会把当前任务放入到 completionQueue——完成队列之中
    protected void done() { completionQueue.add(task); }
    //原来的任务
    private final Future<V> task;
}

completionQueue 是一个BlockingQueue 通常是无解阻塞队列LinkedBlockingQueue,所有进行自旋遍历的时候,会由于队列中没有完成的任务而阻塞挂起,减少CPU资源的消耗,而多个线程在执行任务,只要任务结束了(任务执行失败,被取消也是结束,正常完成也是结束)就会回调done方法把当前任务放入到completionQueue 等待的线程将被唤醒,获取到AQS锁的线程将调用take获取最先完成的任务,然后释放锁(这部参阅我的JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue)接着其他的线程也如此往复,take方法是会删除等待队列头部的,也就是说多个线程调用invokeAny那怕任务不一样也是获取到不同的执行结果,3个任务被,四个线程调用invokeAny,最后拿到锁的线程拿到空

3.1.4 执行所有 invokeAll

invokeAll相比于 invokeAny就简单许多,直接循环执行每一个任务,然后调用Future#get方法就好,get方法会让当前线程阻塞,也就是说任务执行时间>Max(耗时最长任务时间),invokeAny则是>min(任务时间),但是invokeAll 返回List<Future<T>>,并且忽略单个任务的超时和取消抛出的异常,如果存在任何一个任务执行被中断且抛出中断异常,那么会去取消还在执行的任务并且进行中断,invokeAll还有一个超时版本,如果超时会直接返回所有Future。invokeAny返回的是T也就是任务执行结果,而不是Future<T>并且不忽略取消和中断也不忽略任务执行失败抛出的异常

3.2 线程池ThreadPoolExecutor的状态和属性

3.2.1 状态和状态的变更

img

图3 线程池生命周期

具体变更源码会在后续讲到

//ctl 高三位标识线程池的状态 低29位标识线程池工作线程个数
//一个int 标识方便了在更新状态和工作线程个数不需要加锁
private final AtomicInteger ctl  = new AtomicInteger(ctlOf(RUNNING, 0));
//29 移位操作使用
private static final int COUNT_BITS = Integer.SIZE - 3;
//2的29次方减1 标识线程池工作线程个数的最大值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//RUNNING:接受新任务并处理排队任务,
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN:不接受新任务,但处理排队任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
private static final int STOP       =  1 << COUNT_BITS;
//  TIDYING:所有任务都已终止,workerCount 为零
private static final int TIDYING    =  2 << COUNT_BITS;
// TIDYING 状态后将运行 terminate() 钩子方法.然后来到TERMINATED状态
private static final int TERMINATED =  3 << COUNT_BITS;


//保存待处理任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//锁,线程池用一个set保存所有线程,一个int保存最大的线程数,修改这些的时候使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
//包含池中所有工作线程的集合。仅在持有 mainLock 时访问
private final HashSet<Worker> workers = new HashSet<Worker>();
//调用awaitTermination的线程在此等待队列上等待
private final Condition termination = mainLock.newCondition();
//线程池中存在的最大的工作线程数。只能在 mainLock 下访问。
private int largestPoolSize;
//已完成任务的计数器。仅在工作线程终止时更新。只能在 mainLock 下访问。
private long completedTaskCount;
//新线程的工厂。所有线程都是使用这个工厂创建的
private volatile ThreadFactory threadFactory;
//拒绝策略,当达到最大线程数,并且队列也无法容纳任务的时候调用此方法
private volatile RejectedExecutionHandler handler;
//工作线程多久(纳秒)没有执行任务将被回收,(一般针对非核心线程,也可以用于核心线程的回收)
private volatile long keepAliveTime;
//如果为 false(默认),核心线程即使在空闲时也保持活动状态。如果为真,核心线程使用 keepAliveTime 超时等待工作。
private volatile boolean allowCoreThreadTimeOut;
//核心线程数,如果池中线程数小于核心线程数,那么接受新任务总是new一个线程
private volatile int corePoolSize;
//最大线程数,当池中线程数达到此值,将会把新任务放置在阻塞队列
private volatile int maximumPoolSize;
//默认拒绝的策略,抛出拒绝异常
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

3.3线程池源码分析

3.3.1 执行任务execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    //如果当前工作线程总数小于核心线程
    if (workerCountOf(c) < corePoolSize) {
        //那么会尝试新增一个核心线程执行当前任务
        //如果新增任务成功那么直接返回
        if (addWorker(command, true))
            return;
        //新增失败那么重新获取线程总数和线程池状态
        //新增失败可能是并发提交任务,当前线程尝试增加的时候已经达到核心线程数
        //或者尝试新增线程的时候,其他线程关闭了线程池
        c = ctl.get();
    }
    
    //如果是运行状态 且加入到了任务队列
    //上面新增核心线程失败,但是满足线程池是运行状态说明是并发提交任务核心线程数已经达到了
    if (isRunning(c) && workQueue.offer(command)) {
        //如果新增成功 重新获取程总数和线程池状态
        int recheck = ctl.get();
        //如果发现不行运行状态了尝试删除任务,说明加入后线程池被关了
        //isRunning(c) && workQueue.offer(command) 不是一个原子操作
        if (!isRunning(recheck) && remove(command))
            //如果成功从队列删除了任务,那么调用拒绝策略
            reject(command);
   
        //反之 添加一个非核心线程,尽可能保证队列中的任务会被执行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果队列满了,或者说不是running 那么新增一个非核心线程
    //不是running 新增非核心线程也不是一定成功的
    //如果新增非核心失败 那么调用拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

这里新增工作线程调用了addWorker(执行的任务,是否核心线程)

  1. if (workerCountO(c) < corePoolSize)这个分支说明了,当线程池中线程数没有达到核心线程数的时候都是无脑增加核心线程,但是调用addWorker增加线程的时候是要获取全局锁mainLock的。如果新增核心线程成功了那么直接返回,反之会重新获取以下线程池的状态,这里失败,可能if (workerCountO(c) < corePoolSize)if (addWorker(command, true))不是一个原子操作,中间可能被其他线程更改了线程池运行状态,或者说多线程并发调用addWorker增加核心线程数,核心线程达到了corePoolSize
  2. if (isRunning(c) && workQueue.offer(command)) 来到这里可能时新增核心线程失败,或者说一进来就已经达到了核心线程数,这里先检查时运行状态,然后向队列塞任务,这里的。同样的if (isRunning(c) && workQueue.offer(command))也不是一个原子操作,可能塞的一瞬间线程池不是running状态了,或者说任务阻塞队列满了(来到下面的3)。如果不是running了但是阻塞队列接受了任务,会进入下面的判断,重新检查线程池的状态如果不是running那么尝试删除任务,如果成功删除了任务,说明这个任务没有被工作线程拿去执行,这时候直接调用拒绝策略。
  3. 如果队列满了,或者说不是running了,来到else if (!addWorker(command, false))虽然不是running了能不能新增非核心线程addWorker方法会控制的,如果队列满了,会尝试新增非核心线程,这时候是奔着最大线程数去的
3.3.2 新增工作线程addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //=========自旋部分开始=================
    for (;;) {
        
        int c = ctl.get();
        int rs = runStateOf(c);
		
        //如果大于等于SHUTDOWN 且 线程池不是SHUTDOWN 说明是STOP TIDYING TERMINATED这几种都是不接受新任务的
        //大于等于SHUTDOWN 且队列是空,这时候也不接受新任务,线程池即将关闭
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //自选
        for (;;) {
            int wc = workerCountOf(c);
            //如果大于(2^29)-1 直接不可新增线程,ctl 高三位状态低29位线程数 再多表示不了了
           // 如果表示新增核心线程 且大于核心线程 或者非核心但是大于最大线程数 返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //cas增加 工作线程数 这里只是更新ctl 不是真的增加一个线程
            //这样增加成功了才能退出 保证了线程数不会超过阈值
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //如果增加失败了重新看下状态,状态改变了,那么重新自旋
            //cas失败了,状态没变也会自选
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
   //=========自旋部分结束================= 
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新建一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //上锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
           		//线程池状态
                int rs = runStateOf(ctl.get());
			   //如果小于 SHUTDOWN 说明是RUNNING
                //或者是SHUTDOWN 但是没有任务执行,说明是为了执行队列中的任务或者预热线程池
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
					//加到set集合
                    workers.add(w);
                    int s = workers.size();
                    //更新最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                //解锁
                mainLock.unlock();
            }
            //如果加入了set 启动worker
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果没有启动 说明线程池已经不接受新任务了,或者其他奇奇怪怪的异常
        if (! workerStarted)
            //尝试减少工作线程数 并且尝试关闭线程池
            addWorkerFailed(w);
    }
    //返回worker是否启动了
    return workerStarted;
}
  1. 自旋部分

    这部分的目的是保证池中线程数不超过核心线程数(如果新增核心线程),最大线程数(如果新增非核心线程)

    • 前置判断

       if (rs >= SHUTDOWN &&
                      ! (rs == SHUTDOWN &&
                         firstTask == null &&
                         ! workQueue.isEmpty()))
                      return false;
      

      1.STOP TIDYING TERMINATED这几种状态不接受新任务

      2.SHUTDOWN 状态但是提交新任务,直接返回false,SHUTDOWN 会处理队列的任务但是不接受新任务

      3.SHUTDOWN 状态且队列是空,这时候也不接受新任务,线程池即将关闭

    • 内部自旋自增工作线程数

      for (;;) {
          int wc = workerCountOf(c);
          if (wc >= CAPACITY ||
              wc >= (core ? corePoolSize : maximumPoolSize))
              return false;
          if (compareAndIncrementWorkerCount(c))
              break retry;
          c = ctl.get();  // Re-read ctl
          if (runStateOf(c) != rs)
              continue retry;
          // else CAS failed due to workerCount change; retry inner loop
      }
      

      首先没有新增线程之前的工作线程数, 不能大于等于最大容量,如果准备新增核心线程那么不能大于等于corePoolSize,如果准备新增非核心线程那么不能大于等于maximumPoolSize,然后来一个cas if (compareAndIncrementWorkerCount(c)),如果CAS表示成功增加了线程数,但是没有增加线程,相当于占了一个坑。那啥时候会失败昵——并发增加,cas失败,这时候会继续自旋。接着if (runStateOf(c) != rs)如果发现状态改变了continue retry从外层for开始执行,这样会重新判断线程状态,反之从内层for执行,判断线程数并产生重新增加。

      这一段自旋相当于一个乐观锁,当多线程一起提交任务的时候,大家都会进行自旋尝试增加工作线程数,doug lea不是在程序入口加排他锁,而是使用乐观锁,是因为后面还有new一个工作线程等操作,只需要乐观锁保证数目,new的部分是可以并发的。

  2. 增加工作线程

    在new完worker之后加锁将worker放入workers中,并更新最大线程数,然后解锁,再尝试启动工作线程,这里可以看到dougLea尽可能把锁的粒度控制到最小,new一个工作线程和启动工作线程是可以并发的,所有没有加锁。如果没有成功启动工作线程会进入addWorkerFailed方法

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    这里从工作线程集合中删除工作线程,自旋cas减少工作线程数目,尝试关闭线程池(这个方法内部会判断线程池状态,并不是尝试关就一定会关)这一步就是上面操作的回滚

3.3.3 Worker工作线程内部类

Worker这个类继承了AQS实现了Runnable接口,继承Runnalbe 比较好理解,比较Worker的职责就是从阻塞队列中不断获取任务执行。但是为什么Worker为什么要继承AQS昵?继承AQS是为了处理中断,如果是由于获取阻塞队列任务而挂起的话,这时候是需要响应中断的,但是如果是正在运行任务的话,是不会响应中断的,这是一个策略问题。

1.初始化时设置state=-1只有在启动Worker后才会响应中断
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
void interruptIfStarted() {
    Thread t;
    //只有state 大于等于才会响应中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
2.Worker#run
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    /// 这里会将 state置为0 interruptIfStarted 才可以中断
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
	   //从队列中获取任务 如果getTask的时候中断,不会中断当前线程获取任务
        while (task != null || (task = getTask()) != null) {
			//上锁 不响应中断 
            w.lock();
            //如果是STOP 或TIDYING 或TERMINATED 确保当前线程线程会被中断
            //如果池正在停止,请确保线程被中断;
            //如果没有,请确保线程不被中断。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //执行任务时抛出异常那么此为true 反之则为false
        completedAbruptly = false;
    } finally {
        //进入这里可能是执行业务逻辑错误,也可能时队列没有任务了
        processWorkerExit(w, completedAbruptly);
    }
}

关于中断

  • lock方法一旦获取了独占锁,表示当前线程正在执行任务中,如果正在执行任务,则不应该中断线程。

  • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

  • 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

  • 获取任务的时候不会由于中断而抛出中断异常退出自旋

    这部分逻辑在getTask方法中,后面我们讨论

  • 如果池停止了那么确保当前线程一定会被中断,如果池没有停止那么确保当前线程不被中断。(shutdownNow和shutdown会中断线程)

      if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    
    • (runStateAtLeast(ctl.get(), STOP)&&!wt.isInterrupted()成立

      说明这个时候池状态至少STOP,这种情况下,拿到了队列中的任务还是是需要处理队中的任务的,所有这里这是中断当前线程wt.interrupt(),具体怎么应对中断这取决于业务逻辑代码。

    • (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted()这依据非常巧妙,如果成立了,说明池停止了,但是Thread.interrupted()重置了中断标识这时候!wt.isInterrupted()就是true,那么会执行wt.interrupt()补上中断标识。如果不成立,说明池没有停止那么Thread.interrupted()复位了中断标识,保证了池没有停止的时候线程没有被中断

回收线程

如果线程执行业务代码抛出异常,或者说队列中没有任务了,或者说当前线程空闲太久没有执行任务,会回收工作线程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
	//执行业务逻辑错误 那么自旋减少工作线程数 
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
	
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //完成任务数据加到线程池完成任务数上
        completedTaskCount += w.completedTasks;
		//回收当前线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
	//尝试关闭线程池,会中断一个工作线程,“传播”关闭信号,运行状态的线程池不会受影响
    tryTerminate();

    int c = ctl.get();
    //SHUTDOWN RUNNING 状态
    if (runStateLessThan(c, STOP)) {
		//当前线程不是执行任务业务逻辑错误 
        if (!completedAbruptly) {
            //如果运行核心线程回收那么为 最小为0 反之线程池保证最起码有核心线程数个线程
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
			// 队列还有任务那么流一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果大于最小 那么结束
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //如果是业务异常退出 
        //或者不是业务逻辑就是没有拿到任务,但是线程池个数少于min那么新增一个工作线程
        addWorker(null, false);
    }
}
3.3.4 从阻塞队列中获取任务getTask
private Runnable getTask() {
    //获取任务是否超时
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池为STOP  TIDYING TERMINATED 那么cas减小线程数 return
        //如果SHUTDOWN 但是队列存在任务 不会cas减少,那么不会return 
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        int wc = workerCountOf(c);
        //如果允许核心线程超时被回收 那么为true 或者工作线程大于核心线程数会没有任务的时候会减少到核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //如果工作线程大于最大核心数 或者 允许过期且获取任务超时
        if ((wc > maximumPoolSize || (timed && timedOut))
            //如果队列不是空至少保证wc大于1 那么减少后工作线程至少为1
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            //如果CAS失败那么继续自旋
            continue;
        }
        try {
            //如果允许核心线程超时,且当前工作线程小于核心线程数 那么会使用响应中断但是无限等待的take方法,反之使用超时等待的pool
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                //获取到任务 那么直接返回任务
                return r;
            //反之说明超时没有获取到任务
            timedOut = true;
        } catch (InterruptedException retry) {
            //如果被中断那么把超时置为false 继续自旋
            timedOut = false;
        }
    }

这里我们可以看到getTask是不会响应中断的,如果在poll或者take的时候被中断会设置timedOut=fasle然后继续自旋,也就是说getTask返回要么是拿到了任务,要么是超时没有获取到任务说明当前线程空闲需要被回收,但是这个空闲需要被回收有一个限制——要么允许核心线程空闲被回收,要么当前工作线程数大于核心线程数。这里对中断的处理的目的是,那怕线程池被shutdown不会中断getTask的线程,因为shutdown需要确保队列中的任务要被处理完。

3.3.5 shutdown,shutDownNow
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //确保状态至少为SHUTDOWN,因为终止的同时新增线程数的可能这里使用自旋+cas
        advanceRunState(SHUTDOWN);
        //中断所有的空闲工作线程,会首先获取锁,保证工作线程是空闲才会中断,如果正在运行说明在处理队列中的任务不会中断工作线程,这里没有中断一个空闲的线程是为了让多余的worker就会立即退出,而不是等待一个落后的任务完成
        interruptIdleWorkers();
        //钩子函数
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        //注意这里 shutdown是interruptIdleWorkers
        //这个方法比较无脑直接中断中断线程
        interruptWorkers();
        //不同点在于 这儿 这里会把所有未完成的任务放到集合中返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdown关闭线程池,但是会让工作线程处理完队列中任务,但是shutdownNow会调用drainQueue排空工作队列中的任务,也就是说被排空的任务不会被执行。

3.3.6 interruptIdleWorkers,interruptWorkers 和 interruptIfStarted 中断worker
  • interruptIdleWorkers

worker.tryLock成功才会中断工作线程,tryLock成功意味着worker处于空闲状态(worker一旦启动就会调用lock方法,表示处于工作状态)

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //注意这里,只有worker没有被中断,且 尝试上锁成功后才会中断worker
            //tryLock 成功那么worker的独占锁被获取了,后续worker就无法运行了
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
  • interruptWorkers 中断所有启动的工作线程,无论是否空闲
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //获取全局锁循环遍历每一个worker,这里全局锁的获取可以避免提交任务是新建线程加入到workers 集合中
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    //这里判断了state大于等于0 才能中断,初始化worker的时候置为-1 只有worker启动后才可以中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
3.3.7 尝试终止线程池tryTerminate
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //如果是运行状态 或者已经为TIDYING或者 TERMINATED
        //或者是SHUTDOWN 但是队列还有任务需要执行那么直接返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        
        //不符合上面if 且工作线程不为0 那么中断其中一个,说明此时为shutdown,且队列为空
        //后续这个被中断的线程会中断后续的工作线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
		
        //锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //设置为TIDYING 工作线程数为0 
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //调用terminated 钩子方法
                    terminated();
                } finally {
                    //设置成工作线程数为0 状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //唤醒在termination 上等待的线程 
                    //如果调用awaitTermination会阻塞直到线程池关闭
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }

这里可以看到,如果为shutdown,但是队列还有任务让队列的任务得到处理。这里比较不好理解的是interruptIdleWorkers(ONLY_ONE)中断一个线程,为什么说中断一个线程后续会传播到其他线程昵,如果在shutdown调用tryTerminate执行中断一个线程的这一瞬间,这个线程在getTask那么不好有啥影响,他会继续处理队列中的任务(如果是shutdownNow,会把阻塞中的任务排空,那么当前这个线程可能拿不到任务,也可能在排空之前拿到任务继续执行),getTask如果为空 那么会执行processWorkerExit后续还是会调用tryTerminate,如果getTask不为null,执行任务的途中发现当前线程池处于关闭状态为设置当前线程中断表示,然后继续允许任务的业务逻辑,是否抛出中断异常取决业务逻辑代码,如何抛出异常那么也会执行processWorkerExit后续还是会调用tryTerminate,如果没有抛出异常那么继续getTask,一进来直接判断是否处于关闭,如果是Stop及以上那么直接减少线程数返回null会首先从,如果shutdown会保证队列中的任务被执行完,后续执行完还是会调用processWorkerExit中的tryTerminate

3.3.8 阻塞直到线程池被终止,或者被中断,或者超时
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            //如果调用此方法的时候已经是TERMINATED 返回ture
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //如果等待时间小于0 返回false
            if (nanos <= 0)
                return false;
            
            //超时等待 中断抛出中断异常 超时抛出超时异常
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

这里说的线程池被终止是指执行完shutdown,shutdownNow,也就是说执行完tryTerminate中的terminated并且设置线程池为TERMINATED,后续才会被唤醒,这里涉及到AQS的等待队列——JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue

3.3.9 拒绝策略
  • CallerRunsPolicy 由提交任务的线程执行任务,如果线程池是STOP,TIDYING,TERMINATED 那么一声不吭的抛弃任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		//如果不是RUNNING
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • AbortPolicy 直接抛出RejectedExecutionException
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • DiscardPolicy 悄无声息的忽略任务 什么都不做忽略任务
public static class DiscardPolicy implements RejectedExecutionHandler {
   
    public DiscardPolicy() { }

 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • DiscardOldestPolicy 如果线程池没有被关闭那么丢弃队列头部的任务,然后提交此任务

这里其实不一定是等待最久的任务,如果是优先级队列,意味着是优先级最高的任务

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

四丶Executors中的线程池

Executors 和 Executor的关系类似于Collection 和Collections的关系,Executors 是一个工具类,下面我们看下里面提供的一些实用线程池

  • newFixedThreadPool 固定数目工作线程,无界任务阻塞队列(可以容纳int最大个任务)的线程池——容易oom,如果请求量大容易操作阻塞队列积压过多任务造成oom

    虽然这里keepAliveTime是0,但是线程池中的线程都是核心线程,核心线程默认不允许回收

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor 单线程,无界任务阻塞队列的线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
 FinalizableDelegatedExecutorService ——线程池被回收的时候会调用shutdown方法,并且只暴露了ExecutorService接口中的方法给我们调用
  • newCachedThreadPool,支持工作线程数达到Integer.MAX_VALUE,且空闲时间达到60秒那么就会被回收,使用的是SynchronousQueue不会容纳任何任务,每一个任务提交之后都必须有另外一个线程获取任务——线程多并不意味着效率高,上下文的切换,线程的new 和消耗都是消耗大量资源的,支持Integer.MAX_VALUE个线程显然也是不符合实际的
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • newScheduledThreadPool 定时执行任务的线程池,还有单线程定时执行任务的线程池

使用的是ScheduledThreadPoolExecutor,后续整理这部分的源码

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
  • unconfigurableExecutorService 禁止调整线程池参数配置的线程池

  • newWorkStealingPool 工作窃取池,使用的是ForkJoinPool,后续会学习这部分源码


分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进