javascript
使用 Spring Batch 构建企业级批处理应用
https://www.ibm.com/developerworks/cn/java/j-lo-springbatch1/index.html
https://www.ibm.com/developerworks/cn/java/j-lo-springbatch2/
引言
總述
本系列文章旨在通過示例搭建以及特性介紹,詳細講述如何利用 Spring Batch 開發(fā)企業(yè)批處理應(yīng)用。本系列文章共分為三部分,第一部分初步介紹了批處理以及 Spring Batch 的相關(guān)概念,同時搭建了一個簡單的基于 Spring Batch 的批處理應(yīng)用。第二部分介紹了 Step Flow 以及并發(fā)支持。第三部分則主要介紹了 Spring Batch 對任務(wù)監(jiān)控的支持。下面讓我們進入第一部分內(nèi)容。
什么是批處理
在現(xiàn)代企業(yè)應(yīng)用當(dāng)中,面對復(fù)雜的業(yè)務(wù)以及海量的數(shù)據(jù),除了通過龐雜的人機交互界面進行各種處理外,還有一類工作,不需要人工干預(yù),只需要定期讀入大批量數(shù)據(jù),然后完成相應(yīng)業(yè)務(wù)處理并進行歸檔。這類工作即為“批處理”。
從上面的描述可以看出,批處理應(yīng)用有如下幾個特點:
- 數(shù)據(jù)量大,少則百萬,多則上億的數(shù)量級。
- 不需要人工干預(yù),由系統(tǒng)根據(jù)配置自動完成。
- 與時間相關(guān),如每天執(zhí)行一次或每月執(zhí)行一次。
同時,批處理應(yīng)用又明顯分為三個環(huán)節(jié):
- 讀數(shù)據(jù),數(shù)據(jù)可能來自文件、數(shù)據(jù)庫或消息隊列等
- 數(shù)據(jù)處理,如電信支撐系統(tǒng)的計費處理
- 寫數(shù)據(jù),將輸出結(jié)果寫入文件、數(shù)據(jù)庫或消息隊列等
因此,從系統(tǒng)架構(gòu)上,應(yīng)重點考慮批處理應(yīng)用的事務(wù)粒度、日志監(jiān)控、執(zhí)行、資源管理(尤其存在并發(fā)的情況下)。從系統(tǒng)設(shè)計上,應(yīng)重點考慮數(shù)據(jù)讀寫與業(yè)務(wù)處理的解耦,提高復(fù)用性以及可測試性。
什么是 Spring Batch
Spring Batch 作為 Spring 的子項目,是一款基于 Spring 的企業(yè)批處理框架。通過它可以構(gòu)建出健壯的企業(yè)批處理應(yīng)用。Spring Batch 不僅提供了統(tǒng)一的讀寫接口、豐富的任務(wù)處理方式、靈活的事務(wù)管理及并發(fā)處理,同時還支持日志、監(jiān)控、任務(wù)重啟與跳過等特性,大大簡化了批處理應(yīng)用開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過程中解放出來,使他們可以更多地去關(guān)注核心的業(yè)務(wù)處理過程。
另外我們還需要知道,Spring Batch 是一款批處理應(yīng)用框架,不是調(diào)度框架。它只關(guān)注批處理任務(wù)相關(guān)的問題,如事務(wù)、并發(fā)、監(jiān)控、執(zhí)行等,并不提供相應(yīng)的調(diào)度功能。因此,如果我們希望批處理任務(wù)定期執(zhí)行,可結(jié)合 Quartz 等成熟的調(diào)度框架實現(xiàn)。
下面將通過一個示例詳細介紹如何使用 Spring Batch 搭建批處理應(yīng)用。這個示例比較簡單,對系統(tǒng)中所有用戶發(fā)送一封繳費提醒通知。此處,我們簡單地將繳費提醒輸出到控制臺。當(dāng)然,隨著介紹的深入,我將逐漸豐富該功能,使其最終完整展示 Spring Batch 的各種特性。
環(huán)境搭建
首先,從 Spring 官方網(wǎng)站下載 Spring Batch 發(fā)布包(見?參考資源)。本文基于 Spring Batch 2.1.6(當(dāng)前最新版本為 2.1.8)以及 Spring 2.5.6 版本構(gòu)建。我們可以看到 Spring Batch 共包含 spring-batch-core 和 spring-batch-infrastructure 兩個包。spring-batch-core 主要包含批處理領(lǐng)域相關(guān)類,而 spring-batch-infrastructure 提供了一個基礎(chǔ)訪問處理框架。
接下來,讓我們新建一個 Eclipse 工程,并將 Spring Batch 以及 Spring 相關(guān)包添加到依賴環(huán)境,如 圖 1 所示
圖 1. 依賴環(huán)境
環(huán)境搭建完成后,讓我們看一下如何一步步構(gòu)建一個批處理應(yīng)用。
構(gòu)建應(yīng)用
如“引言”中所述 Spring Batch 按照關(guān)注點的不同,將整個批處理過程分為三部分:讀、處理、寫,從而將批處理應(yīng)用進行合理解耦。同時,Spring Batch 還針對讀、寫操作提供了多種實現(xiàn),如消息、文件、數(shù)據(jù)庫。對于數(shù)據(jù)庫,還提供了 Hibernate、iBatis、JPA 等常見 ORM 框架的讀、寫接口支持。
對象定義
首先我們需要編寫用戶以及消息類,比較簡單,如清單 1 和 清單 2 所示:
清單 1. User 類
| 1 2 3 4 5 6 7 8 9 10 | package org.springframework.batch.sample; public class User { ????private String name; ????private Integer age; ????public String getName() {return name;} ????public void setName(String name) {this.name = name;} ????public Integer getAge() {return age;} ????public void setAge(Integer age) {this.age = age;} } |
清單 2. Message 類
| 1 2 3 4 5 6 7 | package org.springframework.batch.sample; public class Message { ????private String content; ????public String getContent() {return content;} ????public void setContent(String content) {this.content = content;} } |
讀寫及處理接口
首先,所有 Spring Batch 的讀操作均需要實現(xiàn) ItemReader 接口,而且 Spring Batch 為我們提供了多種默認(rèn)實現(xiàn),尤其是基于 ORM 框架的讀接口,同時支持基于游標(biāo)和分頁兩類操作。因此,多數(shù)情況下我們并不需要手動編寫 ItemReader 類,而是直接使用相應(yīng)實現(xiàn)類即可。
在該示例中,我們使用 org.springframework.batch.item.file.FlatFileItemReader 類從文件中進行信息讀入,用戶信息格式定義如 清單 3 所示。
清單 3. 用戶信息
| 1 2 3 4 5 6 7 8 9 10 | User1,20 User2,21 User3,22 User4,23 User5,24 User6,25 User7,26 User8,27 User9,28 User10,29 |
該類封裝了文件讀操作,僅僅需要我們手動設(shè)置 LineMapper 與訪問文件路徑即可。Spring Batch 通過 LineMapper 可以將文件中的一行映射為一個對象。我們不難發(fā)現(xiàn),Spring Batch 將文件操作封裝為類似 Spring JDBC 風(fēng)格的接口,這也與 Spring 一貫倡導(dǎo)的接口統(tǒng)一是一致的。此處我們使用 org.springframework.batch.item.file.mapping.DefaultLineMapper 進行行映射。讀操作的配置信息如 清單 4 所示:
清單 4. message_job.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <beans:bean id="messageReader" ??????class="org.springframework.batch.item.file.FlatFileItemReader"> ????<beans:property name="lineMapper" ref="lineMapper"> ????</beans:property> ????<beans:property name="resource" ????value="classpath:/users.txt"></beans:property> </beans:bean> <beans:bean id="lineMapper" ????class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> ????<beans:property name="lineTokenizer"> ????????<beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> ????????</beans:bean> ????</beans:property> ????<beans:property name="fieldSetMapper"> ????????<beans:bean class="org.springframework.batch.sample.UserMapper"> ????????</beans:bean> ????</beans:property> </beans:bean> |
從清單我們可以知道,DefaultLineMapper 需要設(shè)置 lineTokenizer 和 fieldSetMapper 兩個屬性,首先通過 lineTokenizer 完成文件行拆分,并封裝為一個屬性結(jié)果集,因為我們使用“,”分隔用戶屬性,所以需要將 lineTokenizer 設(shè)置為 DelimitedLineTokenizer。最后通過 fieldSetMapper 完成將結(jié)果集封裝為一個 POJO 對象。具體實現(xiàn)如 清單 5 所示:
清單 5. UserMapper 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | package org.springframework.batch.sample; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; public class UserMapper implements FieldSetMapper<User> { ????public User mapFieldSet(FieldSet fs) throws BindException { ????????User u = new User(); ????????u.setName(fs.readString(0)); ????????u.setAge(fs.readInt(1)); ????????return u; ????} } |
該接口的實現(xiàn)方式與 Spring JDBC 的 RowMapper 極其相似。
接下來,再讓我們看一下如何實現(xiàn)寫操作。Spring Batch 所有寫操作均需要實現(xiàn) ItemWriter 接口。該接口只有一個方法 void write(List<? extends T> items),參數(shù)是輸出結(jié)果的列表。之所以如此定義,是為了便于我們進行批量操作,以提高性能。每次傳入的列表由事務(wù)提交粒度確定,也就是說 Spring Batch 每次將提交的結(jié)果集傳入寫操作接口。因為我們要做的僅僅是將繳費通知輸出到控制臺,所以,寫操作實現(xiàn)如 清單 6 所示:
清單 6. MessagesItemWriter 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | package org.springframework.batch.sample; import java.util.List; import org.springframework.batch.item.ItemWriter; public class MessagesItemWriter implements ItemWriter<Message>{ ????public void write(List<? extends Message> messages) throws Exception { ????????System.out.println("write results"); ????????for (Message m : messages) { ????????????System.out.println(m.getContent()); ????????} ????} } |
同 ItemReader 一樣,Spring Batch 也為我們提供了多樣的寫操作支持,具體可閱讀 Spring Batch 參考手冊,此處不再贅述。
最后,再看一下如何實現(xiàn)業(yè)務(wù)處理。Spring Batch 提供了 ItemProcessor 接口用于完成相應(yīng)業(yè)務(wù)處理。在本示例中,即為根據(jù)用戶信息生成一條繳費通知信息,如 清單 7 所示:
清單 7. MessagesItemProcessor 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | package org.springframework.batch.sample; import org.springframework.batch.item.ItemProcessor; public class MessagesItemProcessor implements ItemProcessor<User, Message> { ????public Message process(User user) throws Exception { ????????Message m = new Message(); ????????m.setContent("Hello " + user.getName() ????????????????+ ",please pay promptly at the end of this month."); ????????return m; ????} } |
任務(wù)定義
通過上面一節(jié),我們已經(jīng)完成了批處理任務(wù)的讀數(shù)據(jù)、處理過程、寫數(shù)據(jù)三個過程。那么,我們?nèi)绾螌⑦@三部分結(jié)合在一起完成批處理任務(wù)呢?
Spring Batch 將批處理任務(wù)稱為一個 Job,同時,Job 下分為多個 Step。Step 是一個獨立的、順序的處理步驟,包含該步驟批處理中需要的所有信息。多個批處理 Step 按照一定的流程組成一個 Job。通過這樣的設(shè)計方式,我們可以靈活配置 Job 的處理過程。
接下來,讓我們看一下如何配置繳費通知的 Job,如 清單 8 所示:
清單 8. message_job.xml
| 1 2 3 4 5 6 7 8 9 10 | <job id="messageJob"> ??<step id="messageStep"> ????<tasklet> ???????<chunk reader="messageReader" processor="messageProcessor" ???????????writer="messageWriter" commit-interval="5" ???????????chunk-completion-policy=""> ????????</chunk> ????</tasklet> ???</step> </job> |
如上,我們定義了一個名為“messageJob”的 Job,該 Job 僅包含一個 Step。在配置 Step 的過程中,我們不僅要指定讀數(shù)據(jù)、處理、寫數(shù)據(jù)相關(guān)的 bean,還要指定 commit-interval 和 chunk-completion-policy 屬性。前者指定了該 Step 中事務(wù)提交的粒度,取值為 5 即表明每當(dāng)處理完畢讀入的 5 條數(shù)據(jù)時,提交一次事務(wù)。后者指定了 Step 的完成策略,即當(dāng)什么情況發(fā)生時表明該 Step 已經(jīng)完成,可以轉(zhuǎn)入后續(xù)處理。由于沒有明確指定相應(yīng)的類,Spring Batch 使用默認(rèn)策略,即當(dāng)讀入數(shù)據(jù)為空時認(rèn)為 Step 結(jié)束。
最后,我們還需要配置一個 JobRepository 并為其指定一個事務(wù)管理器,該類用于對 Job 進行管理,如 清單 9 所示:
清單 9. message_job.xml
| 1 2 3 4 5 6 7 | <beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> ????????<beans:property name="transactionManager" ref="transactionManager" /> </beans:bean> <beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/> |
因為我們整個示例不需要數(shù)據(jù)庫操作,所以選擇了使用 MapJobRepositoryFactoryBean 和 ResourcelessTransactionManager。
所有配置完成以后,進入最后一步——任務(wù)執(zhí)行。
任務(wù)執(zhí)行
那么如何運行一個 Job 呢? Spring Batch 提供了 JobLauncher 接口用于運行 Job,并提供了一個默認(rèn)實現(xiàn) SimpleJobLauncher。先讓我們看一下具體執(zhí)行代碼,如 清單 10 所示:
清單 10. Main 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public class Main { ????public static void main(String[] args) { ????????ClassPathXmlApplicationContext c = ?????????????????new ClassPathXmlApplicationContext("message_job.xml"); ????????SimpleJobLauncher launcher = new SimpleJobLauncher(); ????????launcher.setJobRepository((JobRepository) c.getBean("jobRepository")); ????????launcher.setTaskExecutor(new SimpleAsyncTaskExecutor()); ????????try { ?????????????launcher.run((Job) c.getBean("messageJob"), new JobParameters()); ????????} catch (Exception e) { ????????e.printStackTrace(); ????????} ????} } |
首先,我們需要為 JobLauncher 指定一個 JobRepository,該類負責(zé)創(chuàng)建一個 JobExecution 對象來執(zhí)行 Job,此處直接從上下文獲取即可。其次,需要指定一個任務(wù)執(zhí)行器,我們使用 Spring Batch 提供的 SimpleAsyncTaskExecutor。最后,通過 run 方法來執(zhí)行指定的 Job,該方法包含兩個參數(shù),需要執(zhí)行的 Job 以及執(zhí)行參數(shù)。您可以通過運行示例工程查看運行結(jié)果。由于 MessageItemWriter 在每次輸出結(jié)果前,先打印了一行提示,因此您可以明顯看出輸出分 2 組進行打印,即事務(wù)被提交了 2 次(因為我們設(shè)置的事務(wù)粒度為 5。)。
從業(yè)務(wù)功能上考慮,同一任務(wù)應(yīng)該盡量避免重復(fù)執(zhí)行(即相同條件下的任務(wù)只能成功運行一次),試想如果本示例中發(fā)送繳費通知過多只能導(dǎo)致用戶不滿,那么電信計費批處理任務(wù)重復(fù)執(zhí)行則將導(dǎo)致重復(fù)計費,從而使用戶遭受損失。幸運的是,Spring Batch 已經(jīng)為我們考慮好了這些。
對于 Spring Batch 來說,JobParameters 相同的任務(wù)只能成功運行一次。您如果在示例 Main 類中連續(xù)運行同一 Job,將會得到如下異常(見 清單 11 ):
清單 11. 異常信息
| 1 2 3 | org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={}.? If you want to run this job again, change the parameters. |
因此,如果我們希望該任務(wù)是周期執(zhí)行的(如每月執(zhí)行一次),那么必須保證周期內(nèi)參數(shù)是唯一。假如該客戶要求我們每月為用戶發(fā)送一次繳費通知。我們的任務(wù)執(zhí)行可以如 清單 12 所示:
清單 12. Main 類
| 1 2 3 4 5 | Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-11")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); |
在示例中,我將執(zhí)行月份作為 Job 的參數(shù)傳入,分別執(zhí)行了 10、11 月兩個月的任務(wù)。
任務(wù)重試
既然相同參數(shù)的任務(wù)只能成功執(zhí)行一次,那么,如果任務(wù)失敗該如何處理?此時,需要考慮的是,既然任務(wù)步驟有事務(wù)提交粒度,那么可能任務(wù)已經(jīng)提交了部分處理結(jié)果,這部分不應(yīng)該被重復(fù)處理。也就是說,此時應(yīng)該有重試操作。
在 Spring Batch 中,通過配置可以實現(xiàn)步驟 Step 的重試,如 清單 13 所示:
清單 13. message_job.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | <job id="messageJob" restartable="true"> ????<step id="messageStep"> ????????<tasklet> ????????????<chunk reader="messageReader" processor="messageProcessor" ???????????????????????????????????????????????????writer="messageWriter" ????????????????commit-interval="5" chunk-completion-policy="" retry-limit="2"> ????????????????<retryable-exception-classes> ????????????????????<include class="java.lang.RuntimeException" /> ????????????????</retryable-exception-classes> ????????????</chunk> ????????</tasklet> ????</step> </job> |
我們可以看到,主要分兩部分:首先,需要設(shè)置重試次數(shù),其次是當(dāng)執(zhí)行過程中捕獲到哪些異常時需要重試。如果在執(zhí)行過程中捕獲到重試異常列表中的異常信息,則進行重試操作。如果重試操作達到最大次數(shù)仍提示異常,則認(rèn)為任務(wù)執(zhí)行失敗。對于異常信息的配置,除了通過 include 配置包含列表外,也可以通過 exclude 配置排除列表。
由于通過配置進行的 Step 重試是自動的,因此較難控制(多用于網(wǎng)絡(luò)訪問異常等不需要人工干預(yù)的情況)。可以考慮一下本示例,如果有一個用戶的信息有問題,名字為空,不能發(fā)送繳費通知,步驟重試便不合適了,此時我們可以對 Job 進行重試操作。
Spring Batch 允許重復(fù)執(zhí)行未成功的 Job,而每次執(zhí)行即為一次重試操作。示例代碼如 清單 14 所示:
清單 14. Main 類
| 1 2 3 4 5 | Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); Thread.sleep(10000); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); |
您可以通過如下步驟查看運行結(jié)果:首先,將 users.txt 文件中的第 7 行(之所以指定該行,便于驗證事務(wù)提交以及重復(fù)執(zhí)行的起始位置)的用戶名修改為空。其次,運行示例。最后,在程序出現(xiàn)異常提示時,更新第 7 行的用戶名(為了便于演示,程序在兩次任務(wù)執(zhí)行過程中等待 10 秒鐘)。
您可以在控制臺中很明顯的看到,任務(wù)先打印了 5 條記錄(第一次事務(wù)提交),然后出現(xiàn)異常信息,待我們將錯誤更正后,又打印了 5 條記錄,任務(wù)最終成功完成。
從輸出結(jié)果,我們可以知道 Spring Batch 是從出錯的事務(wù)邊界內(nèi)第一條記錄重復(fù)執(zhí)行的,這樣便確保了數(shù)據(jù)完整性,而且所有這一切對于用戶均是透明的。
那么 Spring Batch 是如何做到這一步的呢?這與 Spring Batch 的運行時管理是分不開的。
運行時管理
Spring Batch 提供了如 表 1 所示的類用于記錄每個 Job 的運行信息:
表 1. 運行時類信息
| JobInstance | 該類記錄了 Job 的運行實例。舉例:10 月和 11 月分別執(zhí)行同一 Job,將生成兩個 JobInstance。主要信息有:標(biāo)識、版本、Job 名稱、Job 參數(shù) |
| JobExecution | 該類記錄了 Job 的運行記錄。如上面的示例,Job 第一次運行失敗,第二次運行成功,那么將形成兩條運行記錄,但是對應(yīng)的是同一個運行實例。主要信息有:Job 的運行時間、運行狀態(tài)等。 |
| JobParameters | 該類記錄了 Job 的運行參數(shù) |
| ExecutionContext | 該類主要用于開發(fā)人員存儲任務(wù)運行過程中的相關(guān)信息(以鍵值對形式),主要分為 Job 和 Step 兩個范圍 |
| StepExecution | 該類與 JobExecution 類似,主要記錄了 Step 的運行記錄。包括此次運行讀取記錄條數(shù)、輸出記錄條數(shù)、提交次數(shù)、回滾次數(shù)、讀跳過條數(shù)、處理跳過條數(shù)、寫跳過條數(shù)等信息 |
Spring Batch 通過 JobRepository 接口維護所有 Job 的運行信息,此外 JobLauncher 的 run 方法也返回一個 JobExecution 對象,通過該對象可以方便的獲得 Job 其他的運行信息,代碼如 清單 15 所示:
清單 15. Main 類
| 1 2 3 4 5 6 7 | Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); JobExecution je = ???????launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); System.out.println(je); System.out.println(je.getJobInstance()); System.out.println(je.getStepExecutions()); |
輸出信息如 清單 16 所示:
清單 16. 輸出結(jié)果
| 1 2 3 4 5 6 7 8 9 10 11 | JobExecution: id=0, version=2, startTime=Tue Nov 15 21:00:09 CST 2011, endTime=Tue Nov 15 21:00:09 CST 2011, lastUpdated=Tue Nov 15 21:00:09 CST 2011, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]] ?JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob] ?[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, ?exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, ??writeSkipCount=0, processSkipCount=0, commitCount=3 , rollbackCount=0, ??exitDescription=] |
從日志您可以發(fā)現(xiàn)事務(wù)一共提交了 3 次,這與前面的說明是不一致的。之所以會如此是因為當(dāng)事務(wù)提交粒度恰好可以被記錄數(shù)整除時,事務(wù)會有一次空提交。
關(guān)于 Spring Batch 運行時信息管理,將在講解 Job 監(jiān)控時再詳細介紹,此處不再贅述,你也可以查看 Spring Batch 參考資料了解相關(guān)信息。
總結(jié)
本文通過一個簡單示例演示了如何構(gòu)建 Spring Batch 應(yīng)用,同時介紹了 Spring Batch 的相關(guān)核心概念。希望您通過本文可以掌握 Spring Batch 的基本功能。在接下來的文章中,我將繼續(xù)介紹 Spring Batch 的兩個重要特性:Job 流和并發(fā)。
下載資源
- 示例代碼?(batch_sample.zip | 34KB)
相關(guān)主題
- 本系列?第 2 部分:主要介紹了 Spring Batch 的 Step Flow 以及并發(fā)處理兩項重要特性。
- Spring Batch 主頁,可以初步了解 Spring Batch 的基本架構(gòu)。
- Spring Batch 發(fā)布包,您可以在這里找到各個版本的 Spring Batch 發(fā)布包。
- Spring Batch 入門,教你如何入門。
- Spring Batch 參考手冊,詳細了解 Spring Batch 框架。
- Spring 參考手冊,Spring Framework 知識學(xué)習(xí)。
- “Spring Richclient 中的安全認(rèn)證管理”(developerWorks,2011 年 7 月):作為企業(yè)級開發(fā)框架,Spring Richclient 為我們提供了完善的安全認(rèn)證管理功能,使我們能夠方便構(gòu)建安全的企業(yè)級應(yīng)用。本文將詳細介紹 Spring Richclient 中安全認(rèn)證管理的實現(xiàn)方式以及使用方法。
- “Struts2、Spring、Hibernate 高效開發(fā)的最佳實踐”(developerWorks,2011 年 8 月):Struts2、Spring、Hibernate(SSH)是最常用的 Java EE Web 組件層的開發(fā)技術(shù)搭配,網(wǎng)絡(luò)中和許多 IT 技術(shù)書籍中都有它們的開發(fā)教程,但是通常的教程都會讓很多程序員陷入痛苦的配置與修改配置的過程。本文利用 SSH 中的技術(shù)特性,利用 Java 反射技術(shù),按照規(guī)約優(yōu)于配置的原理,基于 SSH 設(shè)定編寫了一個通用開發(fā)框架,這使得開發(fā)者可以專注于業(yè)務(wù)邏輯的開發(fā)。
- “如何將基于 Struts、Spring 和 Hibernate 的應(yīng)用從 Tomcat 遷移到 WebSphere Application Server”(developerWorks,2011 年 11 月):本文向讀者介紹基于 Eclipse 開發(fā)的 Struts、Spring 和 Hibernate 開源應(yīng)用和開發(fā)環(huán)境的特點,并通過實例介紹從 Tomcat 遷移到 WebSphere 所遇到的問題及其解決方案。
- “基于 Spring 和 iBATIS 的動態(tài)可更新多數(shù)據(jù)源持久層”(developerWorks,2012 年 2 月):開發(fā)擁有多重數(shù)據(jù)源的項目時,經(jīng)常希望能夠通過用戶界面來動態(tài)配置數(shù)據(jù)源。本文針對這一問題提出了創(chuàng)新的解決方案,通過使用 Spring+iBATIS 的組合,來實現(xiàn)可動態(tài)更新的多重數(shù)據(jù)源的持久層,從而可以通過用戶界面自主地管理所需的數(shù)據(jù)源。
- developerWorks Java 技術(shù)專區(qū):這里有數(shù)百篇關(guān)于 Java 編程各個方面的文章。
前言
在本系列文章的第 1 部分,我們搭建了一個用戶繳費通知的批處理任務(wù)。盡管這個簡單的應(yīng)用展現(xiàn)了 Spring Batch 的基本功能,但是它與真實的應(yīng)用相去甚遠。在實際應(yīng)用中,我們的 Job 可能必須要包含多個 Step,為了提高性能,我們可能需要考慮 Job 的并發(fā)問題。Spring Batch 在這些方面又提供了哪些好的特性呢?讓我們繼續(xù)。
Step Flow
通過前文我們已經(jīng)知道,Step 是一個獨立的、順序的處理步驟,包含完整的輸入、處理以及輸出。但是在企業(yè)應(yīng)用中,我們面對的更多情況是多個步驟按照一定的順序進行處理。因此如何維護步驟之間的執(zhí)行順序是我們需要考慮的。Spring Batch 提供了 Step Flow 來解決這個問題。
示例改進
讓我們回到用戶繳費通知的 Job??蛻籼岢隽诉M一步的需求:計費、扣費、繳費通知要確保順序執(zhí)行。首先,為每個用戶生成賬單,然后從用戶余額上進行扣除,對于余額不足的用戶,發(fā)送繳費通知。下面看一下如何使用 Step Flow 實現(xiàn)該需求。
在講解 Step Flow 之前,我們先對第 1 部分的示例進行改進,將其由文件操作遷移到數(shù)據(jù)庫上,這樣便于我們后續(xù)的講解。數(shù)據(jù)庫初始化腳本見 init_db_mysql.sql(位于示例代碼包 batch_sample 根目錄下),具體配置如清單 1 所示:
清單 1. billing_job.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | <beans:bean id="jobRepository" ???class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> ???<beans:property name="dataSource" ref="dataSource" /> ???<beans:property name="transactionManager" ref="transactionManager" /> </beans:bean> <beans:bean id="userDbReader" ????class="org.springframework.batch.item.database.JdbcPagingItemReader"> ????<beans:property name="dataSource" ref="dataSource" /> ????<beans:property name="rowMapper" ref="userDbMapper" /> ????<beans:property name="queryProvider" ref="userQueryProvider" /> </beans:bean> <beans:bean id="userDbMapper" ????class="org.springframework.batch.sample.UserDbMapper" /> <beans:bean id="userQueryProvider" ????class="org.springframework.batch.item.database.support.MySqlPagingQueryProvider"> ????<beans:property name="selectClause" value="u.id,u.name,u.age,u.balance" /> ????<beans:property name="fromClause" value="users u" /> ????<beans:property name="sortKey" value="u.id" /> </beans:bean> <beans:bean id="messageDbWriter" ????class="org.springframework.batch.item.database.JdbcBatchItemWriter"> ????<beans:property name="dataSource" ref="dataSource" /> ????<beans:property name="sql" ????value="insert into messages(id,user_id,content) values(:id,:user.id,:content)" /> ????<beans:property name="itemSqlParameterSourceProvider" ????????ref="itemSqlParameterSourceProvider" /> </beans:bean> <beans:bean id="itemSqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" ?/> |
我們分別使用 Spring Batch 提供的 JdbcPagingItemReader 和 JdbcBatchItemWriter 進行讀寫。同時,我將 jobRepository 修改為 JobRepositoryFactoryBean,因此,運行示例前,您還需要執(zhí)行 Spring Batch 提供的 schema-mysql.sql(位于 core 包 org\springframework\batch\core 目錄下)。相關(guān)內(nèi)容將在第 3 部分詳細講解,此處不再贅述。
第一個流程
在配置 Step 時,我們可以指定其 next 屬性,該屬性指向另一個 Step。通過配置 Step 的 next 屬性,我們便可以輕易實現(xiàn)上述流程。具體如清單 2 所示
清單 2. billing_job.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <job id="billingJob" restartable="true"> ????<step id="billingStep" next="payStep"> ????????<tasklet> ????????????<chunk reader="userDbReader" processor="billingProcessor" ?????????????writer="billDbWriter" commit-interval="5" chunk-completion-policy=""> ????????????</chunk> ????????</tasklet> ????</step> ????<step id="payStep" next="messageStep"> ????????<tasklet> ??????????<chunk reader="billDbReader" processor="payProcessor" writer="payDbWriter" ????????????commit-interval="5" chunk-completion-policy=""? skip-limit="100" > <skippable-exception-classes> ??????<include class="org.springframework.batch.sample.MoneyNotEnoughException" /> ????????????????</skippable-exception-classes> ????????????</chunk> ????????</tasklet> ????</step> ????<step id="messageStep"> ????????<tasklet> ????????????<chunk reader="billArrearsDbReader" processor="messageProcessor" ????????????????????writer="messageDbWriter" commit-interval="5" ????????????????????chunk-completion-policy=""> ????????????</chunk> ????????</tasklet> ????</step> </job> |
我們將 billStep 的 next 設(shè)置為 payStep,將 payStep 的 next 設(shè)置為 messageStep,同時分別指定了讀、處理、寫接口。Spring Batch 在運行 billingJob 時,首先執(zhí)行 billingStep,查找用戶信息生成賬單費用,然后執(zhí)行 payStep,查找賬單信息生成扣費記錄,如果用戶余額不足則跳過。最后,查找欠費賬單,生成繳費通知。只有當(dāng)上一步執(zhí)行成功后,才會執(zhí)行下一步。
billStep 和 payStep 的 ItemProcessor 實現(xiàn)分別如清單 3 和清單 4 所示:
清單 3.BillingItemProcessor 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | public class BillingItemProcessor implements ItemProcessor#<User, Bill> { ????public Bill process(User item) throws Exception { ????????Bill b = new Bill(); ????????b.setUser(item); ????????b.setFees(70.00); ????????b.setPaidFees(0.0); ????????b.setUnpaidFees(70.00); ????????b.setPayStatus(0);/*unpaid*/ ????????return b; ????} } |
清單 4.PaymentItemProcessor 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public class PaymentItemProcessor implements ItemProcessor<Bill, PayRecord> { ?public PayRecord process(Bill item) throws Exception { ????????if (item.getUser().getBalance() <= 0) { ????????????return null; ????????} ????????if (item.getUser().getBalance() >= item.getUnpaidFees()) { ????????????// create payrecord ????????????PayRecord pr = new PayRecord(); ????????????pr.setBill(item); ????????????pr.setPaidFees(item.getUnpaidFees()); ????????????// update balance ????????????item.getUser().setBalance(item.getUser().getBalance() - ?????????????????item.getUnpaidFees()); ????????????// update bill ????????????item.setPaidFees(item.getUnpaidFees()); ????????????item.setUnpaidFees(0.0); ????????????item.setPayStatus(1);/* paid */ ????????????return pr; ????????} else { ????????????throw new MoneyNotEnoughException(); ????????} ????} } |
在清單 3 中,我們?yōu)槊總€用戶生成一條 70 元的賬單,已繳費用為 0,未繳費用為 70。在清單 4 中,將賬單金額從用戶余額中扣除,并更新賬單已繳和未繳費用,如果余額不足,提示異常(通過清單 2 可知,我們對于此類異常進行了跳過處理)。
此外,我們現(xiàn)在的繳費通知需要基于欠費賬單生成,因此,我們需要新提供一個繳費通知的 ItemProcessor,具體如清單 5 所示:
清單 5.ArrearsMessagesItemProcessor 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public class ArrearsMessagesItemProcessor implements ????????ItemProcessor<Bill, Message> { ????public Message process(Bill item) throws Exception { ????????if (item.getPayStatus() == 0) {/*unpaid*/ ????????????Message m = new Message(); ????????????m.setUser(item.getUser()); ????????????m.setContent("Hello " + item.getUser().getName() ????????????????????+ ",please pay promptly at end of this month."); ????????????return m; ????????} ????????return null; ????} } |
每個 Step 的讀寫接口可參照 billing_job.xml,均使用 Spring Batch 提供的實現(xiàn)類,此處不再贅述(此處需要特別注意 payDbWriter,由于扣費時,我們需要同時生成扣費記錄,并更新用戶和賬單,因此我們使用了 CompositeItemWriter)。至此,我們已經(jīng)完成了第一步,實現(xiàn)了基本的多步驟順序處理,您可以運行 Main2,并通過數(shù)據(jù)庫查看運行結(jié)果(bills、payrecords、messages)。
條件流程和流程決策
通過上面的 Step Flow,我們已經(jīng)滿足了客戶的初步需求,但是客戶又提出進一步要求:能否當(dāng)所有用戶費用均足夠的情況下,不再執(zhí)行繳費通知處理。因為查詢一遍欠費賬單在一定程度上還是降低了處理性能。Spring Batch 提供了條件流程和流程決策來支持類似應(yīng)用場景。
首先,讓我們看一下如何使用條件流程來實現(xiàn)該需求。Step 通過在 next 元素上設(shè)置 on 屬性來支持條件流程,on 屬性取值為 Step 的結(jié)束狀態(tài),如 COMPLETED、FAILED 等,同時還支持 * 以及 ? 通配符,具體可閱讀?參考手冊。
由于我們希望當(dāng)存在余額不足的情況時,也就是 payStep 的跳過條數(shù)大于 0 時,再執(zhí)行繳費通知 Step,因此,我們需要特殊指定一種結(jié)束狀態(tài)。此處,我們可以為 Step 添加一個監(jiān)聽器,以返回指定的結(jié)束狀態(tài)。
修改后的 payStep 如清單 6 所示,監(jiān)聽器實現(xiàn)如清單 7 所示:
清單 6. billing_job.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | <step id="payStep"> ????<tasklet> ????????<chunk reader="billDbReader" processor="payProcessor" writer="payDbWriter" ????????????commit-interval="5" chunk-completion-policy="" skip-limit="100"> ????????????<skippable-exception-classes> ????????????????<include ????????class="org.springframework.batch.sample.MoneyNotEnoughException" /> ????????????</skippable-exception-classes> ????????</chunk> ????</tasklet> <next on="COMPLETED WITH SKIPS" to="messageStep"/> ????<listeners> ????????<listener ref="payStepCheckingListener"></listener> ????</listeners> </step> |
清單 7. PayStepCheckingListener 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public class PayStepCheckingListener extends StepExecutionListenerSupport { ????@Override ????public ExitStatus afterStep(StepExecution stepExecution) { ????????String exitCode = stepExecution.getExitStatus().getExitCode(); ????????if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) ????????????????&& stepExecution.getSkipCount() > 0) { ????????????return new ExitStatus("COMPLETED WITH SKIPS"); ????????} else { ????????????return null; ????????} ????} } |
接下來,再讓我們看一下如何使用流程決策來實現(xiàn)該功能。多數(shù)情況下,Step 的結(jié)束狀態(tài)并不能夠滿足較為復(fù)雜的條件流程,此時便用到了流程決策器。通過它,我們可以根據(jù) Job 和 Step 的各種執(zhí)行情況返回相應(yīng)的執(zhí)行狀態(tài)來控制流程。
首先,我們需要定義一個流程決策器,代碼如清單 8 所示:
清單 8. MessagesDecider 類
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public class MessagesDecider implements JobExecutionDecider { ????public FlowExecutionStatus decide(JobExecution jobExecution, ????????????StepExecution stepExecution) { ????????String exitCode = stepExecution.getExitStatus().getExitCode(); ????????if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) ????????????????&& stepExecution.getSkipCount() > 0) { ????????????return new FlowExecutionStatus("COMPLETED WITH SKIPS"); ????????} else { ????????????return FlowExecutionStatus.COMPLETED; ????????} ????} } |
與 StepExecutionListener 不同,該類的 decide 方法返回一個 FlowExecutionStatus 對象。與之對應(yīng),Job 配置修改為如清單 9 所示:
清單 9. billing_job2.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 | <job id="billingJob" restartable="true"> ????<step id="billingStep" next="payStep"> ????</step> ????<step id="payStep" next="decider"> ????</step> ????<decision id="decider" decider="messagesDecider"> ????????<next on="COMPLETED WITH SKIPS" to="messageStep" /> ????????<end on="COMPLETED" /> ????</decision> ????<step id="messageStep"> ????</step> </job> |
可以看到 payStep 的 next 變成了 decider,在 decider 中根據(jù)返回結(jié)果確定執(zhí)行路徑:如果存在跳過的情況,執(zhí)行 messageStep,否則直接結(jié)束 Job(注意:此處我們用到了 end 元素)。
通過上面的講述,我們大體了解了 Spring Batch 對于條件流程的支持(此外,我們可以通過設(shè)置 Step 的 next 屬性為先前執(zhí)行的 Step,從而實現(xiàn)支持循環(huán)的 Job,但是筆者并不認(rèn)為這是實現(xiàn)循環(huán)任務(wù)的一個好方案,故在此處不做詳細講解),接下來再讓我們看一下批處理中另一項重要特征——并發(fā)。
并發(fā)處理
如果我們的批處理任務(wù)足夠簡單,硬件配置及網(wǎng)絡(luò)環(huán)境也足夠好,那么我們完全可以將批處理任務(wù)設(shè)計為單線程,但現(xiàn)實是企業(yè)應(yīng)用對于硬件的要求要比硬件自身發(fā)展快的多,更何況還有那么多的企業(yè)要在較差的硬件環(huán)境中運行自己的企業(yè)應(yīng)用并希望擁有一個可以接受的性能。因此在企業(yè)應(yīng)用中,尤其是涉及到大批量數(shù)據(jù)處理,并發(fā)是不可避免的。那么,Spring Batch 在并發(fā)方面又提供了哪些功能支持呢?
首先,Spring Batch 提供了 Step 內(nèi)的并發(fā),這也是最簡單的一種并發(fā)處理支持。通過為 Step 設(shè)置 task-executor 屬性,我們便可以使當(dāng)前 Step 以并發(fā)方式執(zhí)行。同時,還可以通過 throttle-limit 設(shè)置并發(fā)線程數(shù)(默認(rèn)為 4)。也就是說您不必修改任何業(yè)務(wù)處理邏輯,僅僅通過修改配置即可以實現(xiàn)同步到異步的切換。
如果我們希望示例中的 billingStep 以并發(fā)方式執(zhí)行,且并發(fā)任務(wù)數(shù)為 5,那么只需要做如下配置即可,見清單 10:
清單 10. billing_job3.xml
| 1 2 3 4 5 6 7 8 9 10 | <step id="billingStep" next="payStep"> ????<tasklet task-executor="taskExecutor" throttle-limit="5"> ????????<chunk reader="userDbReader" processor="billingProcessor" ????????writer="billDbWriter" commit-interval="5" chunk-completion-policy=""> ????????</chunk> ????</tasklet> </step> <beans:bean id="taskExecutor" ????class="org.springframework.core.task.SimpleAsyncTaskExecutor"> </beans:bean> |
從清單可以看出,我們?yōu)?billingStep 指定了一個異步任務(wù)執(zhí)行器 SimpleAsyncTaskExecutor,該執(zhí)行器將會按照配置創(chuàng)建指定數(shù)目的線程來進行數(shù)據(jù)處理。通過這種方式,避免了我們手動創(chuàng)建并管理線程的工作,使我們只需要關(guān)注業(yè)務(wù)處理本身。
需要補充說明的是,Spring Core 為我們提供了多種執(zhí)行器實現(xiàn)(包括多種異步執(zhí)行器),我們可以根據(jù)實際情況靈活選擇使用。當(dāng)然,像我們此處需要并發(fā)處理時,必須使用異步執(zhí)行器。幾種主要實現(xiàn)如表 1 所示:
表 1. 任務(wù)執(zhí)行器列表
| SyncTaskExecutor | 簡單同步執(zhí)行器 | 否 |
| ThrottledTaskExecutor | 該執(zhí)行器為其他任意執(zhí)行器的裝飾類,并完成提供執(zhí)行次數(shù)限制的功能 | 視被裝飾的執(zhí)行器而定 |
| SimpleAsyncTaskExecutor | 簡單異步執(zhí)行器,提供了一種最基本的異步執(zhí)行實現(xiàn) | 是 |
| WorkManagerTaskExecutor | 該類作為通過 JCA 規(guī)范進行任務(wù)執(zhí)行的實現(xiàn),其包含 JBossWorkManagerTaskExecutor 和 GlassFishWorkManagerTaskExecutor 兩個子類 | 是 |
| ThreadPoolTaskExecutor | 線程池任務(wù)執(zhí)行器 | 是 |
其次,Spring Batch 還支持 Step 間的并發(fā),這是通過 Split Flow 實現(xiàn)的。讓我們看看 Split Flow 是如何使用的。在此之前,讓我們設(shè)想一下,假如客戶基于上面的示例提出進一步需求:每月為用戶生成扣費通知,并生成賬單、扣費、繳費通知(對于費用不足的情況)。
當(dāng)然,要實現(xiàn)上述需求有很多種方式,比如,按照生成賬單、扣費通知、扣費、繳費通知的順序串行執(zhí)行,然而,此種處理方式勢必會降低性能,即使我們可以使用 Step 多線程處理來提高性能,可仍不是最優(yōu)方式。那么我們該如何改進呢?顯然,我們可以將生成扣費通知和扣費并行執(zhí)行,因為這兩步是完全獨立的。修改后的 billing_job 如清單 11 所示:
清單 11. billing_job3.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | <job id="billingJob" restartable="true"> ????<step id="billingStep" next="splitStep"> ????????<tasklet task-executor="taskExecutor" throttle-limit="5"> ????????????<chunk reader="userDbReader" processor="billingProcessor" ????????writer="billDbWriter" commit-interval="5" chunk-completion-policy=""> ????????????</chunk> ????????</tasklet> ????</step> ????<split id="splitStep" task-executor="taskExecutor"? next="decider"> ????????<flow> ????????????<step id="billingMessageStep"> ????????????????<tasklet> ??????????????????<chunk reader="billDbReader" processor="billMessageItemProcessor" ????????????????????writer="messageDbWriter" commit-interval="5" ????????????????????????????chunk-completion-policy=""> ????????????????????</chunk> ????????????????</tasklet> ????????????</step> ????????</flow> ????????<flow> ????????????<step id="payStep"> ????????????????<tasklet> ???????????????????<chunk reader="billDbReader" processor="payProcessor" ???????????????????writer="payDbWriter" commit-interval="5" chunk-completion-policy="" ???????????????????skip-limit="100"> ????????????????????<skippable-exception-classes> ??????????????????????<include ??????????????????class="org.springframework.batch.sample.MoneyNotEnoughException" /> ??????????????????????</skippable-exception-classes> ????????????????????</chunk> ????????????????</tasklet> ????????????</step> ????????</flow> ????</split> ????<decision id="decider" decider="messagesDecider"> ????????<next on="COMPLETED WITH SKIPS" to="paymentMessageStep" /> ????????<end on="COMPLETED" /> ????</decision> ????<step id="paymentMessageStep"> ????????<tasklet> ????????????<chunk reader="billArrearsDbReader" processor="messageProcessor" ????????????????????writer="messageDbWriter" commit-interval="5" ????????????????????chunk-completion-policy=""> ????????????</chunk> ????????</tasklet> ????</step> </job> |
從清單 10 可以看出,billingStep 的下一步變成了一個 split 元素,該元素下包含兩個 flow?!癴low”顧名思義包含一系列可執(zhí)行的 step,示例中兩個 flow 分別包含 billingMessageStep(生成扣費通知)和 payStep 兩個 step。Spring Batch 執(zhí)行 split 時,將會并行執(zhí)行其下所有 flow,而且只有當(dāng)所有 step 均執(zhí)行完畢之后,才會執(zhí)行 split 元素的下一步,當(dāng)然,前提是您為 split 元素指定的"task-executor"為 SimpleAsyncTaskExecutor,該屬性默認(rèn)為 SyncTaskExecutor,即串行執(zhí)行。
通過上述兩種方式,我們可以實現(xiàn) Job 的并發(fā)處理,但顯然該方式有其局限性,即僅限于單機。
讓我們設(shè)想一下如下場景:在上述繳費任務(wù)中,用戶生成賬單非常慢(也許是因為業(yè)務(wù)處理過于復(fù)雜,也許因為生成賬單的過程中同時處理了好多關(guān)聯(lián)信息)。這種場景我們該怎么優(yōu)化呢?顯然,即便我們將該步驟配置為并行,那么它的優(yōu)化空間也是有限的,因為線程并發(fā)到一定數(shù)量之后必定受限于系統(tǒng)硬件配置。這個時候,我們自然會想到修改部署方式,將耗時操作分配到多個機器上并行執(zhí)行。那么基于 Spring Batch 我們該如何實現(xiàn)呢?此處便用到了 PartitionStep。讓我們看一下它是如何執(zhí)行的,其時序圖如圖 1 所示:
圖 1. PartitionStep 序列圖
從圖中我們可以看到,PartitionStep 并不負責(zé)讀、寫數(shù)據(jù),它只是根據(jù)配置的策略(PartitionHandler)將 StepExecution 進行分解,并委派到指定的 Step 上并行執(zhí)行(該 Step 可能是本地,也可能是遠程),執(zhí)行完畢后,將所有執(zhí)行結(jié)果進行合并(由 StepExecutionAggregator 完成)作為自身的執(zhí)行結(jié)果。利用 PartitionStep,在委派 Step 為遠程調(diào)用的情況下,我們可以很容易通過增加從機數(shù)目的方式來提高任務(wù)運行效率,大大提高了系統(tǒng)的可伸縮性。而且此種方式并不會影響 PartitionStep 所在 Job 的執(zhí)行順序,因為 PartitionStep 只有當(dāng)所有委派 Step 完成之后,才會繼續(xù)往下執(zhí)行。
不過使用 PartitionStep 需要注意以下幾點:
- 由于數(shù)據(jù)的讀寫以及處理均在從機上進行,因此需要確保并發(fā)的從機之間不會重復(fù)讀取數(shù)據(jù)(當(dāng)然,這個問題是所有批處理應(yīng)用采用主從和集群架構(gòu)時所必須考慮的問題,而并非只有 Spring Batch 才會有)。
- 確保分解到各個從機上的 StepExecution 是不同的。在 StepExecutionSplitter 的默認(rèn)實現(xiàn) SimpleStepExecutionSplitter 中,首先通過一個 Partitioner 得到分解后的 ExecutionContext,然后針對每個 ExecutionContext,創(chuàng)建 StepExecution(當(dāng)然,如果 Step 為重復(fù)執(zhí)行,那么將會得到上次運行的 ExecutionContext 和 StepExecution,而非重新創(chuàng)建)。
從第二點可以看出,通過在 ExecutionContext 設(shè)置唯一的信息,我們便可以保證每個從機讀取的數(shù)據(jù)是不同的。
主從方式的具體配置如清單 12 所示:
清單 12. partition.xml
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | <beans:bean name="step" ????class="org.springframework.batch.core.partition.support.PartitionStep"> ????<beans:property name="partitionHandler"> ????????<beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> ????????????<beans:property name="step" ref="remoteStep" /> ????????????<beans:property name="gridSize" value="10" /> ????????????<beans:property name="taskExecutor" ref="taskExecutor" /> ????????</beans:bean> ????</beans:property> ????<beans:property name="stepExecutionSplitter"> ??????<beans:bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter"> ????????????<beans:constructor-arg ref="jobRepository" /> ????????????<beans:constructor-arg ref="messageStep" /> ????????????<beans:constructor-arg ref="simplePartitioner" /> ????????</beans:bean> ????</beans:property> ????<beans:property name="jobRepository" ref="jobRepository" /> </beans:bean> <step id="messageStep"> ????<tasklet task-executor="taskExecutor"> ????????<chunk reader="messageReader" processor="messageProcessor" ????????writer="messageWriter" commit-interval="5" chunk-completion-policy="" ????????????????retry-limit="2"> ????????????<retryable-exception-classes> ????????????????<include class="java.lang.RuntimeException" /> ????????????</retryable-exception-classes> ????????</chunk> ????</tasklet> </step> <beans:bean id="remoteStep" ????class="org.springframework.remoting.httpinvoker.HttpInvokerProxyFactoryBean"> ????<beans:property name="serviceInterface" ????????value="org.springframework.batch.core.Step" /> ????<beans:property name="serviceUrl" ?????????value="${batch.remote.base.url}/steps/messageStep" /> </beans:bean> |
此處只采用 Spring Batch 的默認(rèn)實現(xiàn),將 Step 發(fā)送到一臺從機上執(zhí)行,當(dāng)然,您完全可以基于 Spring Batch 當(dāng)前接口,輕易擴展出分發(fā)到 N 臺從機上執(zhí)行的實現(xiàn)。
此外,在耗時的 Step 比較獨立的情況下(如發(fā)送扣費通知的 Step,后續(xù) Step 不會依賴扣費通知 Step 的任何輸出結(jié)果),我們還可以采用另一種主從架構(gòu)。在主機上配置一個標(biāo)準(zhǔn)的 Step,其 ItemWriter 負責(zé)將讀取的記錄以 Message 的形式發(fā)送給消息中間件(當(dāng)然,該方案并未充分利用 Spring Batch 的特性,而是由消息中間件完成并發(fā)處理)。
總結(jié)
通過本文的講解,您已經(jīng)基本了解了 Spring Batch 中對流程、條件以及并發(fā)的支持。利用 Spring Batch 提供的這些特性,我們完全可以構(gòu)建出高性能、高可擴展性和可維護性的批處理應(yīng)用。在本系列文章的最后一部分,我將繼續(xù)給您介紹 Spring Batch 關(guān)于批處理監(jiān)控方面的內(nèi)容。
相關(guān)主題
- 本系列?第 1 部分:一步步了解如何開發(fā)基于 Spring Batch 的批處理程序和相關(guān)核心概念。
- Spring Batch 主頁,可以初步了解 Spring Batch 的基本架構(gòu)。
- Spring Batch 發(fā)布包,您可以在這里找到各個版本的 Spring Batch 發(fā)布包。
- Spring Batch 入門:教你如何入門。
- Spring Batch 參考手冊,詳細了解 Spring Batch 框架。
- Spring 參考手冊,Spring Framework 知識學(xué)習(xí)。
- “Spring Richclient 中的安全認(rèn)證管理”(developerWorks,2011 年 7 月):作為企業(yè)級開發(fā)框架,Spring Richclient 為我們提供了完善的安全認(rèn)證管理功能,使我們能夠方便構(gòu)建安全的企業(yè)級應(yīng)用。本文將詳細介紹 Spring Richclient 中安全認(rèn)證管理的實現(xiàn)方式以及使用方法。
- “Struts2、Spring、Hibernate 高效開發(fā)的最佳實踐”(developerWorks,2011 年 8 月):Struts2、Spring、Hibernate(SSH)是最常用的 Java EE Web 組件層的開發(fā)技術(shù)搭配,網(wǎng)絡(luò)中和許多 IT 技術(shù)書籍中都有它們的開發(fā)教程,但是通常的教程都會讓很多程序員陷入痛苦的配置與修改配置的過程。本文利用 SSH 中的技術(shù)特性,利用 Java 反射技術(shù),按照規(guī)約優(yōu)于配置的原理,基于 SSH 設(shè)定編寫了一個通用開發(fā)框架,這使得開發(fā)者可以專注于業(yè)務(wù)邏輯的開發(fā)。
- “如何將基于 Struts、Spring 和 Hibernate 的應(yīng)用從 Tomcat 遷移到 WebSphere Application Server”(developerWorks,2011 年 11 月):本文向讀者介紹基于 Eclipse 開發(fā)的 Struts、Spring 和 Hibernate 開源應(yīng)用和開發(fā)環(huán)境的特點,并通過實例介紹從 Tomcat 遷移到 WebSphere 所遇到的問題及其解決方案。
- “基于 Spring 和 iBATIS 的動態(tài)可更新多數(shù)據(jù)源持久層”(developerWorks,2012 年 2 月):開發(fā)擁有多重數(shù)據(jù)源的項目時,經(jīng)常希望能夠通過用戶界面來動態(tài)配置數(shù)據(jù)源。本文針對這一問題提出了創(chuàng)新的解決方案,通過使用 Spring+iBATIS 的組合,來實現(xiàn)可動態(tài)更新的多重數(shù)據(jù)源的持久層,從而可以通過用戶界面自主地管理所需的數(shù)據(jù)源。
- developerWorks Java 技術(shù)專區(qū):這里有數(shù)百篇關(guān)于 Java 編程各個方面的文章。
?
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/9057822.html
總結(jié)
以上是生活随笔為你收集整理的使用 Spring Batch 构建企业级批处理应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 集群调度框架的架构演进之路
- 下一篇: 分布式定时任务调度系统技术选型--转