trident原理及编程指南
trident原理及編程指南
@(STORM)[storm, 大數(shù)據(jù)]
- trident原理及編程指南
- 一理論介紹
- 一trident是什么
- 二trident處理單位
- 三事務(wù)類型
- 1spout類型
- 2state類型
- 3實現(xiàn)恰好一次的spout與state組合類型
- 二編程指南
- 1定義輸入流
- 2統(tǒng)計單詞數(shù)量
- 3輸出統(tǒng)計結(jié)果
- 4split的字義
- 三使用kafka作為數(shù)據(jù)源
- 1定義kafka相關(guān)配置
- 2從kafka中讀取消息并處理
- 3提交拓撲
- 四State示例
- 1主類
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓撲定義
- 2工廠類NameSumStateFactory
- 3更新類NameSumUpdater
- 4狀態(tài)類NameSumState
- 4state應(yīng)用思路總結(jié)
一、理論介紹
(一)trident是什么?
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you’re familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
簡單的說,trident是storm的更高層次抽象,相對storm,它主要提供了3個方面的好處:
(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調(diào)用,不需要自己實現(xiàn)。
(2)以批次代替單個元組,每次處理一個批次的數(shù)據(jù)。
(3)提供了事務(wù)支持,可以保證數(shù)據(jù)均處理且只處理了一次。
(二)trident處理單位
trident每次處理消息均為batch為單位,即一次處理多個元組。
(三)事務(wù)類型
關(guān)于事務(wù)類型,有2個比較容易混淆的概念:spout的事務(wù)類型以及事務(wù)狀態(tài)。
它們都有3種類型,分別為:事務(wù)型、非事務(wù)型和透明事務(wù)型。
1、spout類型
spout的類型指定了由于下游出現(xiàn)問題(fail被調(diào)用,或者超時無回復(fù))導(dǎo)致元組需要重放時,應(yīng)該怎么發(fā)送元組。
事務(wù)型spout:重放時能保證同一個批次發(fā)送同一批元組。可以保證每一個元組都被發(fā)送且只發(fā)送一個,且同一個批次所發(fā)送的元組是一樣的。
非事務(wù)型spout:沒有任何保障,發(fā)完就算。
透明事務(wù)型spout:同一個批次發(fā)送的元組有可能不同的,它可以保證每一個元組都被發(fā)送且只發(fā)送一次,但不能保證重放時同一個批次的數(shù)據(jù)是一樣的。這對于部分失效的情況尤其有用,假如以kafka作為spout,當(dāng)一個topic的某個分區(qū)失效時,可以用其它分區(qū)的數(shù)據(jù)先形成一個批次發(fā)送出去,如果是事務(wù)型spout,則必須等待那個分區(qū)恢復(fù)后才能繼續(xù)發(fā)送。
這三種類型可以分別通過實現(xiàn)ITransactionalSpout、ITridentSpout、IOpaquePartitionedTridentSpout接口來定義。
2、state類型
state的類型指定了如果將storm的中間輸出或者最終輸出持久化到某個地方(如內(nèi)存),當(dāng)某個批次的數(shù)據(jù)重放時應(yīng)該如果更新狀態(tài)。state對于下游出現(xiàn)錯誤的情況尤其有用。
事務(wù)型狀態(tài):同一批次tuple提供的結(jié)果是相同的。
非事務(wù)型狀態(tài):沒有回滾能力,更新操作是永久的。
透明事務(wù)型狀態(tài):更新操作基于先前的值,這樣由于這批數(shù)據(jù)發(fā)生變化,對應(yīng)的結(jié)果也會發(fā)生變化。透明事務(wù)型狀態(tài)除了保存當(dāng)前數(shù)據(jù)外,還要保存上一批數(shù)據(jù),當(dāng)數(shù)據(jù)重放時,可以基于上一批數(shù)據(jù)作更新。
注意,此處的狀態(tài)應(yīng)該是原子性的,比如將狀態(tài)寫入hbase,則應(yīng)該全部寫入,或者全部沒寫入,不能說寫入一半,另一半沒寫入,這連事務(wù)型也無法保證恰好一次了。比如說寫入本地磁盤,就有可能導(dǎo)致這種情況,如果寫到一半出錯,則無法保證恰好一次了,因為磁盤沒有類似于數(shù)據(jù)庫的commit、rollback操作。
3、實現(xiàn)恰好一次的spout與state組合類型
由上表可以看出:
(1)當(dāng)spout與state均為transcational或者均為opaque時,可以實現(xiàn)恰好一次。
(2)當(dāng)spout為tansactional,state為opaque時,也可以實現(xiàn)恰好一次。
(3)但當(dāng)spout為opaque,state為transactional時,不可以實現(xiàn)恰好一次。因此opaque spout重發(fā)時,它的內(nèi)容可能與上一次不同,而state如果在上個批次已經(jīng)更新過但這個批次最終fail了,則spout重發(fā)時,會在已經(jīng)fail掉的批次上更新,而上一個批次是不應(yīng)該計算在內(nèi)的。如果state是transactional的,則它同時保存了上一次狀態(tài)及當(dāng)前狀態(tài),所以可以基于上一次的狀態(tài)作更新,就不會有這個問題。
二、編程指南
代碼如下
package org.ljh.tridentdemo;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple;public class TridentWordCount {public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}}public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);//創(chuàng)建拓撲對象TridentTopology topology = new TridentTopology();//這個流程用于統(tǒng)計單詞數(shù)據(jù),結(jié)果將被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);//這個流程用于查詢上面的統(tǒng)計結(jié)果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();}public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));}} }實例實現(xiàn)了最基本的wordcount功能,然后將結(jié)果輸出。關(guān)鍵步驟如下:
1、定義輸入流
FixedBatchSpout spout =new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);(1)使用FixedBatchSpout創(chuàng)建一個輸入spout,spout的輸出字段為sentence,每3個元組作為一個batch。
(2)數(shù)據(jù)不斷的重復(fù)發(fā)送。
2、統(tǒng)計單詞數(shù)量
TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);這個流程用于統(tǒng)計單詞數(shù)據(jù),結(jié)果將被保存在wordCounts中。6行代碼的含義分別為:
(1)首先從spout中讀取消息,spout1定義了zookeeper中用于保存這個拓撲的節(jié)點名稱。
(2)并行度設(shè)置為16,即16個線程同時從spout中讀取消息。
(3)each中的三個參數(shù)分別為:輸入字段名稱,處理函數(shù),輸出字段名稱。即從字段名稱叫sentence的數(shù)據(jù)流中讀取數(shù)據(jù),然后經(jīng)過new Split()處理后,以word作為字段名發(fā)送出去。其中new Split()后面介紹,它的功能就是將輸入的內(nèi)容以空格為界作了切分。
(4)將字段名稱為word的數(shù)據(jù)流作分組,即相同值的放在一組。
(5)將已經(jīng)分好組的數(shù)據(jù)作統(tǒng)計,結(jié)果放到MemoryMapState,然后以count作為字段名稱將結(jié)果發(fā)送出去。這步驟會同時存儲數(shù)據(jù)及狀態(tài),并將返回TridentState對象。
(6)并行度設(shè)置。
3、輸出統(tǒng)計結(jié)果
topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));這個流程從上述的wordCounts對象中讀取結(jié)果,并返回。6行代碼的含義分別為:
(1)等待一個drpc調(diào)用,從drpc服務(wù)器中接受words的調(diào)用來提供消息。調(diào)用代碼如下:
drpc.execute("words", "cat the dog jumped")(2)輸入為上述調(diào)用中提供的參數(shù),經(jīng)過Split()后,以word作為字段名稱發(fā)送出去。
(3)以word的值作分組。
(4)從wordCounts對象中查詢結(jié)果。4個參數(shù)分別代表:數(shù)據(jù)來源,輸入數(shù)據(jù),內(nèi)置方法(用于從map中根據(jù)key來查找value),輸出名稱。
(5)過濾掉空的查詢結(jié)果,如本例中,cat和dog都沒有結(jié)果。
(6)將結(jié)果作統(tǒng)計,并以sum作為字段名稱發(fā)送出去,這也是DRPC調(diào)用所返回的結(jié)果。如果沒有這一行,最后的輸出結(jié)果
加上這一行后,結(jié)果為:
DRPC RESULT: [[180]]4、split的字義
public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}} }注意它最后會發(fā)送數(shù)據(jù)。
5、創(chuàng)建并啟動拓撲
public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));} }(1)當(dāng)無參數(shù)運行時,啟動一個本地的集群,及自已創(chuàng)建一個drpc對象來輸入。
(2)當(dāng)有參數(shù)運行時,設(shè)置worker數(shù)量為3,然后提交拓撲到集群,并等待遠程的drpc調(diào)用。
三、使用kafka作為數(shù)據(jù)源
package com.netease.sytopology;import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.Arrays;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import storm.kafka.BrokerHosts; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import storm.kafka.trident.OpaqueTridentKafkaSpout; import storm.kafka.trident.TridentKafkaConfig; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/** 本類完成以下內(nèi)容*/ public class SyTopology {public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);private final BrokerHosts brokerHosts;public SyTopology(String kafkaZookeeper) {brokerHosts = new ZkHosts(kafkaZookeeper);}public StormTopology buildTopology() {TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());// TransactionalTridentKafkaSpout kafkaSpout = new// TransactionalTridentKafkaSpout(kafkaConfig);OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);TridentTopology topology = new TridentTopology();// TridentState wordCounts =topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);// .persistentAggregate(new HazelCastStateFactory(), new Count(),// new Fields("aggregates_words")).parallelismHint(2);return topology.build();}public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {String kafkaZk = args[0];SyTopology topology = new SyTopology(kafkaZk);Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);String name = args[1];String dockerIp = args[2];config.setNumWorkers(9);config.setMaxTaskParallelism(5);config.put(Config.NIMBUS_HOST, dockerIp);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));StormSubmitter.submitTopology(name, config, topology.buildTopology());}static class Split extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(",")) {try {FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);fw.write(word);fw.flush();fw.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}collector.emit(new Values(word));}}} }本例將從kafka中讀取消息,然后對消息根據(jù)“,”作拆分,并寫入一個本地文件。
1、定義kafka相關(guān)配置
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);其中ma30是訂閱的topic名稱。
2、從kafka中讀取消息并處理
topology.newStream("kafka4", kafkaSpout).each(new Fields("str"), new Split(),new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),new Fields("count")).parallelismHint(16);(1)指定了數(shù)據(jù)來源,并指定zookeeper中用于保存數(shù)據(jù)的位置,即保存在/transactional/kafka4。
(2)指定處理方法及發(fā)射的字段。
(3)根據(jù)word作分組。
(4)計數(shù)后將狀態(tài)寫入MemoryMapState
3、提交拓撲:
storm jar target/sytopology2-0.0.1-SNAPSHOT.jar com.netease.sytopology.SyTopology 192.168.172.98:2181/kafka test3 192.168.172.98此時可以在/home/data/test/ma30/ma30.txt看到split的結(jié)果
四、State示例
trident通過spout的事務(wù)性與state的事務(wù)處理,保證了恰好一次的語義。這里介紹了如何使用state。
完整代碼請見 https://github.com/lujinhong/tridentdemo
1、主類
主類定義了拓撲的整體邏輯,這個拓撲通過一個固定的spout循環(huán)產(chǎn)生數(shù)據(jù),然后統(tǒng)計消息中每個名字出現(xiàn)的次數(shù)。
拓撲中先將消息中的內(nèi)容提取出來成name, age, title, tel4個field,然后通過project只保留name字段供統(tǒng)計,接著按照name分區(qū)后,為每個分區(qū)進行聚合,最后將聚合結(jié)果通過state寫入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其實沒什么必要,上面就不需要發(fā)射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根據(jù)name的值作分區(qū)Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());2、Aggregator的用法
這里涉及了一些trident常用的API,但project等相對容易理解,這里只介紹partitionAggregate的用法。
再看看上面代碼中對partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))第一,三個參數(shù)分別表示輸入流的名稱與輸出流的名稱。中間的NameCountAggregator是一個Aggregator的對象,它定義了如何對輸入流進行聚合。我們看一下它的代碼:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個名字是否已經(jīng)存在于map中,若無,則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結(jié)果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}(1)Aggregator接口
它實現(xiàn)了Aggregator接口,這個接口有3個方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }init方法:在處理batch之前被調(diào)用。init的返回值是一個表示聚合狀態(tài)的對象,該對象會被傳遞到aggregate和complete方法。
aggregate方法:為每個在batch分區(qū)的輸入元組所調(diào)用,更新狀態(tài)
complete方法:當(dāng)batch分區(qū)的所有元組已經(jīng)被aggregate方法處理完后被調(diào)用。
除了實現(xiàn)Aggregator接口,還可以實現(xiàn)ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見《從零開始學(xué)storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我們看一下這3個方法的實現(xiàn)。
(2)init方法
@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }僅初始化了一個HashMap對象,這個對象會作為參數(shù)傳給aggregate和complete方法。對一個batch只執(zhí)行一次。
(3)aggregate方法
aggregate方法對于batch內(nèi)的每一個tuple均執(zhí)行一次。這里將這個batch內(nèi)的名字出現(xiàn)的次數(shù)放到init方法所初始化的map中。
@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }(4)complete方法
這里在complete將aggregate處理完的結(jié)果發(fā)送出去,實際上可以在任何地方emit,比如在aggregate里面。
這個方法對于一個batch也只執(zhí)行一次。
3、state的用法
(1)拓撲定義
先看一下主類中如何將結(jié)果寫入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());它的定義為:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)其中的第二個參數(shù)比較容易理解,就是輸入流的名稱,這里是名字與它出現(xiàn)的個數(shù)。下面先看一下Facotry。
(2)工廠類:NameSumStateFactory
很簡單,它實現(xiàn)了StateFactory,只有一個方法makeState,返回一個State類型的對象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); } }(3)更新類:NameSumUpdater
這個類繼承自BaseStateUpdater,它的updateState對batch的內(nèi)容進行處理,這里是將batch的內(nèi)容放到一個map中,然后調(diào)用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);} }(4)狀態(tài)類:NameSumState
這是state最核心的類,它實現(xiàn)了大部分的邏輯。NameSumState實現(xiàn)了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid); }分別在提交之前與提交成功的時候調(diào)用,在這里只打印了一些信息。
另外NameSumState還定義了如何處理NameSumUpdater傳遞的消息:
public void setBulk(Map<String, Integer> map) {// 將新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 將map中的當(dāng)前狀態(tài)打印出來。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);} }即將NameSumUpdater傳送過來的內(nèi)容寫入一個HashMap中,并打印出來。
此處將state記錄在一個HashMap中,如果需要記錄在其它地方,如mysql,則使用jdbc寫入mysql代替下面的map操作即可。
事實上,這個操作不一定要在state中執(zhí)行,可以在任何類中,但建議還是在state類中實現(xiàn)。
4、state應(yīng)用思路總結(jié)
(1)使用state,你不再需要比較事務(wù)id,在數(shù)據(jù)庫中同時寫入多個值等內(nèi)容,而是專注于你的邏輯實現(xiàn)
(2)除了實現(xiàn)State接口,更常用的是實現(xiàn)MapState接口,下次補充。
(3)在拓撲中指定了StateFactory,這個工廠類找到相應(yīng)的State類。而Updater則每個批次均會調(diào)用它的方法。State中則定義了如何保存數(shù)據(jù),這里將數(shù)據(jù)保存在內(nèi)存中的一個HashMap,還可以保存在mysql, hbase等等。
(4)trident會自動比較txid的值,如果和當(dāng)前一樣,則不更改狀態(tài),如果是當(dāng)前txid的下一個值,則更新狀態(tài)。這種邏輯不需要用戶處理。
(5)如果需要實現(xiàn)透明事務(wù)狀態(tài),則需要保存當(dāng)前值與上一個值,在update的時候2個要同時處理。即邏輯由自己實現(xiàn)。在本例子中,大致思路是在NameSumState中創(chuàng)建2個HashMap,分別對應(yīng)當(dāng)前與上一個狀態(tài)的值,而NameSumUpdater每次更新這2個Map。
總結(jié)
以上是生活随笔為你收集整理的trident原理及编程指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。