xxl-job任务触发流程
xxl-job老版本依赖quartz实现定时任务触发,从v2.1.0版本开始移除了quartz依赖:一方面是为了精简系统、降低冗余依赖,另一方面是为了提高系统的可控度与稳定性。(本文对应的代码版本为 2.2.0-SNAPSHOT)
以下是本文的目录大纲:
一.任务触发执行总体流程
二.任务定时触发流程
三.关于这么设计的感悟
请尊重作者劳动成果,转载请标明原文链接:
https://www.cnblogs.com/wanghongsen/p/12510533.html
一 任务触发执行总体流程
先来看下任务触发和执行的完整总体流程图,如下:
上图左上角所示的 第一步:任务触发方式 主要有以下几种类型:1 根据设置的时间自动触发(JobScheduleHelper),2 页面点击操作按钮执行触发,3 父子任务触发,4 失败重试触发。
本文重点讲解 第一步:任务触发 的第一种 1 根据设置的时间自动触发,即上图 红色框内标示的部分,具体见JobScheduleHelper这个类。
二 任务定时触发流程
JobScheduleHelper 中 Cron 定时触发这个阶段的详细流程图如下:
下面结合上面的流程图来分析JobScheduleHelper这个类:在工程Spring启动的时候,会触发JobScheduleHelper类的start()方法,完整代码如下:
1 public void start(){ 2 3 // schedule thread 4 scheduleThread = new Thread(new Runnable() { 5 @Override 6 public void run() { 7 8 try { 9 TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); 10 } catch (InterruptedException e) { 11 if (!scheduleThreadToStop) { 12 logger.error(e.getMessage(), e); 13 } 14 } 15 logger.info(">>>>>>>>> init xxl-job admin scheduler success."); 16 17 // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) 18 int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; 19 20 while (!scheduleThreadToStop) { 21 22 // Scan Job 23 long start = System.currentTimeMillis(); 24 25 Connection conn = null; 26 Boolean connAutoCommit = null; 27 PreparedStatement preparedStatement = null; 28 29 boolean preReadSuc = true; 30 try { 31 32 conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); 33 connAutoCommit = conn.getAutoCommit(); 34 conn.setAutoCommit(false); 35 36 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); 37 preparedStatement.execute(); 38 39 // tx start 40 41 // 1、pre read 42 long nowTime = System.currentTimeMillis(); 43 List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); 44 if (scheduleList!=null && scheduleList.size()>0) { 45 // 2、push time-ring 46 for (XxlJobInfo jobInfo: scheduleList) { 47 48 // time-ring jump 49 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { 50 // 2.1、trigger-expire > 5s:pass && make next-trigger-time 51 logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); 52 53 // fresh next 54 refreshNextValidTime(jobInfo, new Date()); 55 56 } else if (nowTime > jobInfo.getTriggerNextTime()) { 57 // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time 58 59 // 
1、trigger 60 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null); 61 logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); 62 63 // 2、fresh next 64 refreshNextValidTime(jobInfo, new Date()); 65 66 // next-trigger-time in 5s, pre-read again 67 if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { 68 69 // 1、make ring second 70 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); 71 72 // 2、push time ring 73 pushTimeRing(ringSecond, jobInfo.getId()); 74 75 // 3、fresh next 76 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); 77 78 } 79 80 } else { 81 // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time 82 83 // 1、make ring second 84 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); 85 86 // 2、push time ring 87 pushTimeRing(ringSecond, jobInfo.getId()); 88 89 // 3、fresh next 90 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); 91 92 } 93 94 } 95 96 // 3、update trigger info 97 for (XxlJobInfo jobInfo: scheduleList) { 98 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); 99 } 100 101 } else { 102 preReadSuc = false; 103 } 104 105 // tx stop 106 107 108 } catch (Exception e) { 109 if (!scheduleThreadToStop) { 110 logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); 111 } 112 } finally { 113 114 // commit 115 if (conn != null) { 116 try { 117 conn.commit(); 118 } catch (SQLException e) { 119 if (!scheduleThreadToStop) { 120 logger.error(e.getMessage(), e); 121 } 122 } 123 try { 124 conn.setAutoCommit(connAutoCommit); 125 } catch (SQLException e) { 126 if (!scheduleThreadToStop) { 127 logger.error(e.getMessage(), e); 128 } 129 } 130 try { 131 conn.close(); 132 } catch (SQLException e) { 133 if (!scheduleThreadToStop) { 134 logger.error(e.getMessage(), e); 135 } 136 } 137 } 138 139 // close PreparedStatement 140 if (null != 
preparedStatement) { 141 try { 142 preparedStatement.close(); 143 } catch (SQLException e) { 144 if (!scheduleThreadToStop) { 145 logger.error(e.getMessage(), e); 146 } 147 } 148 } 149 } 150 long cost = System.currentTimeMillis()-start; 151 152 153 // Wait seconds, align second 154 if (cost < 1000) { // scan-overtime, not wait 155 try { 156 // pre-read period: success > scan each second; fail > skip this period; 157 TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); 158 } catch (InterruptedException e) { 159 if (!scheduleThreadToStop) { 160 logger.error(e.getMessage(), e); 161 } 162 } 163 } 164 165 } 166 167 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); 168 } 169 }); 170 scheduleThread.setDaemon(true); 171 scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); 172 scheduleThread.start(); 173 174 175 // ring thread 176 ringThread = new Thread(new Runnable() { 177 @Override 178 public void run() { 179 180 // align second 181 try { 182 TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); 183 } catch (InterruptedException e) { 184 if (!ringThreadToStop) { 185 logger.error(e.getMessage(), e); 186 } 187 } 188 189 while (!ringThreadToStop) { 190 191 try { 192 // second data 193 List<Integer> ringItemData = new ArrayList<>(); 194 int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; 195 for (int i = 0; i < 2; i++) { 196 List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); 197 if (tmpData != null) { 198 ringItemData.addAll(tmpData); 199 } 200 } 201 202 // ring trigger 203 logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); 204 if (ringItemData.size() > 0) { 205 // do trigger 206 for (int jobId: ringItemData) { 207 // do trigger 208 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null); 209 } 210 // clear 211 ringItemData.clear(); 212 } 213 } catch 
(Exception e) { 214 if (!ringThreadToStop) { 215 logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); 216 } 217 } 218 219 // next second, align second 220 try { 221 TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); 222 } catch (InterruptedException e) { 223 if (!ringThreadToStop) { 224 logger.error(e.getMessage(), e); 225 } 226 } 227 } 228 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); 229 } 230 }); 231 ringThread.setDaemon(true); 232 ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); 233 ringThread.start(); 234 }