xxl-job 开源网址: https://gitee.com/xuxueli0323/xxl-job
搭建任务调度平台
1、拉取xxl-job代码
首先去上述的gitee拉取开源的xxl-job代码,千万不要选择master分支,要选择稳定的版本,这里我选择2.2.0来做演示

2、创建需要的数据库

执行这个sql文件,创建必要的数据库
3、配置yaml文件
首先去xxl-job-admin找到配置文件,并修改port、mysql、mail、xxl.job.accessToken



server.port 可以修改成 8888,避免与Java项目的端口产生冲突
数据库的地址、用户名、密码修改成自己的
mail.username和mail.password修改成自己的,如果不用qq邮箱,记得mail.host和mail.from也修改一下
accessToken可以理解为秘钥,后面会在你自己的项目中配置和他名字一模一样的秘钥,让你的项目与任务调度器平台互相关联起来。这里我就不做修改了,使用默认的,可以自定义。
4、配置logback.xml

在linux中先创建好自己的路径日志,然后赋予权限,
1 2 3 4 5
| mkdir -p /data/xxl-job touch /data/xxl-job/xxl-job-admin.log
chmod -R 777 /data/xxl-job/xxl-job-admin.log
|
5、启动任务调度器
配置好之后,用idea打包,并将jar在linux中运行。

1 2 3 4 5
| nohup java -jar xxl-job-admin-2.4.1-SNAPSHOT.jar > /data/xxl-job/xxl-job-admin.log 2>&1 &
tail -f /data/xxl-job/xxl-job-admin.log
|
6、访问任务调度器
访问地址:http://自己的ip:自己设置的端口号/xxl-job-admin
账号:admin ;密码:123456
sprintboot引入xxl-job
1、引入xxl-job-core依赖
在自己的项目中引入xxl-job-core依赖;只需引入这一个依赖就行
1 2 3 4 5
| <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.1</version> </dependency>
|
2、配置yaml
在自己的项目中配置yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| xxl: job: admin: addresses: http://192.168.139.110:8888/xxl-job-admin accessToken: default_token executor: appname: job-demo address: ip: 192.168.139.110 port: 9999 logpath: /data/xxl-job/xxl-job-admin.log logretentiondays: 30
|

注意!!!!: logpath执行日志存储位置,也是需要自己在linux中创建的,和上述任务调度器平台创建一样,创建完成之后,设置权限。
3、配置日志
去你拉取的xxl-job项目中找到 logback.xml,复制到你自己项目的位置,并且创建该文件,赋予权限

4、编写配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; }
}
|
xxl-job项目中的XxlJobConfig
上述的配置类代码,其实就是这个xxl-job项目中的XxlJobConfig 一模一样。

5、编写执行器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
|
@Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
@XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) { XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } }
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); } else { XxlJobHelper.log("第 {} 片, 忽略", i); } }
}
@XxlJob("commandJobHandler") public void commandJobHandler() throws Exception { String command = XxlJobHelper.getJobParam(); int exitValue = -1;
BufferedReader bufferedReader = null; try { if (command<mark>null || command.trim().length()</mark>0) { XxlJobHelper.handleFail("command empty."); return; }
String[] commandArray = command.split(" ");
ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command(commandArray); processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
String line; while ((line = bufferedReader.readLine()) != null) { XxlJobHelper.log(line); }
process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobHelper.log(e); } finally { if (bufferedReader != null) { bufferedReader.close(); } }
if (exitValue == 0) { } else { XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); }
}
@XxlJob("httpJobHandler") public void httpJobHandler() throws Exception {
String param = XxlJobHelper.getJobParam(); if (param<mark>null || param.trim().length()</mark>0) { XxlJobHelper.log("param["+ param +"] invalid.");
XxlJobHelper.handleFail(); return; }
String url; String method; String data; try { Map<String, String> paramMap =GsonTool.fromJson(param, Map.class); url = paramMap.get("url"); method = paramMap.get("method"); data = paramMap.get("data"); } catch (Exception e) { XxlJobHelper.log(e); XxlJobHelper.handleFail(); return; }
if (url<mark>null || url.trim().length()</mark>0) { XxlJobHelper.log("url["+ url +"] invalid.");
XxlJobHelper.handleFail(); return; } if (method==null || !Arrays.asList("GET", "POST").contains(method.toUpperCase())) { XxlJobHelper.log("method["+ method +"] invalid.");
XxlJobHelper.handleFail(); return; } method = method.toUpperCase(); boolean isPostMethod = method.equals("POST");
HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection();
connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
connection.connect();
if (isPostMethod && data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); }
int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); }
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return; } catch (Exception e) { XxlJobHelper.log(e);
XxlJobHelper.handleFail(); return; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobHelper.log(e2); } }
}
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public void demoJobHandler2() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World."); } public void init(){ logger.info("init"); } public void destroy(){ logger.info("destroy"); }
}
|
6. xxl-job 任务详解
6.1 执行器
执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能;
另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器


以下是执行器的属性说明:
属性名称 | 说明 |
---|
AppName | 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用; |
名称 | 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性; |
排序 | 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表; |
注册方式 | 调度中心获取执行器地址的方式; |
机器地址 | 注册方式为”手动录入”时有效,支持人工维护执行器的地址信息; |
自动注册和手动注册的区别和配置

6.2 基础配置
在我们新建任务的时候,里面有很多的配置项,下面我们就来介绍下里面具体的作用

基础配置

调度配置
- 调度类型:
- 无:该类型不会主动触发调度;
- CRON:该类型将会通过CRON,触发任务调度;
- 固定速度:该类型将会以固定速度,触发任务调度;按照固定的间隔时间,周期性触发;

任务配置

阻塞处理策略
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO(First Input First Output)队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

路由策略
当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询)
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

7 xxl-job 案例
7.1 轮询
1.修改任务为轮询

2.启动多个微服务

修改yml配置文件
1 2 3 4 5 6 7 8 9 10 11
| server: port: ${port:8881}
xxl: job: admin: addresses: http://192.168.200.146:8888/xxl-job-admin executor: appname: xxl-job-executor-sample port: ${executor.port:9999}
|
3.启动多个微服务
每个微服务轮询的去执行任务
7.2 分片广播
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

具体案例
需求:让两个节点同时执行10000个任务,每个节点分别执行5000个任务
①:在xxl-job-executor-sample执行器下新创建任务,路由策略为分片广播

②:分片广播代码
分片参数
index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
total:总分片数,执行器集群的总机器数量;
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.heima.xxljob.job;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.List;
@Component public class HelloJob {
@Value("${server.port}") private String port;
@XxlJob("demoJobHandler") public void helloJob(){ System.out.println("简单任务执行了。。。。"+port);
}
@XxlJob("shardingJobHandler") public void shardingJobHandler(){ int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
List<Integer> list = getList(); for (Integer integer : list) { if(integer % shardTotal == shardIndex){ System.out.println("当前第"+shardIndex+"分片执行了,任务项为:"+integer); } } }
public List<Integer> getList(){ List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } return list; } }
|
④:测试
启动多个微服务测试,一次执行可以执行多个任务