Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础,
包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。
Spring Batch官网是这样介绍的自己:一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。
框架主要有以下功能:
Transaction management(事务管理)
Chunk based processing(基于块的处理)
Declarative I/O(声明式的输入输出)
Start/Stop/Restart(启动/停止/再启动)
Retry/Skip(重试/跳过)
如果你的批处理程序需要使用上面的功能,那就大胆地使用它吧。
框架介绍 先用一个图让你有一个大概印象,这个东西是什么:
框架一共有5个主要角色:
JobRepository是用户注册Job的容器,就是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。
JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
Job代表着一个具体的任务。
Step代表着一个具体的步骤,一个Job可以包含多个Step(想象把大象放进冰箱这个任务需要多少个步骤你就明白了)。
Item就是输出->处理->输出,一个完整Step流程。
下面简要的介绍一下这5个角色
JobRepository JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。
框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了,
并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里:
BATCH_JOB_INSTANCE
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION
BATCH_STEP_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_STEP_EXECUTION_CONTEXT
Spring Batch框架的JobRepository支持主流的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。
JobLauncher JobLauncher是任务启动器,该接口只有一个run方法:
1 2 3 public interface JobLauncher { JobExecution run (Job job, JobParameters jobParameters) ; }
除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。
通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架
(如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。
Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。
Job Job代表着一个任务,一个Job与一个或者多个JobInstance相关联,而一个JobInstance又与一个或者多个JobExecution相关联:
考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等,
那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务),
那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。
框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。
由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断,
需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。
Step 一个Job任务可以分为几个Step步骤,与JobExection相同,每次执行Step的时候使用StepExecution来表示执行的步骤。
每一个Step还包含着一个ItemReader、ItemProcessor、ItemWriter,下面分别介绍这三者。
ItemReader
ItemReader代表着读操作,其接口如下:
1 2 3 public interface ItemReader <T> { T read () ; }
框架已经提供了多种ItemReader接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等读的处理,当然我们也可以自己实现该接口。
ItemProcessor
ItemReader代表着处理操作,其接口如下:
1 2 3 public interface ItemProcessor <I, O> { O process (I item) throws Exception; }
process方法的形参传入I类型的对象,通过处理后返回O型的对象。开发者可以实现自己的业务代码来对数据进行处理。
ItemWriter
ItemReader代表着写操作,其接口如下:
1 2 3 public interface ItemWriter <T> { void write (List<? extends T> items) throws Exception; }
框架已经提供了多种ItemWriter接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等写的处理,当然我们也可以自己实现该接口。
Job监听器 还可以自定义任务监听器,在任务启动和完成之后进行相应的通知和响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class MyJobListener implements JobExecutionListener { @Override public void beforeJob (JobExecution jobExecution) { } @Override public void afterJob (JobExecution jobExecution) { } }
数据校验 我们可以JSR-303(主要实现由hibernate-validator)的注解,来校验ItemReader读取到的数据是否满足要求。
首先让我们的ItemProcessor实现ValidatingItemProcessor接口:
1 2 3 4 5 6 7 public class MyItemProcessor extends ValidatingItemProcessor <User> { @Override public User process (User item) throws ValidationException { super .process(item); return item; } }
然后定义自己的校验器,实现的Validator接口来自于Spring,我们将使用JSR-303的Validator来校验:
1 2 3 4 5 6 7 8 9 10 11 public class MyBeanValidator <T> implements Validator <T>,InitializingBean { private Validator validator; @Override public void afterPropertiesSet () throws Exception { } @Override public void validate (T value) throws ValidationException{ } }
在定义我们的MyItemProcessor时必须将MyBeanValidator设置进去,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 @Bean public ItemProcessor<User,User> processor () { MyItemProcessor processor = new MyItemProcessor (); processor.setValidator(myBeanValidator()); return processor; } @Bean public Validator<User> myBeanValidator () { return new MyBeanValidator <User>(); }
SpringBoot集成实例 Spring Boot对Spring Batch支持的源码位于org.springframework.boot.autoconfigure.batch
下。
Spring Boot为我们自动初始化了Spring Batch存储批处理记录的数据库,且当我们程序启动时,
会自动执行我们定义的Job的Bean,不过我们可以通过配置定时器或手动触发方式启动。
Spring Boot提供如下属性来定制Spring Batch:
1 2 3 4 5 6 7 8 9 #启动时要执行的job,默认执行全部job spring.batch.job.name=job1,job2 #是否自动执行定义的job,默认是 spring.batch.job.enabled=true #是否初始化Spring Batch的数据库,默认为是 spring.batch.initializer.enabled=true spring.batch.schema= #设置Spring Batch的数据库表的前缀 spring.batch.table-prefix=
下面通过一个真实的例子,需要将客户导过来的csv文件导入到我们的业务Oracle数据库中,来说明怎样在SpringBoot中使用批处理框架。
添加maven依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-batch</artifactId > <exclusions > <exclusion > <groupId > org.hsqldb</groupId > <artifactId > hsqldb</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > org.hibernate.validator</groupId > <artifactId > hibernate-validator</artifactId > <version > 6.0.7.Final</version > </dependency > <dependency > <groupId > javax.validation</groupId > <artifactId > validation-api</artifactId > <version > 2.0.1.Final</version > </dependency > <dependency > <groupId > javax.el</groupId > <artifactId > javax.el-api</artifactId > <version > 3.0.1-b04</version > </dependency > <dependency > <groupId > org.glassfish.web</groupId > <artifactId > javax.el</artifactId > <version > 2.2.6</version > </dependency > <dependency > <groupId > com.oracle</groupId > <artifactId > ojdbc6</artifactId > <version > 11.2.0.4.0-atlassian-hosted</version > </dependency >
配置application.yml 1 2 3 4 5 6 7 8 9 10 11 12 spring: profiles: active: dev batch: job: enabled: false datasource: driver-class-name: oracle.jdbc.driver.OracleDriver url: jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:orcl11g username: adm_real password: adm_real
真实csv数据,位于src/main/resources/NT_BSC_BUDGETVTOLL.csv
中
表定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE TABLE "ADM_REAL"."NT_BSC_BUDGETVTOLL"( "F_ID" VARCHAR2(100 BYTE) NOT NULL , "F_YEAR" VARCHAR2(4 BYTE) NULL , "F_TOLLID" VARCHAR2(50 BYTE) NULL , "F_BUDGETID" VARCHAR2(50 BYTE) NULL , "F_CBUDGETID" VARCHAR2(50 BYTE) NULL , "F_VERSION" VARCHAR2(1 BYTE) DEFAULT '1' NULL , "F_AUDITMSG" VARCHAR2(100 BYTE) NULL , "F_TRIALSTATUS" VARCHAR2(1 BYTE) DEFAULT '0' NULL , "F_FIRAUDITER" VARCHAR2(50 BYTE) NULL , "F_FIRAUDITTIME" VARCHAR2(20 BYTE) NULL , "F_FINAUDITER" VARCHAR2(50 BYTE) NULL , "F_FINAUDITTIME" VARCHAR2(64 BYTE) NULL , "F_EDITTIME" VARCHAR2(64 BYTE) NULL , "F_STARTDATE" VARCHAR2(8 BYTE) NULL , "F_ENDDATE" VARCHAR2(8 BYTE) NULL , )
领域模型类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class BudgetVtoll { private String id; private String year; private String tollid; private String budgetid; private String cbudgetid; private String version; @Size(max = 100) private String auditmsg; private String trialstatus; private String firauditer; private String firaudittime; private String finauditer; private String finaudittime; private String edittime; private String startdate; private String enddate; }
数据处理及校验 定义处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 public class CsvItemProcessor extends ValidatingItemProcessor <BudgetVtoll> { @Override public BudgetVtoll process (BudgetVtoll item) throws ValidationException { super .process(item); return item; } }
校验器定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CsvBeanValidator <T> implements Validator <T>, InitializingBean { private javax.validation.Validator validator; @Override public void validate (T value) throws ValidationException { Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0 ) { StringBuilder message = new StringBuilder (); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage()).append("\n" ); } throw new ValidationException (message.toString()); } } @Override public void afterPropertiesSet () { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } }
任务监听器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CsvJobListener implements JobExecutionListener { private Logger logger = LoggerFactory.getLogger(this .getClass()); private long startTime; private long endTime; @Override public void beforeJob (JobExecution jobExecution) { startTime = System.currentTimeMillis(); logger.info("任务处理开始" ); } @Override public void afterJob (JobExecution jobExecution) { endTime = System.currentTimeMillis(); logger.info("任务处理结束,总耗时=" + (endTime - startTime) + "ms" ); } }
配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 @Configuration @EnableBatchProcessing public class CsvBatchConfig { @Bean @StepScope public FlatFileItemReader<BudgetVtoll> reader (@Value("#{jobParameters['input.file.name']}") String pathToFile) { FlatFileItemReader<BudgetVtoll> reader = new FlatFileItemReader <>(); reader.setResource(new FileSystemResource (pathToFile)); reader.setLineMapper(new DefaultLineMapper <BudgetVtoll>() { { setLineTokenizer(new DelimitedLineTokenizer ("," ) { { setNames(new String []{ "id" ,"year" ,"tollid" ,"budgetid" , "cbudgetid" , "version" , "auditmsg" , "trialstatus" , "firauditer" , "firaudittime" , "finauditer" , "finaudittime" , "edittime" , "startdate" , "enddate" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper <BudgetVtoll>() { { setTargetType(BudgetVtoll.class); } }); } }); return reader; } @Bean public ItemProcessor<BudgetVtoll, BudgetVtoll> processor () { CsvItemProcessor processor = new CsvItemProcessor (); processor.setValidator(csvBeanValidator()); return processor; } @Bean public ItemWriter<BudgetVtoll> writer (DruidDataSource dataSource) { JdbcBatchItemWriter<BudgetVtoll> writer = new JdbcBatchItemWriter <>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider <>()); String sql = "insert into BudgetVtoll " + " (f_id,f_year,f_tollid,f_budgetid,f_cbudgetid,f_version,f_auditmsg,f_trialstatus,f_firauditer,f_firaudittime,f_finauditer,f_finaudittime,f_edittime,f_startdate,f_enddate) " + " values(:id,:year,:tollid,:budgetid,:cbudgetid,:version,:auditmsg,:trialstatus,:firauditer,:firaudittime,:finauditer,:finaudittime,:edittime,:startdate,:enddate)" ; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository (DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean (); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.ORACLE)); jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED" ); jobRepositoryFactoryBean.afterPropertiesSet(); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher (DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher (); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob (JobBuilderFactory jobBuilderFactory, Step s1) { return jobBuilderFactory.get("importJob" ) .incrementer(new RunIdIncrementer ()) .flow(s1) .end() .listener(csvJobListener()) .build(); } @Bean public Step step1 (StepBuilderFactory stepBuilderFactory, ItemReader<BudgetVtoll> reader, ItemWriter<BudgetVtoll> writer, ItemProcessor<BudgetVtoll, BudgetVtoll> processor) { return stepBuilderFactory .get("step1" ) .<BudgetVtoll, BudgetVtoll>chunk(65000 ) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public CsvJobListener csvJobListener () { return new CsvJobListener (); } @Bean public Validator<BudgetVtoll> csvBeanValidator () { return new MyBeanValidator <>(); } }
注意,在定义Job的时候,我通过@StepScope
注解,可以通过传递参数的方式将csv文件路径传进去。
测试类 最后再让我们写个测试类看看能不能成功:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class BatchServiceTest { private Logger logger = LoggerFactory.getLogger(this .getClass()); @Autowired private JobLauncher jobLauncher; @Autowired private Job importJob; @Test public void testBatch1 () throws Exception { JobParameters jobParameters = new JobParametersBuilder () .addLong("time" , System.currentTimeMillis()) .addString("input.file.name" , "E:\\NT_BSC_BUDGETVTOLL.csv" ) .toJobParameters(); jobLauncher.run(importJob, jobParameters); logger.info("testBatch1执行完成" ); } }
把csv文件放入resources/目录下面,然后执行测试。看看输出结果:
1 2 3 4 5 6 : Job: [FlowJob: [name=importJob]] launched with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}] : 任务处理开始 : Executing step: [step1] : 任务处理结束,总耗时=810ms : Job: [FlowJob: [name=importJob]] completed with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}] and the following status: [COMPLETED] : testBatch1执行完成
再去查看数据库里面的数据,已经正常写入了。
并发执行多个Job SpringBatch批处理框架默认使用单线程完成所有任务的执行,官方推荐配置任务执行器来并发执行,
提高批处理的效率。
Spring Core 为我们提供了多种执行器实现(包括多种异步执行器),我们可以根据实际情况灵活选择使用。
类名
描述
是否异步
SyncTaskExecutor
简单同步执行器
否
SimpleAsyncTaskExecutor
简单异步执行器,提供最基本的异步实现
是
WorkManagerTaskExecutor
该类作为通过 JCA 规范进行任务执行的实现
是
ThreadPoolTaskExecutor
线程池任务执行器
是
配置线程池执行Job:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Bean public ThreadPoolTaskExecutor taskExecutor () { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor (); taskExecutor.setCorePoolSize(5 ); taskExecutor.setMaxPoolSize(10 ); taskExecutor.setQueueCapacity(200 ); return taskExecutor; } @Bean public SimpleJobLauncher jobLauncher (ThreadPoolTaskExecutor taskExecutor, DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher (); jobLauncher.setTaskExecutor(taskExecutor); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; }
重点是上面的那句jobLauncher.setTaskExecutor(taskExecutor);
这里的并发表示的是执行不同的Job使用线程池,每个Job实例会分配一个线程去执行。这个是最推荐的做法。
如果你还想对单个Job执行逻辑采用多线程,可以再Step配置中加入线程池支持,不过需要保证你所有的Step都是线程安全的:
1 2 3 4 5 6 7 return stepBuilderFactory .get("logStep1" ) .taskExecutor(new SimpleAsyncTaskExecutor ()) .throttleLimit(10 ) .build();
这里我通过一个实际例子展示如何编写多个Job并发执行。
还是跟上面例子一样,将csv文件导入到表中,但是这个时候有两个csv文件,我要导入到两张表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE TABLE Z_TEST_APP( appid INT , zname VARCHAR2 (20 ), flag VARCHAR2 (2 ), CONSTRAINT app_pk PRIMARY KEY (appid) ); CREATE TABLE Z_TEST_LOG( logid INT , msg VARCHAR2 (20 ), logtime VARCHAR2 (8 ), CONSTRAINT log_pk PRIMARY KEY (logid) );
每个类型任务单独创建一个package,然后里面放两个类。以APP为例:
创建一个App类:
1 2 3 4 5 public class App { private int appid; private String zname; private String flag; }
然后创建一个AppConfig类配置任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public class AppConfig { @Bean(name = "appReader") @StepScope public FlatFileItemReader<App> reader (@Value("#{jobParameters['input.file.name']}") String pathToFile) { FlatFileItemReader<App> reader = new FlatFileItemReader <>(); reader.setResource(new FileSystemResource (pathToFile)); reader.setLineMapper(new DefaultLineMapper <App>() { { setLineTokenizer(new DelimitedLineTokenizer ("|" ) { { setNames(new String []{ "appid" , "zname" , "flag" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper <App>() { { setTargetType(App.class); } }); } }); return reader; } }
注意,每个Bean配置都加一个name属性,然后自动注入里面需要通过@Qualifier注解来指定当前类中的Bean,
因为如果不指定name,Spring默认只会初始化一个Bean实例。
比如定义Job:
1 2 3 4 5 6 7 8 9 @Bean(name = "zappJob") public Job zappJob (JobBuilderFactory jobBuilderFactory, @Qualifier("zappStep1") Step s1) { return jobBuilderFactory.get("zappJob" ) .incrementer(new RunIdIncrementer ()) .flow(s1) .end() .listener(new MyJobListener ("App" )) .build(); }
另外一个LOG任务的配置也是同样。
好了,定义完成之后开始写测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Test public void testTwoJobs () throws Exception { JobParameters jobParameters1 = new JobParametersBuilder () .addLong("time" , System.currentTimeMillis()) .addString("input.file.name" , p.getCsvApp()) .toJobParameters(); jobLauncher.run(zappJob, jobParameters1); JobParameters jobParameters2 = new JobParametersBuilder () .addLong("time" , System.currentTimeMillis()) .addString("input.file.name" , p.getCsvLog()) .toJobParameters(); jobLauncher.run(zlogJob, jobParameters2); logger.info("main线程完成" ); while (true ) { Thread.sleep(2000000L ); } }
这个是运行日志:
1 2 3 4 5 6 7 8 9 10 11 [ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zappJob]] launched with t [ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理开始 [ main] com.xncoding.service.BatchServiceTest : main线程完成 [ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] launched with t [ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理开始 [ taskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [zappStep1] [ taskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [logStep1] [ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理结束,总耗时=495ms [ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理结束,总耗时=585ms [ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] completed with [ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zappJob]] completed with
异常处理 默认情况下,Spring Batch遇到异常的时候会终止处理,比如遇到csv文件中解析错误就会终止异常。如果我想忽略掉这些异常继续处理,
可以配置在Reader中忽略异常。
这个在Step的定义中配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 return stepBuilderFactory .get("logStep1" ) .<Log, Log>chunk(5000 ) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .retry(Exception.class) .noRetry(ParseException.class) .retryLimit(1 ) .skip(Exception.class) .skipLimit(200 ) .build();
上面定义了使用可容忍异常模式,遇到Exception异常就重试1次,对于ParseException异常不重试,所有异常都会忽略掉,不会导致程序终止。
但是最大允许跳过200次异常,超过这个数字就终止执行了。
然后改一下csv文件,把某个字段改大点,超过数据库中定义长度:
1 2 3 4 5 1|等哈哈哈|01 2|等哈哈哈|02 3|等哈哈哈|025555 4|等哈哈哈|02 5|等哈哈哈|02
重新执行发现正常执行完成,数据库中只插入了4条数据,id为3的没有。
通用配置 更进一步,如果有多个CSV文件需要导入,那么安装上面的写法。每次都要定义一个新的Config类,一个新的Bean类,代码重复率很高。
实际上可以定义一个通用配置,去掉里面的显示Bean类,通过Java反射机制,还有@StepScope注解,
实现每次运行时候根据JobParameters初始化不同的Job。
如果有一个新的CSV文件需要导入,只需要新建一个Bean,定义好相应的列,然后将Bean的属性列表、插入SQL语句作为参数传入即可。
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Test public void testCommonJobs () throws Exception { JobParameters jobParameters1 = new JobParametersBuilder () .addLong("time" ,System.currentTimeMillis()) .addString(KEY_JOB_NAME, "App" ) .addString(KEY_FILE_NAME, p.getCsvApp()) .addString(KEY_VO_NAME, "com.xncoding.trans.modules.zapp.App" ) .addString(KEY_COLUMNS, String.join("," , new String []{ "appid" , "zname" , "flag" })) .addString(KEY_SQL, "insert into z_test_App (appid, zname, flag) values(:appid, :zname, :flag)" ) .toJobParameters(); jobLauncher.run(commonJob, jobParameters1); JobParameters jobParameters2 = new JobParametersBuilder () .addLong("time" ,System.currentTimeMillis()) .addString(KEY_JOB_NAME, "Log" ) .addString(KEY_FILE_NAME, p.getCsvLog()) .addString(KEY_VO_NAME, "com.xncoding.trans.modules.zlog.Log" ) .addString(KEY_COLUMNS, String.join("," , new String []{ "logid" , "msg" , "logtime" })) .addString(KEY_SQL, "insert into z_test_Log (logid, msg, logtime) values(:logid, :msg, :logtime)" ) .toJobParameters(); jobLauncher.run(commonJob, jobParameters2); logger.info("Main线程执行完成" ); while (true ) { Thread.sleep(2000000L ); } }
FAQ 在进行Oracle操作的时候报一个错误:
原因:Spring Batch 默认是 ISOLATION_SERIALIZABLE
官方文档说明:
1 The default is ISOLATION_SERIALIZABLE, which prevents accidental concurrent execution of the same job (ISOLATION_REPEATABLE_READ would work as well).
而Oracle默认的事务隔离级别是ISOLATION_READ_COMMITTED
解决方案就是修改JobRepositoryFactoryBean
的定义,加一个配置:
1 jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED" );
GitHub源码 springboot-batch