xxl-job 使用

Lu Lv3

xxl-job 开源网址: https://gitee.com/xuxueli0323/xxl-job

搭建任务调度平台

1、拉取xxl-job代码

首先去上述的gitee拉取开源的xxl-job代码,千万不要选择master分支,要选择稳定的版本,这里我选择2.2.0来做演示

image-20250128120107325

2、创建需要的数据库

image-20250128120512644

执行这个sql文件,创建必要的数据库

3、配置yaml文件

首先去xxl-job-admin找到配置文件,并修改portmysqlmailxxl.job.accessToken

image-20250128120939910

image-20250128120943725

image-20250128121544518

  • server.port 可以修改成 8888,避免与Java项目的端口产生冲突

  • 数据库的地址、用户名、密码修改成自己的

  • mail.username和mail.password修改成自己的,如果不用qq邮箱,记得mail.host和mail.from也修改一下

  • accessToken可以理解为秘钥,后面会在你自己的项目中配置和他名字一模一样的秘钥,让你的项目与任务调度器平台互相关联起来。这里我就不做修改了,使用默认的,可以自定义。

4、配置logback.xml

image-20250128122100727

在linux中先创建好自己的路径日志,然后赋予权限,

1
2
3
4
5
# 创建目录和文件
mkdir -p /data/xxl-job
touch /data/xxl-job/xxl-job-admin.log
# 赋予权限
chmod -R 777 /data/xxl-job/xxl-job-admin.log

5、启动任务调度器

配置好之后,用idea打包,并将jar在linux中运行。

img

1
2
3
4
5
#将jar通过后台进程启动,并将启动信息指向/data/xxl-job/xxl-job-admin.log中  
nohup java -jar xxl-job-admin-2.4.1-SNAPSHOT.jar > /data/xxl-job/xxl-job-admin.log 2>&1 &

#查看日志
tail -f /data/xxl-job/xxl-job-admin.log

6、访问任务调度器

访问地址:http://自己的ip:自己设置的端口号/xxl-job-admin

账号:admin ;密码:123456

sprintboot引入xxl-job

1、引入xxl-job-core依赖

在自己的项目中引入xxl-job-core依赖;只需引入这一个依赖就行

1
2
3
4
5
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>

2、配置yaml

在自己的项目中配置yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
xxl:
job:
admin:
# 调度中心服务部署的地址
addresses: http://192.168.139.110:8888/xxl-job-admin
# 执行器通讯TOKEN,要和调度中心服务部署配置的accessToken一致,要不然无法连接注册
accessToken: default_token
executor:
# 执行器AppName
appname: job-demo
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
ip: 192.168.139.110
#执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/xxl-job/xxl-job-admin.log
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30

image-20250128123544534

注意!!!!: logpath执行日志存储位置,也是需要自己在linux中创建的,和上述任务调度器平台创建一样,创建完成之后,设置权限。

3、配置日志

去你拉取的xxl-job项目中找到 logback.xml,复制到你自己项目的位置,并且创建该文件,赋予权限

image-20250128123944764

4、编写配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/


}

xxl-job项目中的XxlJobConfig

上述的配置类代码,其实就是这个xxl-job项目中的XxlJobConfig 一模一样。

image-20250128124447013

5、编写执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
/**
* XxlJob开发示例(Bean模式)
*
* 开发步骤:
* 1、任务开发:在Spring Bean实例中,开发Job方法;
* 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
* 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}


/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片, 忽略", i);
}
}

}


/**
* 3、命令行任务
*
* 参数示例:"ls -a" 或者 "pwd"
*/
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception {
String command = XxlJobHelper.getJobParam();
int exitValue = -1;

BufferedReader bufferedReader = null;
try {
// valid
if (command<mark>null || command.trim().length()</mark>0) {
XxlJobHelper.handleFail("command empty.");
return;
}

// command split
String[] commandArray = command.split(" ");

// command process
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(commandArray);
processBuilder.redirectErrorStream(true);

Process process = processBuilder.start();
//Process process = Runtime.getRuntime().exec(command);

BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobHelper.log(line);
}

// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobHelper.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}

if (exitValue == 0) {
// default success
} else {
XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
}

}


/**
* 4、跨平台Http任务
*
* 参数示例:
* <pre>
* {
* "url": "http://www.baidu.com",
* "method": "get",
* "data": "hello world"
* }
* </pre>
*/
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception {

// param
String param = XxlJobHelper.getJobParam();
if (param<mark>null || param.trim().length()</mark>0) {
XxlJobHelper.log("param["+ param +"] invalid.");

XxlJobHelper.handleFail();
return;
}

// param parse
String url;
String method;
String data;
try {
Map<String, String> paramMap =GsonTool.fromJson(param, Map.class);
url = paramMap.get("url");
method = paramMap.get("method");
data = paramMap.get("data");
} catch (Exception e) {
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
return;
}

// param valid
if (url<mark>null || url.trim().length()</mark>0) {
XxlJobHelper.log("url["+ url +"] invalid.");

XxlJobHelper.handleFail();
return;
}
if (method==null || !Arrays.asList("GET", "POST").contains(method.toUpperCase())) {
XxlJobHelper.log("method["+ method +"] invalid.");

XxlJobHelper.handleFail();
return;
}
method = method.toUpperCase();
boolean isPostMethod = method.equals("POST");

// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();

// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

// do connection
connection.connect();

// data
if (isPostMethod && data!=null && data.trim().length()>0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}

// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}

// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();

XxlJobHelper.log(responseMsg);

return;
} catch (Exception e) {
XxlJobHelper.log(e);

XxlJobHelper.handleFail();
return;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobHelper.log(e2);
}
}

}

/**
* 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
}
public void init(){
logger.info("init");
}
public void destroy(){
logger.info("destroy");
}


}

6. xxl-job 任务详解

6.1 执行器

执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能;

另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器

image-20210729232926534

image-20210729232825564

以下是执行器的属性说明:

属性名称说明
AppName是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;
名称执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;
排序执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式调度中心获取执行器地址的方式;
机器地址注册方式为”手动录入”时有效,支持人工维护执行器的地址信息;

自动注册和手动注册的区别和配置

image-20210729233016355

6.2 基础配置

在我们新建任务的时候,里面有很多的配置项,下面我们就来介绍下里面具体的作用

image-20210729233926457

基础配置

  • 执行器:每个任务必须绑定一个执行器, 方便给任务进行分组

  • 任务描述:任务的描述信息,便于任务管理;

  • 负责人:任务的负责人;

  • 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔

image-20210729234009010

调度配置

  • 调度类型:
    • 无:该类型不会主动触发调度;
    • CRON:该类型将会通过CRON,触发任务调度;
    • 固定速度:该类型将会以固定速度,触发任务调度;按照固定的间隔时间,周期性触发;

image-20210729234114283

任务配置

  • 运行模式:

    BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 “JobHandler” 属性匹配执行器中任务;

  • JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;

  • 执行参数:任务执行所需的参数;

image-20210729234219162

阻塞处理策略

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

  • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO(First Input First Output)队列并以串行方式运行;

  • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;

  • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

image-20210729234256062

路由策略

当执行器集群部署时,提供丰富的路由策略,包括;

  • FIRST(第一个):固定选择第一个机器;

  • LAST(最后一个):固定选择最后一个机器;

  • ROUND(轮询)

  • RANDOM(随机):随机选择在线的机器;

  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

  • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

  • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

  • FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

  • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

  • SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

image-20210729234409132

7 xxl-job 案例

7.1 轮询

1.修改任务为轮询

image-20210729234513775

2.启动多个微服务

image-20210729234536483

修改yml配置文件

1
2
3
4
5
6
7
8
9
10
11
server:
port: ${port:8881}


xxl:
job:
admin:
addresses: http://192.168.200.146:8888/xxl-job-admin
executor:
appname: xxl-job-executor-sample
port: ${executor.port:9999}

3.启动多个微服务

每个微服务轮询的去执行任务

7.2 分片广播

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

image-20210729234756221

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务

image-20210729234822935

具体案例

需求:让两个节点同时执行10000个任务,每个节点分别执行5000个任务

①:在xxl-job-executor-sample执行器下新创建任务,路由策略为分片广播

image-20210729234948571

②:分片广播代码

分片参数

 index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;

 total:总分片数,执行器集群的总机器数量;

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.heima.xxljob.job;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class HelloJob {

@Value("${server.port}")
private String port;


@XxlJob("demoJobHandler")
public void helloJob(){
System.out.println("简单任务执行了。。。。"+port);

}

@XxlJob("shardingJobHandler")
public void shardingJobHandler(){
//分片的参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

//业务逻辑
List<Integer> list = getList();
for (Integer integer : list) {
if(integer % shardTotal == shardIndex){
System.out.println("当前第"+shardIndex+"分片执行了,任务项为:"+integer);
}
}
}

public List<Integer> getList(){
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
return list;
}
}

④:测试

启动多个微服务测试,一次执行可以执行多个任务

  • Title: xxl-job 使用
  • Author: Lu
  • Created at : 2024-07-17 19:40:44
  • Updated at : 2024-07-17 21:40:44
  • Link: https://lusy.ink/2024/07/17/XXL-job/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments