SpringBoot系列 - 批处理

Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础, 包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。

Spring Batch官网是这样介绍的自己:一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。

框架主要有以下功能:

  • Transaction management(事务管理)
  • Chunk based processing(基于块的处理)
  • Declarative I/O(声明式的输入输出)
  • Start/Stop/Restart(启动/停止/再启动)
  • Retry/Skip(重试/跳过)

如果你的批处理程序需要使用上面的功能,那就大胆地使用它吧。

框架介绍

先用一个图让你有一个大概印象,这个东西是什么:

框架一共有5个主要角色:

  1. JobRepository是用户注册Job的容器,就是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。
  2. JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
  3. Job代表着一个具体的任务。
  4. Step代表着一个具体的步骤,一个Job可以包含多个Step(想象把大象放进冰箱这个任务需要多少个步骤你就明白了)。
  5. Item就是输出->处理->输出,一个完整Step流程。

下面简要的介绍一下这5个角色

JobRepository

JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。 框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了, 并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里:

  • BATCH_JOB_INSTANCE
  • BATCH_JOB_EXECUTION_PARAMS
  • BATCH_JOB_EXECUTION
  • BATCH_STEP_EXECUTION
  • BATCH_JOB_EXECUTION_CONTEXT
  • BATCH_STEP_EXECUTION_CONTEXT

Spring Batch框架的JobRepository支持主流的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。

JobLauncher

JobLauncher是任务启动器,该接口只有一个run方法:

1
2
3
public interface JobLauncher {
JobExecution run(Job job, JobParameters jobParameters);
}

除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。 通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架 (如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。 Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。

Job

Job代表着一个任务,一个Job与一个或者多个JobInstance相关联,而一个JobInstance又与一个或者多个JobExecution相关联:

考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等, 那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务), 那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。 框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。

由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断, 需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。

Step

一个Job任务可以分为几个Step步骤,与JobExection相同,每次执行Step的时候使用StepExecution来表示执行的步骤。 每一个Step还包含着一个ItemReader、ItemProcessor、ItemWriter,下面分别介绍这三者。

ItemReader

ItemReader代表着读操作,其接口如下:

1
2
3
public interface ItemReader<T> {
T read();
}

框架已经提供了多种ItemReader接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等读的处理,当然我们也可以自己实现该接口。

ItemProcessor

ItemReader代表着处理操作,其接口如下:

1
2
3
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}

process方法的形参传入I类型的对象,通过处理后返回O型的对象。开发者可以实现自己的业务代码来对数据进行处理。

ItemWriter

ItemReader代表着写操作,其接口如下:

1
2
3
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}

框架已经提供了多种ItemWriter接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等写的处理,当然我们也可以自己实现该接口。

Job监听器

还可以自定义任务监听器,在任务启动和完成之后进行相应的通知和响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 监听器实现JobExecutionListener接口,并重写其beforeJob,afterJob方法即可
*/
public class MyJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {

}

@Override
public void afterJob(JobExecution jobExecution) {

}
}

数据校验

我们可以JSR-303(主要实现由hibernate-validator)的注解,来校验ItemReader读取到的数据是否满足要求。

首先让我们的ItemProcessor实现ValidatingItemProcessor接口:

1
2
3
4
5
6
7
public class MyItemProcessor extends ValidatingItemProcessor<User> {
@Override
public User process(User item) throws ValidationException {
super.process(item);
return item;
}
}

然后定义自己的校验器,实现的Validator接口来自于Spring,我们将使用JSR-303的Validator来校验:

1
2
3
4
5
6
7
8
9
10
11
public class MyBeanValidator<T> implements Validator<T>,InitializingBean {
private Validator validator;

@Override
public void afterPropertiesSet() throws Exception {
}

@Override
public void validate(T value)throws ValidationException{
}
}

在定义我们的MyItemProcessor时必须将MyBeanValidator设置进去,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public ItemProcessor<User,User> processor(){
//新建ItemProcessor接口的实现类返回
MyItemProcessor processor = new MyItemProcessor();
processor.setValidator(myBeanValidator());
return processor;
}

@Bean
public Validator<User> myBeanValidator(){
return new MyBeanValidator<User>();
}

SpringBoot集成实例

Spring Boot对Spring Batch支持的源码位于org.springframework.boot.autoconfigure.batch下。

Spring Boot为我们自动初始化了Spring Batch存储批处理记录的数据库,且当我们程序启动时, 会自动执行我们定义的Job的Bean,不过我们可以通过配置定时器或手动触发方式启动。

Spring Boot提供如下属性来定制Spring Batch:

1
2
3
4
5
6
7
8
9
#启动时要执行的job,默认执行全部job
spring.batch.job.name=job1,job2
#是否自动执行定义的job,默认是
spring.batch.job.enabled=true
#是否初始化Spring Batch的数据库,默认为是
spring.batch.initializer.enabled=true
spring.batch.schema=
#设置Spring Batch的数据库表的前缀
spring.batch.table-prefix=

下面通过一个真实的例子,需要将客户导过来的csv文件导入到我们的业务Oracle数据库中,来说明怎样在SpringBoot中使用批处理框架。

添加maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<exclusions>
<exclusion>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</exclusion>
</exclusions>
</dependency>

<!--添加hibernate-validator依赖,作为数据校验使用-->
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.7.Final</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>
<dependency>
<groupId>javax.el</groupId>
<artifactId>javax.el-api</artifactId>
<version>3.0.1-b04</version>
</dependency>
<dependency>
<groupId>org.glassfish.web</groupId>
<artifactId>javax.el</artifactId>
<version>2.2.6</version>
</dependency>
<!--添加hibernate-validator依赖 END-->

<!-- Oracle驱动包 -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4.0-atlassian-hosted</version>
</dependency>

配置application.yml

1
2
3
4
5
6
7
8
9
10
11
12
###################  spring配置  ###################
spring:
profiles:
active: dev
batch:
job:
enabled: false
datasource:
driver-class-name: oracle.jdbc.driver.OracleDriver
url: jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:orcl11g
username: adm_real
password: adm_real

真实csv数据,位于src/main/resources/NT_BSC_BUDGETVTOLL.csv

表定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE "ADM_REAL"."NT_BSC_BUDGETVTOLL"
(
"F_ID" VARCHAR2(100 BYTE) NOT NULL,
"F_YEAR" VARCHAR2(4 BYTE) NULL,
"F_TOLLID" VARCHAR2(50 BYTE) NULL,
"F_BUDGETID" VARCHAR2(50 BYTE) NULL,
"F_CBUDGETID" VARCHAR2(50 BYTE) NULL,
"F_VERSION" VARCHAR2(1 BYTE) DEFAULT '1' NULL,
"F_AUDITMSG" VARCHAR2(100 BYTE) NULL,
"F_TRIALSTATUS" VARCHAR2(1 BYTE) DEFAULT '0' NULL,
"F_FIRAUDITER" VARCHAR2(50 BYTE) NULL,
"F_FIRAUDITTIME" VARCHAR2(20 BYTE) NULL,
"F_FINAUDITER" VARCHAR2(50 BYTE) NULL,
"F_FINAUDITTIME" VARCHAR2(64 BYTE) NULL,
"F_EDITTIME" VARCHAR2(64 BYTE) NULL,
"F_STARTDATE" VARCHAR2(8 BYTE) NULL,
"F_ENDDATE" VARCHAR2(8 BYTE) NULL,
)

领域模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class BudgetVtoll {
private String id;
private String year;
private String tollid;
private String budgetid;
private String cbudgetid;
private String version;
/**
* 使用JSR-303注解来校验数据
*/
@Size(max = 100)
private String auditmsg;
private String trialstatus;
private String firauditer;
private String firaudittime;
private String finauditer;
private String finaudittime;
private String edittime;
private String startdate;
private String enddate;

// 下面省略get/set方法
}

数据处理及校验

定义处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CsvItemProcessor extends ValidatingItemProcessor<BudgetVtoll> {
@Override
public BudgetVtoll process(BudgetVtoll item) throws ValidationException {
/*
* 需要执行super.process(item)才会调用自定义校验器
*/
super.process(item);
/*
* 对数据进行简单的处理和转换 todo
*/
return item;
}
}

校验器定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
private javax.validation.Validator validator;

@Override
public void validate(T value) throws ValidationException {
/*
* 使用Validator的validate方法校验数据
*/
Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
if (constraintViolations.size() > 0) {
StringBuilder message = new StringBuilder();
for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message.append(constraintViolation.getMessage()).append("\n");
}
throw new ValidationException(message.toString());
}
}

/**
* 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化
*/
@Override
public void afterPropertiesSet() {
ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
validator = validatorFactory.usingContext().getValidator();
}
}

任务监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CsvJobListener implements JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private long startTime;
private long endTime;

@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
logger.info("任务处理开始");
}

@Override
public void afterJob(JobExecution jobExecution) {
endTime = System.currentTimeMillis();
logger.info("任务处理结束,总耗时=" + (endTime - startTime) + "ms");
}
}

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {
/**
* ItemReader定义,用来读取数据
* 1,使用FlatFileItemReader读取文件
* 2,使用FlatFileItemReader的setResource方法设置csv文件的路径
* 3,对此对cvs文件的数据和领域模型类做对应映射
*
* @return FlatFileItemReader
*/
@Bean
@StepScope
public FlatFileItemReader<BudgetVtoll> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) {
FlatFileItemReader<BudgetVtoll> reader = new FlatFileItemReader<>();
// reader.setResource(new ClassPathResource(pathToFile));
reader.setResource(new FileSystemResource(pathToFile));
reader.setLineMapper(new DefaultLineMapper<BudgetVtoll>() {
{
setLineTokenizer(new DelimitedLineTokenizer(",") {
{
setNames(new String[]{
"id","year","tollid","budgetid", "cbudgetid", "version", "auditmsg", "trialstatus",
"firauditer", "firaudittime", "finauditer", "finaudittime", "edittime", "startdate", "enddate"
});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<BudgetVtoll>() {
{
setTargetType(BudgetVtoll.class);
}
});
}
});
return reader;
}

/**
* ItemProcessor定义,用来处理数据
*
* @return
*/
@Bean
public ItemProcessor<BudgetVtoll, BudgetVtoll> processor() {
//使用我们自定义的ItemProcessor的实现CsvItemProcessor
CsvItemProcessor processor = new CsvItemProcessor();
//为processor指定校验器为CsvBeanValidator()
processor.setValidator(csvBeanValidator());
return processor;
}

/**
* ItemWriter定义,用来输出数据
* spring能让容器中已有的Bean以参数的形式注入,Spring Boot已经为我们定义了dataSource
*
* @param dataSource
* @return
*/
@Bean
public ItemWriter<BudgetVtoll> writer(DruidDataSource dataSource) {
JdbcBatchItemWriter<BudgetVtoll> writer = new JdbcBatchItemWriter<>();
//我们使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());

String sql = "insert into BudgetVtoll " + " (f_id,f_year,f_tollid,f_budgetid,f_cbudgetid,f_version,f_auditmsg,f_trialstatus,f_firauditer,f_firaudittime,f_finauditer,f_finaudittime,f_edittime,f_startdate,f_enddate) "
+ " values(:id,:year,:tollid,:budgetid,:cbudgetid,:version,:auditmsg,:trialstatus,:firauditer,:firaudittime,:finauditer,:finaudittime,:edittime,:startdate,:enddate)";
//在此设置要执行批处理的SQL语句
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}

/**
* JobRepository,用来注册Job的容器
* jobRepositor的定义需要dataSource和transactionManager,Spring Boot已为我们自动配置了
* 这两个类,Spring可通过方法注入已有的Bean
*
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public JobRepository jobRepository(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {

JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.ORACLE));
// 下面事务隔离级别的配置是针对Oracle的
jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
jobRepositoryFactoryBean.afterPropertiesSet();
return jobRepositoryFactoryBean.getObject();
}

/**
* JobLauncher定义,用来启动Job的接口
*
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public SimpleJobLauncher jobLauncher(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}

/**
* Job定义,我们要实际执行的任务,包含一个或多个Step
*
* @param jobBuilderFactory
* @param s1
* @return
*/
@Bean
public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1) {
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(csvJobListener())//绑定监听器csvJobListener
.build();
}

/**
* step步骤,包含ItemReader,ItemProcessor和ItemWriter
*
* @param stepBuilderFactory
* @param reader
* @param writer
* @param processor
* @return
*/
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BudgetVtoll> reader, ItemWriter<BudgetVtoll> writer,
ItemProcessor<BudgetVtoll, BudgetVtoll> processor) {
return stepBuilderFactory
.get("step1")
.<BudgetVtoll, BudgetVtoll>chunk(65000)//批处理每次提交65000条数据
.reader(reader)//给step绑定reader
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.build();
}

@Bean
public CsvJobListener csvJobListener() {
return new CsvJobListener();
}

@Bean
public Validator<BudgetVtoll> csvBeanValidator() {
return new MyBeanValidator<>();
}
}

注意,在定义Job的时候,我通过@StepScope注解,可以通过传递参数的方式将csv文件路径传进去。

测试类

最后再让我们写个测试类看看能不能成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class BatchServiceTest {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;

@Test
public void testBatch1() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", "E:\\NT_BSC_BUDGETVTOLL.csv")
.toJobParameters();
jobLauncher.run(importJob, jobParameters);
logger.info("testBatch1执行完成");
}
}

把csv文件放入resources/目录下面,然后执行测试。看看输出结果:

1
2
3
4
5
6
: Job: [FlowJob: [name=importJob]] launched with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}]
: 任务处理开始
: Executing step: [step1]
: 任务处理结束,总耗时=810ms
: Job: [FlowJob: [name=importJob]] completed with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}] and the following status: [COMPLETED]
: testBatch1执行完成

再去查看数据库里面的数据,已经正常写入了。

并发执行多个Job

SpringBatch批处理框架默认使用单线程完成所有任务的执行,官方推荐配置任务执行器来并发执行, 提高批处理的效率。

Spring Core 为我们提供了多种执行器实现(包括多种异步执行器),我们可以根据实际情况灵活选择使用。

类名 描述 是否异步
SyncTaskExecutor 简单同步执行器
SimpleAsyncTaskExecutor 简单异步执行器,提供最基本的异步实现
WorkManagerTaskExecutor 该类作为通过 JCA 规范进行任务执行的实现
ThreadPoolTaskExecutor 线程池任务执行器

配置线程池执行Job:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(200);
return taskExecutor;
}

@Bean
public SimpleJobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, DruidDataSource dataSource,
PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}

重点是上面的那句jobLauncher.setTaskExecutor(taskExecutor);

这里的并发表示的是执行不同的Job使用线程池,每个Job实例会分配一个线程去执行。这个是最推荐的做法。

如果你还想对单个Job执行逻辑采用多线程,可以再Step配置中加入线程池支持,不过需要保证你所有的Step都是线程安全的:

1
2
3
4
5
6
7
return stepBuilderFactory
.get("logStep1")
//设置每个Job通过并发方式执行,一般来讲一个Job就让它串行完成的好
.taskExecutor(new SimpleAsyncTaskExecutor())
//并发任务数为 10,默认为4
.throttleLimit(10)
.build();

这里我通过一个实际例子展示如何编写多个Job并发执行。

还是跟上面例子一样,将csv文件导入到表中,但是这个时候有两个csv文件,我要导入到两张表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE Z_TEST_APP
(
appid INT,
zname VARCHAR2 (20),
flag VARCHAR2 (2),
CONSTRAINT app_pk PRIMARY KEY (appid)
);

CREATE TABLE Z_TEST_LOG
(
logid INT,
msg VARCHAR2 (20),
logtime VARCHAR2 (8),
CONSTRAINT log_pk PRIMARY KEY (logid)
);

每个类型任务单独创建一个package,然后里面放两个类。以APP为例:

创建一个App类:

1
2
3
4
5
public class App {
private int appid;
private String zname;
private String flag;
}

然后创建一个AppConfig类配置任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class AppConfig {
/**
* ItemReader定义,用来读取数据
* 1,使用FlatFileItemReader读取文件
* 2,使用FlatFileItemReader的setResource方法设置csv文件的路径
* 3,对此对cvs文件的数据和领域模型类做对应映射
*
* @return FlatFileItemReader
*/
@Bean(name = "appReader")
@StepScope
public FlatFileItemReader<App> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) {
FlatFileItemReader<App> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(pathToFile));
reader.setLineMapper(new DefaultLineMapper<App>() {
{
setLineTokenizer(new DelimitedLineTokenizer("|") {
{
setNames(new String[]{
"appid", "zname", "flag"
});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<App>() {
{
setTargetType(App.class);
}
});
}
});
return reader;
}

// ....后面省略
}

注意,每个Bean配置都加一个name属性,然后自动注入里面需要通过@Qualifier注解来指定当前类中的Bean, 因为如果不指定name,Spring默认只会初始化一个Bean实例。

比如定义Job:

1
2
3
4
5
6
7
8
9
@Bean(name = "zappJob")
public Job zappJob(JobBuilderFactory jobBuilderFactory, @Qualifier("zappStep1") Step s1) {
return jobBuilderFactory.get("zappJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(new MyJobListener("App"))//绑定监听器csvJobListener
.build();
}

另外一个LOG任务的配置也是同样。

好了,定义完成之后开始写测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testTwoJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvApp())
.toJobParameters();
jobLauncher.run(zappJob, jobParameters1);

JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvLog())
.toJobParameters();
jobLauncher.run(zlogJob, jobParameters2);

logger.info("main线程完成");
while (true) {
Thread.sleep(2000000L);
}
}

这个是运行日志:

1
2
3
4
5
6
7
8
9
10
11
[ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=zappJob]] launched with t
[ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理开始
[ main] com.xncoding.service.BatchServiceTest : main线程完成
[ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] launched with t
[ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理开始
[ taskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [zappStep1]
[ taskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [logStep1]
[ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理结束,总耗时=495ms
[ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理结束,总耗时=585ms
[ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] completed with
[ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zappJob]] completed with

异常处理

默认情况下,Spring Batch遇到异常的时候会终止处理,比如遇到csv文件中解析错误就会终止异常。如果我想忽略掉这些异常继续处理, 可以配置在Reader中忽略异常。

这个在Step的定义中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
return stepBuilderFactory
.get("logStep1")
.<Log, Log>chunk(5000)//批处理每次提交5000条数据
.reader(reader)//给step绑定reader
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.faultTolerant()
.retry(Exception.class) // 重试
.noRetry(ParseException.class)
.retryLimit(1) //每条记录重试一次
.skip(Exception.class)
.skipLimit(200) //一共允许跳过200次异常
// .taskExecutor(new SimpleAsyncTaskExecutor()) //设置每个Job通过并发方式执行,一般来讲一个Job就让它串行完成的好
// .throttleLimit(10) //并发任务数为 10,默认为4
.build();

上面定义了使用可容忍异常模式,遇到Exception异常就重试1次,对于ParseException异常不重试,所有异常都会忽略掉,不会导致程序终止。 但是最大允许跳过200次异常,超过这个数字就终止执行了。

然后改一下csv文件,把某个字段改大点,超过数据库中定义长度:

1
2
3
4
5
1|等哈哈哈|01
2|等哈哈哈|02
3|等哈哈哈|025555
4|等哈哈哈|02
5|等哈哈哈|02

重新执行发现正常执行完成,数据库中只插入了4条数据,id为3的没有。

通用配置

更进一步,如果有多个CSV文件需要导入,那么安装上面的写法。每次都要定义一个新的Config类,一个新的Bean类,代码重复率很高。

实际上可以定义一个通用配置,去掉里面的显示Bean类,通过Java反射机制,还有@StepScope注解, 实现每次运行时候根据JobParameters初始化不同的Job。

如果有一个新的CSV文件需要导入,只需要新建一个Bean,定义好相应的列,然后将Bean的属性列表、插入SQL语句作为参数传入即可。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 测试一个配置类,可同时运行多个任务
* @throws Exception 异常
*/
@Test
public void testCommonJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "App")
.addString(KEY_FILE_NAME, p.getCsvApp())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zapp.App")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"appid", "zname", "flag"
}))
.addString(KEY_SQL, "insert into z_test_App (appid, zname, flag) values(:appid, :zname, :flag)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters1);

JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "Log")
.addString(KEY_FILE_NAME, p.getCsvLog())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zlog.Log")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"logid", "msg", "logtime"
}))
.addString(KEY_SQL, "insert into z_test_Log (logid, msg, logtime) values(:logid, :msg, :logtime)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters2);

logger.info("Main线程执行完成");

while (true) {
Thread.sleep(2000000L);
}
}

FAQ

在进行Oracle操作的时候报一个错误:

1
ORA-08177: 无法连续访问此事务处理

原因:Spring Batch 默认是 ISOLATION_SERIALIZABLE

官方文档说明:

1
The default is ISOLATION_SERIALIZABLE, which prevents accidental concurrent execution of the same job (ISOLATION_REPEATABLE_READ would work as well).

而Oracle默认的事务隔离级别是ISOLATION_READ_COMMITTED

解决方案就是修改JobRepositoryFactoryBean的定义,加一个配置:

1
jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");

GitHub源码

springboot-batch