javascript
SpringBatch 自定义ItemReader和可重新启动Reader(十五)
文章目錄
- 一、自定義CustomItemReader
- 二、job 監(jiān)聽器
- 三、配置job
- 四、改造CustomItemReader,發(fā)生異常批處理作業(yè)從停止的地方重新啟動(dòng)
前言:在一些業(yè)務(wù)場(chǎng)景中,可能現(xiàn)有的reader不符合我們的要求,SpringBatch提供自定義reader,實(shí)現(xiàn)ItemReader接口,滿足我們業(yè)務(wù)場(chǎng)景。
SpringBatch其它文章直通車:
- SpringBatch讀單個(gè)文件(FlatFileItemReader)和寫單個(gè)文件(FlatFileItemWriter)(一)
- SpringBatch順序讀取多文件(MultiResourceItemReader)和順序?qū)懳募?MultiResourceItemWriter)(二)
- SpringBatch讀數(shù)據(jù)庫(kù)(MyBatisPagingItemReader)(三)
- SpringBatch讀文件(FlatFileItemReader)寫據(jù)庫(kù)(MyBatisBatchItemWriter)(四)
- SpringBatch 監(jiān)聽器之Job監(jiān)聽器(JobExecutionListener)和Step監(jiān)聽器(StepExecutionListener)(五)
- SpringBatch 監(jiān)聽器之Chunk監(jiān)聽器(ChunkListener)和Skip監(jiān)聽器(SkipListener)(六)
- SpringBatch 多線程(TaskExecutor)啟動(dòng)Job詳解 (七)
- SpringBatch 配置并行啟動(dòng)Job詳解 (八)
- SpringBatch 批處理分區(qū)(Partitioner )分片(九)
- SpringBatch tasklet實(shí)現(xiàn)和用法(十)
- SpringBatch 讀取JSON(JsonItemReader)用法(十一)
- SpringBatch 寫文件JSON(JsonFileItemWriter)用法(十二)
- SpringBatch 讀取xml文件(StaxEventItemReader)用法(十三)
- SpringBatch 寫xml文件(StaxEventItemWriter)用法(十四)
代碼已上傳GitHub上面地址:git源碼地址
一、自定義CustomItemReader
創(chuàng)建了一個(gè)簡(jiǎn)單的ItemReader實(shí)現(xiàn),它從提供的列表中讀取數(shù)據(jù)。我們首先實(shí)現(xiàn)ItemReader最基本的read方法
package com.sl.common;import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException;import java.util.List;/*** 自定義reader* @author shuliangzhao* @Title: CustomItemReader* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:49*/ public class CustomItemReader<T> implements ItemReader<T> {private List<String> list;public CustomItemReader(List<String> list) {this.list = list;}@Overridepublic T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if (list.size() > 0 && !list.isEmpty()) {return (T)list.remove(0);}return null;} }二、job 監(jiān)聽器
job執(zhí)行之前加載數(shù)據(jù)供reader使用
package com.sl.listener;import com.sl.common.CommonConstants; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; import org.springframework.stereotype.Component;/*** @author shuliangzhao* @Title: CustomJobListener* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:59*/ @Component public class CustomJobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {CommonConstants.getList().add("hello");CommonConstants.getList().add("springbatch");}@Overridepublic void afterJob(JobExecution jobExecution) {} }三、配置job
package com.sl.config;import com.sl.common.CommonConstants; import com.sl.common.CustomItemReader; import com.sl.listener.CustomJobListener; import com.sl.writer.CustomItemWriter; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 自定義重啟reader* @author shuliangzhao* @Title: CustomConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:55*/ @Configuration @EnableBatchProcessing public class CustomConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate CustomJobListener customJobListener;@Autowiredprivate CustomItemWriter customItemWriter;@Beanpublic Job customJob() {return jobBuilderFactory.get("customJob").listener(customJobListener).start(customStep()).build();}@Beanpublic Step customStep() {return stepBuilderFactory.get("customStep").chunk(10).reader(customItemReader()).writer(customItemWriter).build();}@Bean@StepScopepublic CustomItemReader customItemReader() {return new CustomItemReader(CommonConstants.getList());} }敲黑板:如果這個(gè)job在執(zhí)行過(guò)程中發(fā)生錯(cuò)誤,是不能重啟。接下來(lái)我們講解怎么改進(jìn)reader可以讓我們的job在發(fā)生異常,批處理作業(yè)從停止的地方重新啟動(dòng)。
四、改造CustomItemReader,發(fā)生異常批處理作業(yè)從停止的地方重新啟動(dòng)
目前,如果處理被中斷并重新開始,ItemReader必須從頭開始。在許多場(chǎng)景中,這實(shí)際上是有效的,但有時(shí)更可取的做法是,批處理作業(yè)從停止的地方重新啟動(dòng)。關(guān)鍵的區(qū)別通常是閱讀器是有狀態(tài)的還是無(wú)狀態(tài)的。無(wú)狀態(tài)讀取器不需要擔(dān)心可重啟性,但是有狀態(tài)讀取器必須嘗試在重啟時(shí)重新構(gòu)建其最后一個(gè)已知狀態(tài)。出于這個(gè)原因,我們建議盡可能保持自定義讀取器處于無(wú)狀態(tài),因此不必?fù)?dān)心可重啟性。
CustomItemReader還需要實(shí)現(xiàn)接口ItemStream
package com.sl.common;import org.springframework.batch.item.*;import java.util.List;/*** 自定義reader* @author shuliangzhao* @Title: CustomItemReader* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:49*/ public class CustomRestaItemReader<T> implements ItemReader<T> , ItemStream {private List<T> list;int currentIndex = 0;private static final String CURRENT_INDEX = "current.index";public CustomRestaItemReader(List<T> list) {this.list = list;}@Overridepublic T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if (currentIndex < list.size()) {return list.get(currentIndex++);}return null;}@Overridepublic void open(ExecutionContext executionContext) throws ItemStreamException {if(executionContext.containsKey(CURRENT_INDEX)){currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();}else{currentIndex = 0;}}@Overridepublic void update(ExecutionContext executionContext) throws ItemStreamException {executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());}@Overridepublic void close() throws ItemStreamException {} }實(shí)現(xiàn)reader重啟關(guān)鍵方法是open方法和update方法
總結(jié)
以上是生活随笔為你收集整理的SpringBatch 自定义ItemReader和可重新启动Reader(十五)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBatch 写xml文件(S
- 下一篇: SpringBatch job执行流程分