20 计划任务

自JDK 1.5 开始,JDK提供了ScheduledThreadPoolExecutor类用于计划任务(又称定时任务),这个类有两个用途:

  • 在给定的延迟之后运行任务

  • 周期性重复执行任务

在这之前,是使用Timer类来完成定时任务的,但是Timer有缺陷:

  • Timer是单线程模式;

  • 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;

  • Timer的任务调度是基于绝对时间的,对系统时间敏感;

  • Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。

所以JDK 1.5之后,大家就摒弃Timer,使用ScheduledThreadPoolExecutor吧。

20.1 使用案例

假设我有一个需求,指定时间给大家发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后想用一个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么写?下面是一个Demo:

public class ThreadPool {

    private static final ScheduledExecutorService executor = new
        ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

    private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args){
        // 新建一个固定延迟时间的计划任务
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (haveMsgAtCurrentTime()) {
                    System.out.println(df.format(new Date()));
                    System.out.println("大家注意了,我要发消息了");
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    public static boolean haveMsgAtCurrentTime(){
        //查询数据库,有没有当前时间需要发送的消息
        //这里省略实现,直接返回true
        return true;
    }
}

下面截取前面的输出(这个demo会一直运行下去):

这就是ScheduledThreadPoolExecutor的一个简单运用,想要知道奥秘,接下来的东西需要仔细的看哦。

20.2 类结构

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService。 线程池在之前的章节介绍过了,我们先看看ScheduledExecutorService

ScheduledExecutorService实现了ExecutorService ,并增加若干定时相关的接口。 前两个方法用于单次调度执行任务,区别是有没有返回值。

重点理解一下后面两个方法:

  • scheduleAtFixedRate

    该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务。

  • scheduleWithFixDelay

    该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务。

20.3 主要方法介绍

20.3.1 schedule

我们先看看里面涉及到的几个类和接口ScheduledFutureRunnableScheduledFutureScheduledFutureTask的关系:

类图

我们先看看这几个接口和类:

Delayed接口

Delayed接口很简单,继承了Comparable接口,表示对象是可以比较排序的。

ScheduledFuture接口

没有添加其他方法。

RunnableScheduledFuture接口

ScheduledFutureTask类

回到schecule方法中,它创建了一个ScheduledFutureTask的对象,由上面的关系图可知,ScheduledFutureTask直接或者间接实现了很多接口,一起看看ScheduledFutureTask里面的实现方法吧。

构造方法

Delayed接口的实现

Comparable接口的实现

setNextRunTime

Runnable接口实现

总结一下run方法的执行过程:

  1. 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;

  2. 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后直接返回,否则执行步骤3;

  3. 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行结果,然后直接返回,否则执行步骤4和步骤5;

  4. 计算下次执行该任务的具体时间;

  5. 重复执行任务。

runAndReset方法是为任务多次执行而设计的。runAndReset方法执行完任务后不会设置任务的执行结果,也不会去更新任务的状态,维持任务的状态为初始状态(NEW状态),这也是该方法和FutureTaskrun方法的区别。

20.3.2 scheduledAtFixedRate

我们看一下代码:

scheduleAtFixedRate这个方法和schedule类似,不同点是scheduleAtFixedRate方法内部创建的是ScheduledFutureTask,带有初始延时和固定周期的任务 。

20.3.3 scheduledAtFixedDelay

FixedDelay也是通过ScheduledFutureTask体现的,唯一不同的地方在于创建的ScheduledFutureTask不同 。这里不再展示源码。

20.3.4 delayedExecute

前面讲到的schedulescheduleAtFixedRatescheduleAtFixedDelay最后都调用了delayedExecute方法,该方法是定时任务执行的主要方法。 一起来看看源码:

delayedExecute方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用ensurePrestart方法。

ensurePrestart方法主要是调用了addWorker,线程池中的工作线程是通过该方法来启动并执行任务的。 具体可以查看前面讲的线程池章节。

对于ScheduledThreadPoolExecutorworker添加到线程池后会在等待队列上等待获取任务,这点是和ThreadPoolExecutor一致的。但是worker是怎么从等待队列取定时任务的?

因为ScheduledThreadPoolExecutor使用了DelayedWorkQueue保存等待的任务,该等待队列队首应该保存的是最近将要执行的任务,如果队首任务的开始执行时间还未到,worker也应该继续等待。

20.4 DelayedWorkQueue

ScheduledThreadPoolExecutor使用了DelayedWorkQueue保存等待的任务。

该等待队列队首应该保存的是最近将要执行的任务,所以worker只关心队首任务即可,如果队首任务的开始执行时间还未到,worker也应该继续等待。

DelayedWorkQueue是一个无界优先队列,使用数组存储,底层是使用堆结构来实现优先队列的功能。我们先看看DelayedWorkQueue的声明和成员变量:

当一个线程成为leader,它只要等待队首任务的delay时间即可,其他线程会无条件等待。leader取到任务返回前要通知其他线程,直到有线程成为新的leader。每当队首的定时任务被其他更早需要执行的任务替换时,leader设置为null,其他等待的线程(被当前leader通知)和当前的leader重新竞争成为leader。

同时,定义了锁lock和监视器available用于线程竞争成为leader。

当一个新的任务成为队首,或者需要有新的线程成为leader时,available监视器上的线程将会被通知,然后竞争称为leader线程。 有些类似于生产者-消费者模式。

接下来看看DelayedWorkQueue中几个比较重要的方法

20.4.1 take

take方法是什么时候调用的呢?在线程池的章节中,介绍了getTask方法,工作线程会循环地从workQueue中取任务。但计划任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。

总结一下流程:

  1. 如果堆顶元素为空,在available条件上等待。

  2. 如果堆顶任务的执行时间已到,将堆顶元素替换为堆的最后一个元素并调整堆使其满足最小堆特性,同时设置任务在堆中索引为-1,返回该任务。

  3. 如果leader不为空,说明已经有线程成为leader了,其他线程都要在available监视器上等待。

  4. 如果leader为空,当前线程成为新的leader,并等待直到堆顶任务执行时间到达。

  5. take方法返回之前,将leader设置为空,并通知其他线程。

再来说一下leader的作用,这里的leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader线程必须在从take()poll()返回之前signal其它线程,除非其他线程成为了leader。

举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。但只有一个线程返回队首任务,其他的线程在awaitNanos(delay)之后,继续执行for循环,因为队首任务已经被返回了,所以这个时候的for循环拿到的队首任务是新的,又需要重新判断时间,又要继续阻塞。

所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())。

20.4.2 offer

该方法往队列插入一个值,返回是否成功插入 。

在堆中插入了一个节点,这个时候堆有可能不满足最小堆的定义,siftUp用于将堆调整为最小堆,这属于数据结构的基本内容,本文不做介绍。

20.5 总结

内部使用优化的DelayQueue来实现,由于使用队列来实现定时器,有出入队调整堆等操作,所以定时并不是非常非常精确。

参考资料

Last updated

Was this helpful?