您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

Xxl-Job调度器原理解析

项目解析源码地址: https://gitee.com/lidishan/xxl-job-code-analysis
xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。
 
调度器初始化过程步骤如下
1 国际化相关
配置参数: xxl.job.i18n=zh_CN, 这里设置为中文简体
 
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
配置参数:xxl.job.triggerpool.fast.max=200, 这里设置为fastTriggerPool的最大线程数=200, 不能小于200
         xxl.job.triggerpool.slow.max=100, 这里设置为slowTriggerPool的最大线程数=100, 不能小于100
 
3 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息
 
4 启动失败任务监听线程(重试、告警)
配置参数:spring.mail.from=xxx@qq.com, 告警邮箱
 
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败
 
6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次 失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除
 
 
7 启动任务调度(很重要!!主要靠这两个线程进行塞数据到时间轮,然后时间轮取数调度任务)
7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
----  preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
****  当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
--------  1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
--------  2、刷新上一次触发 和 下一次待触发时间
****  当前时间 大于 任务的下一次触发时间 并且是没有过期的:
--------  1、直接触发任务执行器
--------  2、刷新上一次触发 和 下一次待触发时间
--------  3、如果下一次触发在五秒内,直接放进时间轮里面待调度
----------------  1、求当前任务下一次触发时间所处一分钟的第N秒
----------------  2、将当前任务ID和ringSecond放进时间轮里面
----------------  3、刷新上一次触发 和 下一次待触发时间
****  当前时间 小于 下一次触发时间:
--------  1、求当前任务下一次触发时间所处一分钟的第N秒
--------  2、将当前任务ID和ringSecond放进时间轮里面
--------  3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time
-- 第五步:提交数据库事务,释放数据库select for update排它锁
 
7.2 ringThread线程 -根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-------- 阻塞策略:串行、废弃后面、覆盖前面
-------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
 
初始化的入口代码为 XxlJobAdminConfig,代码如下:
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig () {
        return adminConfig ;
    }
    // ---------------------- XxlJobScheduler ----------------------
    private XxlJobScheduler xxlJobScheduler ;
    @Override
    public void afterPropertiesSet () throws Exception { // 生命周期中的属性注入来对xxlJobScheduler初始化
        adminConfig = this;
          // 初始化 xxl-job 定时任务
        xxlJobScheduler = new XxlJobScheduler() ;
        xxlJobScheduler .init() ;
    }
    @Override
    public void destroy () throws Exception { // 生命周期中的销毁来对xxlJobScheduler销毁
        xxlJobScheduler .destroy() ;
    }
    ..............省略..............
}
 
 
xxlJobScheduler.init()进行初始化会执行如下过程:
public class XxlJobScheduler {
    private static final Logger logger = LoggerFactory. getLogger (XxlJobScheduler. class ) ;
    public void init () throws Exception {
          // 1 国际化相关 init i18n
          initI18n() ;
          // 2 初始化快线程池 fastTriggerPool 、慢线程池 slowTriggerPool admin trigger pool start
          JobTriggerPoolHelper. toStart () ;
          // 3 启动注册监听线程 admin registry monitor run
          JobRegistryHelper. getInstance ().start() ;
          // 4 启动失败任务监听线程 ( 重试、告警 ) admin fail-monitor run
          JobFailMonitorHelper. getInstance ().start() ;
          // 5 启动监控线程(调度记录停留在 " 运行中 " 状态超过 10min ,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败) admin lose-monitor run ( depend on JobTriggerPoolHelper )
          JobCompleteHelper. getInstance ().start() ;
          // 6 启动日志统计和清除线程(日志记录刷新,刷新最近三天的日志 Report (即统计每天的失败、成功、运行次数等);每天清除一次失效过期的日志数据) admin log report start
          JobLogReportHelper. getInstance ().start() ;
          // 7 启动任务调度 (scheduleThread- 取待执行任务数据入时间轮; ringThread- 根据时间轮执行 job 任务 ) start-schedule ( depend on JobTriggerPoolHelper )
          JobScheduleHelper. getInstance ().start() ;
          logger .info( ">>>>>>>>> init xxl-job admin success." ) ;
     }
    ................省略........................
}
 

上面初始化的7个步骤拆分如下==============
1 国际化相关
private void initI18n (){ // 根据环境设置title为中文、英文等
    for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum. values ()) {
        item.setTitle(I18nUtil. getString ( "jobconf_block_" .concat(item.name()))) ;
    }
}
 
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
这个步骤初始化了两个线程池 fastTriggerPoolslowTriggerPool
在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下:
ThreadPoolExecutor triggerPool_ = fastTriggerPool ;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap .get(jobId) ;
if (jobTimeoutCount!= null && jobTimeoutCount.get() > 10 ) { // job 在一分钟内超过超过 10 次,就用 slowTriggerPool 来处理 job-timeout 10 times in 1 min
     triggerPool_ = slowTriggerPool ;
}
triggerPool_.execute( new Runnable() {.........省略............}
 
3 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息
public void start(){
    // 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销    for registry or remove
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
         2,
         10,
         30L,
         TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>( 2000),
         new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
               return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
            }
         },
         new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
               r.run();
               logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
            }
         });
 
    // 启动监听注册的线程      for monitor
    registryMonitorThread = new Thread(new Runnable() {
      @Override
      public void run() {
         while (!toStop) {
            try {
                // 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group
               List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
               if (groupList!=null && !groupList.isEmpty()) {// group组集合不为空
                   // 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳)          remove dead address (admin/executor)
                  List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                  if (ids!=null && ids.size()>0) { // 移除挂掉的注册地址信息
                     XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                  }
   
                  // fresh online address (admin/executor)
                  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                   // 找出所有正常没死掉的注册地址
                  List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                  if (list != null) {
                     for (XxlJobRegistry item: list) {
                         // 确保是 EXECUTOR 执行器类型
                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                           String appname = item.getRegistryKey();
                           List<String> registryList = appAddressMap.get(appname);
                           if (registryList == null) {
                              registryList = new ArrayList<String>();
                           }
 
 
                           if (!registryList.contains(item.getRegistryValue())) {
                              registryList.add(item.getRegistryValue());
                           }
                           appAddressMap.put(appname, registryList);
                        }
                     }
                  }
 
 
                   // 刷新分组注册地址信息  fresh group address
                  for (XxlJobGroup group: groupList) {
                     List<String> registryList = appAddressMap.get(group.getAppname());
                     String addressListStr = null;
                     if (registryList!=null && !registryList.isEmpty()) {
                        Collections.sort(registryList);
                        StringBuilder addressListSB = new StringBuilder();
                        for (String item:registryList) {
                           addressListSB.append(item).append(",");
                        }
                        addressListStr = addressListSB.toString();
                        addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                     }
                     group.setAddressList(addressListStr);
                     group.setUpdateTime(new Date());
 
 
                     XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                  }
               }
            } catch (Exception e) {
               if (!toStop) {
                  logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
               }
            }
            try {
               TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
               if (!toStop) {
                  logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
               }
            }
         }
         logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
      }     
   });
   registryMonitorThread.setDaemon(true);
   registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
   registryMonitorThread.start();
}
 
4 启动失败任务监听线程(重试、告警)
这部分逻辑比较简单,就是重试 + 告警,核心代码如下
// 获取执行失败的job信息
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
   for (long failLogId: failLogIds) {
      // lock log
      int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
      if (lockRet < 1) {
         continue;
      }
      XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
      XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
 
       // 1、失败塞回重试       fail retry monitor
      if (log.getExecutorFailRetryCount() > 0) {
         JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
         String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";
         log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
      }
 
       // 2、进行失败告警       fail alarm monitor
      int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
      if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
         boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
         newAlarmStatus = alarmResult?2:3;
      } else {
         newAlarmStatus = 1;
      }
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
   }
}
 
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败
逻辑较简单,如上两点
 
6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次 失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除
逻辑较简单,如上两点
 
7 启动任务调度
7.1 scheduleThread-取待执行任务数据入时间轮
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
----  preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
****  当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
--------  1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
--------  2、刷新上一次触发 和 下一次待触发时间
****  当前时间 大于 任务的下一次触发时间 并且是没有过期的:
--------  1、直接触发任务执行器
--------  2、刷新上一次触发 和 下一次待触发时间
--------  3、如果下一次触发在五秒内,直接放进时间轮里面待调度
----------------  1、求当前任务下一次触发时间所处一分钟的第N秒
----------------  2、将当前任务ID和ringSecond放进时间轮里面
----------------  3、刷新上一次触发 和 下一次待触发时间
****  当前时间 小于 下一次触发时间:
--------  1、求当前任务下一次触发时间所处一分钟的第N秒
--------  2、将当前任务ID和ringSecond放进时间轮里面
--------  3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time
-- 第五步:提交数据库事务,释放数据库select for update排它锁
 
7.2 ringThread-根据时间轮执行job任务
首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-------- 阻塞策略:串行、废弃后面、覆盖前面
-------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
启动两个线程解析的核心源码如下:
public void start (){
 
     // 启动调度线程,这些线程是用来取数据的 schedule thread
    scheduleThread = new Thread( new Runnable() {
    @Override
    public void run () {
    try { // 不知道为啥要休眠 4-5 秒 时间,然后再启动
        TimeUnit. MILLISECONDS .sleep( 5000 - System. currentTimeMillis ()% 1000 ) ;
    } catch (InterruptedException e) {
        if (! scheduleThreadToStop ) {
            logger .error(e.getMessage() , e) ;
        }
    }
    logger .info( ">>>>>>>>> init xxl-job admin scheduler success." ) ;
 
     // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    int preReadCount = (XxlJobAdminConfig. getAdminConfig ().getTriggerPoolFastMax() + XxlJobAdminConfig. getAdminConfig ().getTriggerPoolSlowMax()) * 20 ;
 
    while (! scheduleThreadToStop ) {
    // 扫描任务 Scan Job
    long start = System. currentTimeMillis () ;
    Connection conn = null;
    Boolean connAutoCommit = null;
    PreparedStatement preparedStatement = null
    boolean preReadSuc = true;
    try {
        conn = XxlJobAdminConfig. getAdminConfig ().getDataSource().getConnection() ;
          connAutoCommit = conn.getAutoCommit() ;
          conn.setAutoCommit( false ) ;
          // 采用 select for update ,是排它锁。说白了 xxl-job 用一张数据库表来当分布式锁了,确保多个 xxl-job admin 节点下,依旧只能同时执行一个调度线程任务
        preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ) ;
          preparedStatement.execute() ;
 
          // tx start
 
          // 1 、预读数据 pre read
          long nowTime = System. currentTimeMillis () ;
          // -- 从数据库中读取截止到五秒后未执行的 job ,并且读取 preReadCount=6000
          List<XxlJobInfo> scheduleList = XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS , preReadCount) ;
          if (scheduleList!= null && scheduleList.size()> 0 ) {
              // 2 push 压进 时间轮 push time-ring
              for (XxlJobInfo jobInfo: scheduleList) {
 
                  // time-ring jump
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS ) {
                        // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS 5s )) , 可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
                        // 2.1 trigger-expire > 5s pass && make next-trigger-time
                        logger .warn( ">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()) ;
                        // 1 、匹配过期失效的策略: DO_NOTHING= 过期啥也不干,废弃; FIRE_ONCE_NOW= 过期立即触发一次 misfire match
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum. match (jobInfo.getMisfireStrategy() , MisfireStrategyEnum. DO_NOTHING ) ;
                      if (MisfireStrategyEnum. FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // FIRE_ONCE_NOW trigger
                              JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. MISFIRE , - 1 , null, null, null ) ;
                              logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;
                        }
                        // 2 、刷新上一次触发 和 下一次待触发时间 fresh next
                         refreshNextValidTime(jobInfo , new Date()) ;
                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // 当前时间 大于 任务的下一次触发时间 并且是没有过期的
                    // 2.2 trigger-expire < 5s direct-trigger && make next-trigger-time
                        // 1 、直接触发任务执行器 trigger
                        JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;
                        logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;
                        // 2 、刷新上一次触发 和 下一次待触发时间 fresh next
                        refreshNextValidTime(jobInfo , new Date()) ;
 
                        // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again
                        if (jobInfo.getTriggerStatus()== 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                              // 1 、求当前任务下一次触发时间所处一分钟的第 N make ring second
                              int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;
                              // 2 、将当前任务 ID ringSecond 放进时间轮里面 push time ring
                              pushTimeRing(ringSecond , jobInfo.getId()) ;
                              // 3 、刷新上一次触发 和 下一次待触发时间 fresh next
                              refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;
                        }
 
                    } else {
                      // 当前时间 小于 下一次触发时间
                        // 2.3 trigger-pre-read time-ring trigger && make next-trigger-time
                        // 1 、求当前任务下一次触发时间所处一分钟的第 N make ring second
                        int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;
                        // 2 、将当前任务 ID ringSecond 放进时间轮里面 push time ring
                        pushTimeRing(ringSecond , jobInfo.getId()) ;
                        // 3 、刷新上一次触发 和 下一次待触发时间 fresh next
                        refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;
                    }
              }
 
              // 3 、更新数据库执行器信息,如 trigger_last_time trigger_next_time update trigger info
              for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleUpdate(jobInfo) ;
              }
 
          } else {
            preReadSuc = false;
          }
          // tx stop
    } catch (Exception e) {
          if (! scheduleThreadToStop ) {
              logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e) ;
          }
    } finally {
          // 提交事务,释放数据库 select for update 的锁 commit
        .......................省略.............    
    }
    long cost = System. currentTimeMillis ()-start ;
 
     // 如果执行太快了,就稍微 sleep 等待一下 Wait seconds, align second
    if (cost < 1000 ) { // scan-overtime, not wait
        try {
            // pre-read period: success > scan each second; fail > skip this period;
            TimeUnit. MILLISECONDS .sleep((preReadSuc? 1000 : PRE_READ_MS ) - System. currentTimeMillis ()% 1000 ) ;
        } catch (InterruptedException e) {
            if (! scheduleThreadToStop ) {
                logger .error(e.getMessage() , e) ;
            }
        }
    }) ;
    scheduleThread .setDaemon( true ) ;
    scheduleThread .setName( "xxl-job, admin JobScheduleHelper#scheduleThread" ) ;
    scheduleThread .start() ;
 
 
     // 时间轮线程,用于取出每秒的数据,然后处理 ring thread
    ringThread = new Thread( new Runnable() {
        @Override
        public void run () {
            while (! ringThreadToStop ) {
                   // align second
                   try {
                       TimeUnit. MILLISECONDS .sleep( 1000 - System. currentTimeMillis () % 1000 ) ;
                   } catch (InterruptedException e) {
                    if (! ringThreadToStop ) {
                        logger .error(e.getMessage() , e) ;
                    }
                }
                   try {
                       // second data
                       List<Integer> ringItemData = new ArrayList<>() ;
                       // 获取当前所处的一分钟第几秒,然后 for 两次,第二次是为了重跑前面一个刻度没有被执行的的 job list ,避免前面的刻度遗漏了
                    int nowSecond = Calendar. getInstance ().get(Calendar. SECOND ) ; // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for ( int i = 0 ; i < 2 ; i++) {
                        List<Integer> tmpData = ringData .remove( (nowSecond+ 60 -i)% 60 ) ;
                        if (tmpData != null ) {
                            ringItemData.addAll(tmpData) ;
                        }
                       }
 
                       // ring trigger
                       logger .debug( ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays. asList (ringItemData) ) ;
                       if (ringItemData.size() > 0 ) {
                           // do trigger
                              for ( int jobId: ringItemData) {
                                  // 执行触发器 do trigger
                                  JobTriggerPoolHelper. trigger (jobId , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;
                              }
                              // 清除当前刻度列表的数据 clear
                              ringItemData.clear() ;
                       }
                   } catch (Exception e) {
                         if (! ringThreadToStop ) {
                              logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e) ;
                       }
                   }
               }
            logger .info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" ) ;
        }
    }) ;
    ringThread .setDaemon( true ) ;
    ringThread .setName( "xxl-job, admin JobScheduleHelper#ringThread" ) ;
    ringThread .start() ;
}
 
 
 
 
 
 
 
 
 
 
 

分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进