xxl-job 开源网址: https://gitee.com/xuxueli0323/xxl-job
搭建任务调度平台
1、拉取xxl-job代码
首先去上述的gitee拉取开源的xxl-job代码,千万不要选择master分支,要选择稳定的版本,这里我选择2.2.0来做演示

2、创建需要的数据库

执行这个sql文件,创建必要的数据库
3、配置yaml文件
首先去xxl-job-admin找到配置文件,并修改port、mysql、mail、xxl.job.accessToken



server.port 可以修改成 8888,避免与Java项目的端口产生冲突
数据库的地址、用户名、密码修改成自己的
mail.username和mail.password修改成自己的,如果不用qq邮箱,记得mail.host和mail.from也修改一下
accessToken可以理解为秘钥,后面会在你自己的项目中配置和他名字一模一样的秘钥,让你的项目与任务调度器平台互相关联起来。这里我就不做修改了,使用默认的,可以自定义。
4、配置logback.xml

在linux中先创建好自己的路径日志,然后赋予权限,
1 2 3 4 5
| mkdir -p /data/xxl-job touch /data/xxl-job/xxl-job-admin.log
chmod -R 777 /data/xxl-job/xxl-job-admin.log
|
5、启动任务调度器
配置好之后,用idea打包,并将jar在linux中运行。

1 2 3 4 5
| nohup java -jar xxl-job-admin-2.4.1-SNAPSHOT.jar > /data/xxl-job/xxl-job-admin.log 2>&1 &
tail -f /data/xxl-job/xxl-job-admin.log
|
6、访问任务调度器
访问地址:http://自己的ip:自己设置的端口号/xxl-job-admin
账号:admin ;密码:123456
sprintboot引入xxl-job
1、引入xxl-job-core依赖
在自己的项目中引入xxl-job-core依赖;只需引入这一个依赖就行
1 2 3 4 5
| <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.1</version> </dependency>
|
2、配置yaml
在自己的项目中配置yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| xxl: job: admin: addresses: http://192.168.139.110:8888/xxl-job-admin accessToken: default_token executor: appname: job-demo address: ip: 192.168.139.110 port: 9999 logpath: /data/xxl-job/xxl-job-admin.log logretentiondays: 30
|

注意!!!!: logpath执行日志存储位置,也是需要自己在linux中创建的,和上述任务调度器平台创建一样,创建完成之后,设置权限。
3、配置日志
去你拉取的xxl-job项目中找到 logback.xml,复制到你自己项目的位置,并且创建该文件,赋予权限

4、编写配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; }
}
|
xxl-job项目中的XxlJobConfig
上述的配置类代码,其实就是这个xxl-job项目中的XxlJobConfig 一模一样。

5、编写执行器

|
@Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
@XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) { XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } }
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); } else { XxlJobHelper.log("第 {} 片, 忽略", i); } }
}
@XxlJob("commandJobHandler") public void commandJobHandler() throws Exception { String command = XxlJobHelper.getJobParam(); int exitValue = -1;
BufferedReader bufferedReader = null; try { if (command<mark>null || command.trim().length()</mark>0) { XxlJobHelper.handleFail("command empty."); return; }
String[] commandArray = command.split(" ");
ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command(commandArray); processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
String line; while ((line = bufferedReader.readLine()) != null) { XxlJobHelper.log(line); }
process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobHelper.log(e); } finally { if (bufferedReader != null) { bufferedReader.close(); } }
if (exitValue == 0) { } else { XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); }
}
@XxlJob("httpJobHandler") public void httpJobHandler() throws Exception {
String param = XxlJobHelper.getJobParam(); if (param<mark>null || param.trim().length()</mark>0) { XxlJobHelper.log("param["+ param +"] invalid.");
XxlJobHelper.handleFail(); return; }
String url; String method; String data; try { Map<String, String> paramMap =GsonTool.fromJson(param, Map.class); url = paramMap.get("url"); method = paramMap.get("method"); data = paramMap.get("data"); } catch (Exception e) { XxlJobHelper.log(e); XxlJobHelper.handleFail(); return; }
if (url<mark>null || url.trim().length()</mark>0) { XxlJobHelper.log("url["+ url +"] invalid.");
XxlJobHelper.handleFail(); return; } if (method==null || !Arrays.asList("GET", "POST").contains(method.toUpperCase())) { XxlJobHelper.log("method["+ method +"] invalid.");
XxlJobHelper.handleFail(); return; } method = method.toUpperCase(); boolean isPostMethod = method.equals("POST");
HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection();
connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
connection.connect();
if (isPostMethod && data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); }
int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); }
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return; } catch (Exception e) { XxlJobHelper.log(e);
XxlJobHelper.handleFail(); return; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobHelper.log(e2); } }
}
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public void demoJobHandler2() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World."); } public void init(){ logger.info("init"); } public void destroy(){ logger.info("destroy"); }
}
|
6. xxl-job 任务详解
6.1 执行器
执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能;
另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器


以下是执行器的属性说明:
属性名称 | 说明 |
---|
AppName | 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用; |
名称 | 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性; |
排序 | 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表; |
注册方式 | 调度中心获取执行器地址的方式; |
机器地址 | 注册方式为”手动录入”时有效,支持人工维护执行器的地址信息; |
自动注册和手动注册的区别和配置

6.2 基础配置
在我们新建任务的时候,里面有很多的配置项,下面我们就来介绍下里面具体的作用

基础配置

调度配置
- 调度类型:
- 无:该类型不会主动触发调度;
- CRON:该类型将会通过CRON,触发任务调度;
- 固定速度:该类型将会以固定速度,触发任务调度;按照固定的间隔时间,周期性触发;

任务配置

阻塞处理策略
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO(First Input First Output)队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

路由策略
当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询)
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

7 xxl-job 案例
7.1 轮询
1.修改任务为轮询

2.启动多个微服务

修改yml配置文件
1 2 3 4 5 6 7 8 9 10 11
| server: port: ${port:8881}
xxl: job: admin: addresses: http://192.168.200.146:8888/xxl-job-admin executor: appname: xxl-job-executor-sample port: ${executor.port:9999}
|
3.启动多个微服务
每个微服务轮询的去执行任务
7.2 分片广播
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

具体案例
需求:让两个节点同时执行10000个任务,每个节点分别执行5000个任务
①:在xxl-job-executor-sample执行器下新创建任务,路由策略为分片广播

②:分片广播代码
分片参数
index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
total:总分片数,执行器集群的总机器数量;
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.heima.xxljob.job;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.List;
@Component public class HelloJob {
@Value("${server.port}") private String port;
@XxlJob("demoJobHandler") public void helloJob(){ System.out.println("简单任务执行了。。。。"+port);
}
@XxlJob("shardingJobHandler") public void shardingJobHandler(){ int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
List<Integer> list = getList(); for (Integer integer : list) { if(integer % shardTotal == shardIndex){ System.out.println("当前第"+shardIndex+"分片执行了,任务项为:"+integer); } } }
public List<Integer> getList(){ List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } return list; } }
|
④:测试
启动多个微服务测试,一次执行可以执行多个任务