线程池
线程池是一组线程的集合。线程池维护一个队列,调用者向这个队列中添加任务,而线程池中的线程则不停地从队列中取出任务执行。
线程池的继承关系如下图,其中ThreadPoolExecutor和ScheduledThreadPoolExecutor是具体的实现。
ThreadPoolExecutor
基本的线程池,可以提交无返回值和有返回值的任务执行。重载的构造方法最长有七个参数,分比是核心线程(常驻线程)个数,最大线程个数(当核心线程已满,任务队列也满,最大扩充线程的个数),非核心线程空闲的时间,时间单位,阻塞任务队列,线程生产工厂,饱和策略(用来处理核心线程已满,最大线程已满,任务队列满时的新到来任务的策略,也叫拒绝策略)。
其中,饱和策略共四种:
1.DiscardOldestPolicy,抛弃最早的任务,将新任务加入队列。
2.AbortPolicy,拒绝执行新任务,并抛出异常。
3.CallerRunsPolicy,交由调用者线程执行新任务,如果调用者线程已关闭,则抛弃任务。
4.DiscardPolicy,直接抛弃新任务。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
使用
一般在使用线程池时需要自行定制,一般来说普遍的理论是:
如果是CPU密集型任务:那么核心线程数 = CPU核心数 + 1;加1是为了防止突发情况导致某个线程不可执行,最大化利用CPU。
如果是IO密集型任务:那么核心线程数 = CPU核心数 * 2;
实际在应用过程中许多任务也分不出是IO密集还是CPU密集,上述理论也不一定是最好的,如果对性能追求极致,需要自行探索,也可以通过线程池的一些set方法动态修改核心线程数,空闲时间等参数。
下面是一个简单的例子:
package com.example.demo;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class MyThreadPool {private static final ThreadPoolExecutor executor;//线程所属的线程组private static final ThreadGroup group = new ThreadGroup("my-group");//线程名字的前缀private static final String prefix = "my-thread-";//线程计数器private static final AtomicInteger atomicInteger=new AtomicInteger(0);static {int core = Runtime.getRuntime().availableProcessors();ThreadFactory factory= r -> new Thread(group, r, prefix+atomicInteger.incrementAndGet());executor = new ThreadPoolExecutor(core,core*2,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>(32),factory,//一共四种策略,这里采用调用者执行策略new ThreadPoolExecutor.CallerRunsPolicy());//虚拟机退出时关闭线程池Runtime.getRuntime().addShutdownHook(new Thread(){@Overridepublic void run() {executor.shutdown();try {boolean flag;do {//一直等待线程池关闭完成flag = executor.awaitTermination(1,TimeUnit.SECONDS);} while (!flag);} catch (Exception e){e.printStackTrace();}}});}public static ThreadPoolExecutor getExecutor(){return executor;}
}
测试自定义的线程池
package com.example.demo;import org.junit.Test;import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class TestClass {Random random = new Random();@Testpublic void test() throws InterruptedException {//采用闭锁等待所有线程池线程执行完成CountDownLatch latch=new CountDownLatch(50);ThreadPoolExecutor executor = MyThreadPool.getExecutor();for (int i=0;i<50;i++) {final int j = i;executor.execute(()->{try {System.out.println(String.format("我是线程:%s,执行第%d个任务",Thread.currentThread().getName(),j));TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();} finally {latch.countDown();}});}latch.await();}
}
实现原理
基本结构
部分源码如下,包含了主要的属性
public class ThreadPoolExecutor extends AbstractExecutorService {//线程池状态控制变量,一个int包含两部分,低29位为workCount,高3位为runState,初始为RUNNING状态,workCount=0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;//workCount最大2^29-1private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池运行的状态,共5种private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }//任务队列private final BlockingQueue<Runnable> workQueue;//访问变量用的互斥锁private final ReentrantLock mainLock = new ReentrantLock();//任务线程的集合private final HashSet<Worker> workers = new HashSet<Worker>();//awaitTermination()方法的条件变量private final Condition termination = mainLock.newCondition();public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null : AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
}
execute方法
分3步进行:
1、当前工作线程数与corePoolSize比较,如果小于则尝试启动一个新线程,使用给定的参数command作为其第一个任务,否则执行2步骤
2、如果任务能够成功入队,则再次检查运行状态,如果不是运行状态则从队列中移除任务;如果不能入队则执行3步骤
3、如果不能将任务入队,则尝试添加一个新的非核心线程。如果增加失败了,则执行拒绝策略。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//判断线程个数是否小于核心线程数if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果是运行状态,则入队,任务队列没满则入队成功if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再检查状态是不是运行状态,不是则尝试从队列中移除任务,移除成功则执行饱和策略if (! isRunning(recheck) && remove(command))reject(command);//启动一个线程,什么都不做else if (workerCountOf(recheck) == 0)addWorker(null, false);}//核心线程、任务队列都满了,增加非核心线程,如果也增加失败,则执行饱和策略else if (!addWorker(command, false))reject(command);
}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();//获取运行状态int rs = runStateOf(c);// 运行状态>= SHUTDOWN,说明线程池进入关闭状态if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {//获取workCountint wc = workerCountOf(c);//线程超出个数,返回失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS操作,将workCount+1,成功则退出循环,否则重新读取控制变量if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl//CAS失败,如果状态不变继续进行内部for循环,如果变了则跳到外部for循环重新开始if (runStateOf(c) != rs)continue retry;}}//标识任务已启动boolean workerStarted = false;//标识任务已新增boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//线程是活跃状态if (t.isAlive())throw new IllegalThreadStateException();workers.add(w);int s = workers.size();//更新最大线程数if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//如果新增成功,则启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//线程没启动成功,当做增加失败处理,addWorkerFailed内部把workCount减1if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
submit方法
submit方法定义在AbstractExecutorService中:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();//封装成RunnableFuture执行,RunnableFuture实现了RunnableRunnableFuture<T> ftask = newTaskFor(task);//执行execute(ftask);return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
Callable
在线程池中执行的任务有两种类型,一种是无参无返回值的,例如Runnable类型的,还有一种是无参有返回值的,即为Callable类型的。Callable接口如下定义:
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
Worker
任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {//......./** 运行worker的线程. */final Thread thread;/** 接收的第一个任务 */Runnable firstTask;/** 执行完成的任务个数 */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); //初始为RUNNING状态this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}//.......
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//不停地从队列中取出任务while (task != null || (task = getTask()) != null) {w.lock();//执行任务之前先上锁,AQS实现//检测线程池状态,如果状态>=STOP,则线程给自己发中断信号if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//钩子方法beforeExecute(wt, task);Throwable thrown = null;try {//执行task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//钩子函数afterExecute(task, thrown);}} finally {task = null;//完成的任务数+1w.completedTasks++;w.unlock();}}//可根据此变量判断Worker以何种方式退出(正常,中断,其他异常)completedAbruptly = false;} finally {//清理workerprocessWorkerExit(w, completedAbruptly);}
}
钩子方法
ThreadPoolExecutor提供了下列的钩子方法,如果要继承ThreadPoolExecutor实现自己的线程池,可以考虑实现下列方法。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
关闭线程池
当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
状态转换过程
线程池有两个关闭函数,shutdown()和shutdownNow(),这两个函数会让线程池切换到不同的状态。但无论是哪个状态,在队列为空且线程池也为空之后,会切换为TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才算真正关闭。
状态迁移是从小到大,按照-1,0,1,2,3的顺序,只会正向迁移,不能颠倒。
两种shutdown的区别
shutdown()方法不会清空任务队列,会等所有任务执行完成,而shutdownNow()会清空任务队列。
shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子函数terminated()。当钩子函数执行完成时,把状态从TIDYING 改为TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
部分源码如下:
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查是否有关闭线程池的权限checkShutdownAccess();//CAS操作,将线程池状态迁移为SHUTDOWNadvanceRunState(SHUTDOWN);//关闭空闲的线程interruptIdleWorkers();//给ScheduledThreadPoolExecutor留的钩子方法onShutdown();} finally {mainLock.unlock();}//迁移到TERMINATED状态tryTerminate();
}
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查是否有关闭线程池的权限checkShutdownAccess();//CAS操作,将线程池状态迁移为STOPadvanceRunState(STOP);//关闭所有线程interruptWorkers();//移除任务队列的元素并返回它们tasks = drainQueue();} finally {mainLock.unlock();}//迁移到TERMINATED状态tryTerminate();return tasks;
}private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;//线程未中断且Worker尝试加锁成功,说明线程处于空闲状态,如果加锁失败,说明线程当前持有锁,正在执行任务if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//中断所有线程for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);return;}//任务队列为空,workCount=0会走到这里final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//迁移到TIDYING状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//调用钩子函数terminated();} finally {//迁移到TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));//通知等待的awaitTermination()方法termination.signalAll();}return;}} finally {mainLock.unlock();}// 迁移状态失败则重新循环}
}
推荐的关闭方式
executor.shutdown();
try {boolean flag;do {//一直等待线程池关闭完成flag = executor.awaitTermination(1,TimeUnit.SECONDS);} while (!flag);
} catch (Exception e){e.printStackTrace();
}
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,除了基本的线程池功能,还实现了
ScheduledExecutorService,可以延迟执行任务或按频率进行任务调度,具体体现在以下两方面:
1、延迟执行任务
2、周期执行任务
对应的方法如下:
//延迟执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);//周期执行任务,按固定频率执行,与任务本身执行时间无关,但是任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,
TimeUnit unit);
//周期执行任务,按固定间隔执行,与任务本身执行时间有关。假如任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit);
使用
调度线程池的构建比较简单,重载的构造方法最多需要3个参数,分别是核心线程数,线程工厂和饱和策略。下面简单演示了几种延迟调度的方式:
public class TestClass {private static final ThreadGroup group = new ThreadGroup("schedule-group");private static final String prefix = "schedule-thread-";private static final AtomicInteger atomicInteger=new AtomicInteger(0);private static final ThreadLocal<DateTimeFormatter> timeFormatters=new ThreadLocal();private DateTimeFormatter getFormatter(){DateTimeFormatter formatter = timeFormatters.get();if (Objects.isNull(formatter)){DateTimeFormatter formatter1=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");timeFormatters.set(formatter1);return formatter1;}return formatter;}@Testpublic void schedule(){ThreadFactory factory= r -> new Thread(group, r ,prefix+atomicInteger.incrementAndGet());int i = Runtime.getRuntime().availableProcessors();ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(i, factory, new ThreadPoolExecutor.CallerRunsPolicy());System.out.println("启动调度线程池:"+getFormatter().format(LocalDateTime.now()));//延迟执行,执行一次ScheduledFuture<String> delay = executor.schedule(() -> "单次延迟执行开始时间:"+getFormatter().format(LocalDateTime.now()),3, TimeUnit.SECONDS);//固定频率执行executor.scheduleAtFixedRate(() ->System.out.println("固定频率开始执行:"+getFormatter().format(LocalDateTime.now())),1,3, TimeUnit.SECONDS);//固定延迟执行executor.scheduleWithFixedDelay(()->System.out.println("固定延迟开始执行:"+getFormatter().format(LocalDateTime.now())),1,3,TimeUnit.SECONDS);while (!delay.isDone()) {try {String s = delay.get();System.out.println("单次延迟执行结果:"+s);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}break;}while (true){}}
}
实现原理
构造方法调用了ThreadPoolExecutor的构造方法,最大线程个数设置为了int的最大值,空闲时间也设为0,任务队列也固定为一个自定义延迟队列。
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);}
基本结构
DelayedWorkQueue
ScheduledThreadPoolExecutor的任务队列使用了一个自定义的延迟队列DelayedWorkQueue,
跟普通的延迟队列相差不大,这个静态内部类定义如下:
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;//RunnableScheduledFuture的唯一实现是ScheduledFutureTaskprivate RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;//队头等待的线程private Thread leader = null;//当队列头部有新的任务可用或新线程可能需要成为leader时发出的条件private final Condition available = lock.newCondition();//......
}
ScheduledFutureTask
ScheduledThreadPoolExecutor还有一个内部类ScheduledFutureTask,实现如下:
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {//任务序号private final long sequenceNumber;//以纳秒为单位,启动任务的时间(周期任务启动时间一直会累加)private long time;//周期(以纳秒为单位)用于重复任务。正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务private final long period;//通过reExecutePeriodic()方法重新加入队列的实际任务RunnableScheduledFuture<V> outerTask = this;//当前任务在延迟队列中的索引,用于更快的取消任务int heapIndex;ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}ScheduledFutureTask(Callable<V> callable, long ns) {super(callable);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) return 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}public boolean isPeriodic() {return period != 0;}private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = super.cancel(mayInterruptIfRunning);if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();//设置下一次启动的时间reExecutePeriodic(outerTask);//重新入队}}
}
schedule方法(Runnable)
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();//把Runnable包装成RunnableScheduledFuture,ScheduledFutureTask是其实现RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));delayedExecute(t);return t;
}protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;
}private void delayedExecute(RunnableScheduledFuture<?> task) {//线程池关闭则执行拒绝策略if (isShutdown())reject(task);else {//把任务放入延迟队列super.getQueue().add(task);//线程池关闭或者 当前线程池状态不能运行任务,且从队列中移除任务成功,则取消任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}
}void ensurePrestart() {int wc = workerCountOf(ctl.get());//当小于核心线程数只开启新线程(不运行任务)if (wc < corePoolSize)addWorker(null, true);//即使corePoolSize=0,当第一次运行时,也启动一个新线程else if (wc == 0)addWorker(null, false);
}
scheduleWithFixedDelay和scheduleAtFixedRate方法
两个方法实现几乎一模一样,唯一的区别是ScheduledFutureTask的构造方法一个是正数,一个是负数,在ScheduledFutureTask中,正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));//负数RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));//正数RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
Future
Future的继承关系如下图所示
RunnableFuture
接口定义如下,继承了Runnable和Future两个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}
FutureTask
这个类是RunnableFuture的实现,内部存有一个Callable,作为一个适配器,将Callable转换为Runnable执行。
基本结构
public class FutureTask<V> implements RunnableFuture<V> {//任务状态private volatile int state;//初始状态private static final int NEW = 0;//中间状态private static final int COMPLETING = 1;//以下都是最终状态private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;//中间状态private static final int INTERRUPTING = 5;//最终状态private static final int INTERRUPTED = 6;/** 实际要执行的Callable,通过构造函数传入 */private Callable<V> callable;/** 存放执行结果或者抛出的异常 */private Object outcome;/** 运行Callable的线程 */private volatile Thread runner;/** 在get方法等待的线程链表 */private volatile WaitNode waiters;@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; }public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; }static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}
}
run方法
public void run() {//任务状态不是NEW 或者 其他线程在执行任务,结束方法if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//异常存入outcome变量setException(ex);}if (ran)//返回值存入outcome变量set(result);}} finally {//置空runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)//处理中断handlePossibleCancellationInterrupt(s);}
}
//状态 NEW->COMPLETING->EXCEPTIONAL
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state//通知等待的线程finishCompletion();}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//通知等待的线程finishCompletion();}
}private void handlePossibleCancellationInterrupt(int s) {// It is possible for our interrupter to stall before getting a// chance to interrupt us. Let's spin-wait patiently.if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// We want to clear any interrupt we may have received from// cancel(true). However, it is permissible to use interrupts// as an independent mechanism for a task to communicate with// its caller, and there is no way to clear only the// cancellation interrupt.//// Thread.interrupted();
}private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;//唤醒LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; q = next;}break;}}//空实现done();callable = null;
}
get方法
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//不是正在完成状态,无限期等待s = awaitDone(false, 0L);return report(s);
}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待指定时间throw new TimeoutException();return report(s);
}
//返回结果
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {//线程中断了则从链表中移除removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // 正处于中间状态Thread.yield();else if (q == null)q = new WaitNode();else if (!queued)//入队queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}else//阻塞LockSupport.park(this);}
}
Executors
这个类是Executor框架的辅助工具类,用来创建预定义的线程池和一些任务(Callable)。在实际环境下,不建议直接使用此工具创建的线程池,特别是无界阻塞队列和不限制线程个数的实现,容易造成OOM,资源耗尽等严重问题。
//每来一个任务,就创建一个线程
ExecutorService executorService1 = Executors.newCachedThreadPool();
//固定线程数的线程池
ExecutorService executorService2 = Executors.newFixedThreadPool(6);
//多线程的调度线程池
ExecutorService executorService3 = Executors.newScheduledThreadPool(8);
//单线程线程池
ExecutorService executorService4 = Executors.newSingleThreadExecutor();
//单线程调度线程池
ExecutorService executorService5 = Executors.newSingleThreadScheduledExecutor();
//forkjoin线程池
ExecutorService executorService6 = Executors.newWorkStealingPool();