Storm源码分析之四: Trident源码分析
Storm源碼分析之四: Trident源碼分析
@(STORM)[storm]
- Storm源碼分析之四 Trident源碼分析
- 一概述
- 0小結
- 1簡介
- 2關鍵類
- 1Spout的創建
- 2spout的消息流
- 3spout調用的整體流程
- 4spout如何被 加載到拓撲中
- 二Spout
- 一Spout的創建
- 1ItridentSpout
- 2BatchCoordinator
- 3Emmitter
- 4一個示例
- 二spout實際的消息流
- 1MasterBatchCoordinator
- 2TridentSpoutCoordinator
- 3TridentSpoutExecutor
- 一Spout的創建
- 三bolt
- 一概述
- 1組件的基本關系
- 2用戶視角與源碼視角
- 二基礎類
- 1Stream
- 1成員變量
- 2projectionValidation
- 3project
- 2Node SpoutNode PartitionNode ProcessorNode
- 詳細分析見書
- 3Group
- 1成員變量
- 2構造方法
- 3outgoingNodes
- 4incommingNodes
- 4GraphGrouper
- 1成員變量
- 2構造方法
- 3reindex
- 4nodeGroup
- 5outgoingGroups
- 6incomingGroups
- 7merge
- 8mergeFully
- 1Stream
- 一概述
- 四在TridentTopologyBuilder中設置Spoutbolt
- 一參考內容
- 一概述
- 二基礎類
- 1GlobalStreamId
- 三TridentTopology
- 1生成bolt的名稱genBoltIds
- 2添加節點addNode
- 3添加節點addSourceNode
- 四TridentTopologyBuilder
一、概述
0、小結
TridentTopologyBuilder與TridentTopology調用MBC/TSC/TSE設置spout與2個bolt,而這三個類通過調用用戶代碼Spout中定義的Coordinator與Emitter完成真正的邏輯。
最后構建好的拓撲會提交至nimbus,nimbus開始調度這個拓撲,開始運行。
1、簡介
trident是storm的更高層次抽象,相對storm,它主要提供了3個方面的好處:
(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調用,不需要自己實現。
(2)以批次代替單個元組,每次處理一個批次的數據。
(3)提供了事務支持,可以保證數據均處理且只處理了一次。
本文介紹了在一個Trident拓撲中,spout是如何被產生并被調用的。首先介紹了用戶如何創建一個Spout以及其基本原理,然后介紹了Spout的實際數據流,最后解釋了在創建topo時如何設置一個Spout。
2、關鍵類
MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady
|
|
v
TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]
|
|
v
TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)
Spout中涉及2組類,第一組類定義了用戶如何創建一個Spout,這些用戶的代碼會被第二組的類調用。第二組類定義了實際的數據流是如何發起并傳送的。
(1)Spout的創建
涉及三個類:ItridentSpout, BatchCoordinator, Emitter,其中后面2個是第一個的內部類。
用戶創建一個Spout需要實現上述三個接口。比如storm-kafka中的Spout就是實現了這3個接口或者其子接口。
(2)spout的消息流
也是涉及三個類:MasterBatchCoordinator, TridentSpoutCoordinator, TridentSpoutExecutor。它們除了自身固定的邏輯以外,還會調用用戶的代碼,就是上面介紹的Spout代碼。
它們的定義分別為:
可以看出來,MasterBatchCoordinator才是真正的spout,另外2個都是bolt。
MasterBatchCoordinator會調用用戶定義的BatchCoordinator的isReady()方法,返回true的話,則會發送一個id為batch的消息流,從而開始一個數據流轉。TridentSpoutCoordinator接到MBC的batch流后,會調用BatchCoordinator的initialTransaction()初始化一個消息,并繼續向外發送 batch流。TridentSpoutExecutor接到batch流后,會調用用戶代碼中的TridentSpoutExecutor#emitBatch()方法,開始發送實際的業務數據。
3、spout調用的整體流程
1、MasterBatchCoordinator是Trident中真正的Spout,它可以包含多個TridentSpoutCoordinator的節點。MBC在nextTuple()中向外發送id為batch的流,作為整個數據流的起點。MBC會先判斷正在處理的事務數是否少于maxTransactionActive,是的話就繼續向外發送batch流。
if(_activeTx.size() < _maxTransactionActive) {Long curr = _currTransaction;for(int i=0; i<_maxTransactionActive; i++) {if(!_activeTx.containsKey(curr) && isReady(curr)) {// by using a monotonically increasing attempt id, downstream tasks// can be memory efficient by clearing out state for old attempts// as soon as they see a higher attempt id for a transactionInteger attemptId = _attemptIds.get(curr);if(attemptId==null) {attemptId = 0;} else {attemptId++;}_attemptIds.put(curr, attemptId);for(TransactionalState state: _states) {state.setData(CURRENT_ATTEMPTS, _attemptIds);}TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);_activeTx.put(curr, new TransactionStatus(attempt));_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);_throttler.markEvent();}curr = nextTransactionId(curr);} }2、TSC收到batch流后,在execute()方法中,繼續向外發送batch流。
long txid = attempt.getTransactionId();Object prevMeta = _state.getPreviousState(txid);Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));_state.overrideState(txid, meta);collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));3、TSE收到$batch流后,調用用戶Emitter類中的emitBatch()方法,開始向外發送數據。
_collector.setBatch(info.batchId); _emitter.emitBatch(attempt, input.getValue(1), _collector); _activeBatches.put(attempt.getTransactionId(), attempt);4、當整個消息被成功處理完后,會調用MBC的ack()方法,ack方法會將事務的狀態從PROCESSING改為PROCESSED:
if(status.status==AttemptStatus.PROCESSING) {status.status = AttemptStatus.PROCESSED; }當然,如果fail掉了,則會調用fail()方法。
當sync()方法接收到事務狀態為PROCESSED時,將其改為COMMITTING的狀態,并向外發送id為$commit的流。
5、TSE處理$commit流
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);_activeBatches.remove(attempt.getTransactionId());} else {throw new FailedException("Received commit for different transaction attempt");}}收到$commit流的節點會開始提交操作,但trident會按事務號順序提交事務的,所以由提交bolt來決定是否現在提交,還是先緩存下來之后再提交。
6、當$commit流處理完后,MBC的ack()方法會被再次調用,同時向外發送$success流
else if(status.status==AttemptStatus.COMMITTING) {//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,并發送$success流。_activeTx.remove(tx.getTransactionId());_attemptIds.remove(tx.getTransactionId());_collector.emit(SUCCESS_STREAM_ID, new Values(tx));_currTransaction = nextTransactionId(tx.getTransactionId());for(TransactionalState state: _states) {state.setData(CURRENT_TX, _currTransaction);}7、TSC處理$commit流
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {_state.cleanupBefore(attempt.getTransactionId());_coord.success(attempt.getTransactionId());}8、TSE處理$success流
else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {// valid to delete before what's been committed since // those batches will never be accessed again_activeBatches.headMap(attempt.getTransactionId()).clear();_emitter.success(attempt);}至此整個流程全部完成。
總結說就是消息是從MasterBatchCoordinator開始的,它是一個真正的spout,而TridentSpoutCoordinator與TridentSpoutExecutor都是bolt,MasterBatchCoordinator發起協調消息,最后的結果是TridentSpoutExecutor發送業務消息。而發送協調消息與業務消息的都是調用用戶Spout中BatchCoordinator與Emitter中定義的代碼。
可以參考《storm源碼分析》P458的流程圖
4、spout如何被 加載到拓撲中
(1)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息
(2)在TridentTopology中調用newStream方法,將spout節點加入拓撲。
包括MBC, TSC, TSE等均是在上面2個類中被調用,從而形成一個完整的拓撲。
二、Spout
(一)Spout的創建
1、ItridentSpout
在Trident中用戶定義的Spout需要實現ItridentSpout接口。我們先看看ItridentSpout的定義
package storm.trident.spout;import backtype.storm.task.TopologyContext; import storm.trident.topology.TransactionAttempt; import backtype.storm.tuple.Fields; import java.io.Serializable; import java.util.Map; import storm.trident.operation.TridentCollector;public interface ITridentSpout<T> extends Serializable {public interface BatchCoordinator<X> {X initializeTransaction(long txid, X prevMetadata, X currMetadata); void success(long txid); boolean isReady(long txid)void close();}public interface Emitter<X> {void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);void success(TransactionAttempt tx);void close();}BatchCoordinator<T> getCoordinator(String txStateId, Map conf, TopologyContext context);Emitter<T> getEmitter(String txStateId, Map conf, TopologyContext context); Map getComponentConfiguration();Fields getOutputFields(); }它有2個內部接口,分別是BatchCoordinator和Emitter,分別是用于協調的Spout接口和發送消息的Bolt接口。實現一個Spout的主要工作就在于實現這2個接口,創建實際工作的Coordinator和Emitter。Spout中提供了2個get方法用于分別用于指定使用哪個Coordinator和Emitter類,這些類會由用戶定義。稍后我們再分析Coordinator和Emitter的內容。
除此之外,還提供了getComponentConfiguration用于獲取配置信息,getOutputFields獲取輸出field。
我們再看看2個內部接口的代碼。
2、BatchCoordinator
public interface BatchCoordinator<X> {X initializeTransaction(long txid, X prevMetadata, X currMetadata);void success(long txid);boolean isReady(long txid);void close(); }(1)initializeTransaction方法返回一個用戶定義的事務元數據。X是用戶自定義的與事務相關的數據類型,返回的數據會存儲到zk中。
其中txid為事務序列號,prevMetadata是前一個事務所對應的元數據。若當前事務為第一個事務,則其為空。currMetadata是當前事務的元數據,如果是當前事務的第一次嘗試,則為空,否則為事務上一次嘗試所產生的元數據。
(2)isReady方法用于判斷事務所對應的數據是否已經準備好,當為true時,表示可以開始一個新事務。其參數是當前的事務號。
BatchCoordinator中實現的方法會被部署到多個節點中運行,其中isReady是在真正的Spout(MasterBatchCoordinator)中執行的,其余方法在TridentSpoutCoordinator中執行。
3、Emmitter
public interface Emitter<X> {void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);void success(TransactionAttempt tx);void close(); }消息發送節點會接收協調spout的$batch和$success流。
(1)當收到$batch消息時,節點便調用emitBatch方法來發送消息。
(2)當收到$success消息時,會調用success方法對事務進行后處理
4、一個示例
參考 DiagnosisEventSpout
(1)Spout的代碼
package com.packtpub.storm.trident.spout;import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import storm.trident.spout.ITridentSpout;import java.util.Map;@SuppressWarnings("rawtypes") public class DiagnosisEventSpout implements ITridentSpout<Long> {private static final long serialVersionUID = 1L;BatchCoordinator<Long> coordinator = new DefaultCoordinator();Emitter<Long> emitter = new DiagnosisEventEmitter();@Overridepublic BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {return coordinator;}@Overridepublic Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {return emitter;}@Overridepublic Map getComponentConfiguration() {return null;}@Overridepublic Fields getOutputFields() {return new Fields("event");} }(2)BatchCoordinator的代碼
package com.packtpub.storm.trident.spout;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.trident.spout.ITridentSpout.BatchCoordinator;import java.io.Serializable;public class DefaultCoordinator implements BatchCoordinator<Long>, Serializable {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(DefaultCoordinator.class);@Overridepublic boolean isReady(long txid) {return true;}@Overridepublic void close() {}@Overridepublic Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {LOG.info("Initializing Transaction [" + txid + "]");return null;}@Overridepublic void success(long txid) {LOG.info("Successful Transaction [" + txid + "]");} }(3)Emitter的代碼
package com.packtpub.storm.trident.spout;import com.packtpub.storm.trident.model.DiagnosisEvent; import storm.trident.operation.TridentCollector; import storm.trident.spout.ITridentSpout.Emitter; import storm.trident.topology.TransactionAttempt;import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger;public class DiagnosisEventEmitter implements Emitter<Long>, Serializable {private static final long serialVersionUID = 1L;AtomicInteger successfulTransactions = new AtomicInteger(0);@Overridepublic void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {for (int i = 0; i < 10000; i++) {List<Object> events = new ArrayList<Object>();double lat = new Double(-30 + (int) (Math.random() * 75));double lng = new Double(-120 + (int) (Math.random() * 70));long time = System.currentTimeMillis();String diag = new Integer(320 + (int) (Math.random() * 7)).toString();DiagnosisEvent event = new DiagnosisEvent(lat, lng, time, diag);events.add(event);collector.emit(events);}}@Overridepublic void success(TransactionAttempt tx) {successfulTransactions.incrementAndGet();}@Overridepublic void close() {}}(4)最后,在創建topo時指定spout
TridentTopology topology = new TridentTopology();DiagnosisEventSpout spout = new DiagnosisEventSpout();Stream inputStream = topology.newStream("event", spout);(二)spout實際的消息流
以上的內容說明了如何在用戶代碼中創建一個Spout,以及其基本原理。但創建Spout后,它是怎么被加載到拓撲真正的Spout中呢?我們繼續看trident的實現。
1、MasterBatchCoordinator
總體而言,MasterBatchCoordinator作為一個數據流的真正起點:
* 首先調用open方法完成初始化,包括讀取之前的拓撲處理到的事務序列號,最多同時處理的tuple數量,每個事務的嘗試次數等。
* 然后nextTuple會改變事務的狀態,或者是創建事務并發送$batch流。
* 最后,ack方法會根據流的狀態向外發送$commit流,或者是重新調用sync方法,開始創建新的事務。
總而言之,MasterBatchCoordinator作為拓撲數據流的真正起點,通過循環發送協調信息,不斷的處理數據流。MasterBatchCoordinator的真正作用在于協調消息的起點,里面所有的map,如_activeTx,_attemptIds等都只是為了保存當前正在處理的情況而已。
(1)MasterBatchCoordinator是一個真正的spout
public class MasterBatchCoordinator extends BaseRichSpout一個Trident拓撲的真正邏輯就是從MasterBatchCoordinator開始的,先調用open方法完成一些初始化,然后是在nextTuple中發送$batch和$commit流。
(2)看一下open方法
@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);for(String spoutId: _managedSpoutIds) {//每個MasterBatchSpout可以處理多個ITridentSpout,這里將多個spout的元數據放到_states這個Map中。稍后再看看放進來的是什么內容。_states.add(TransactionalState.newCoordinatorState(conf, spoutId));}//從zk中獲取當前的transation事務序號,當拓撲新啟動時,需要從zk恢復之前的狀態。也就是說zk存儲的是下一個需要提交的事務序號,而不是已經提交的事務序號。_currTransaction = getStoredCurrTransaction();_collector = collector;//任何時刻中,一個spout task最多可以同時處理的tuple數量,即已經emite,但未acked的tuple數量。Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);if(active==null) {_maxTransactionActive = 1;} else {_maxTransactionActive = active.intValue();}//每一個事務的當前嘗試編號,即_currTransaction這個事務序號中,各個事務的嘗試次數。_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);for(int i=0; i<_spouts.size(); i++) {//將各個Spout的Coordinator保存在_coordinators這個List中。String txId = _managedSpoutIds.get(i);_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));}}(3)再看一下nextTuple()方法,它只調用了sync()方法,主要完成了以下功能:
* 如果事務狀態是PROCESSED,則將其狀態改為COMMITTING,然后發送commit流。接收到commit流的節點會調用finishBatch方法,進行事務的提交和后處理
* 如果_activeTx.size()小于_maxTransactionActive,則新建事務,放到_activeTx中,同時向外發送$batch流,等待Coordinator的處理。( 當ack方法被 調用時,這個事務會被從_activeTx中移除)
注意:當前處于acitve狀態的應該是序列在[_currTransaction,_currTransaction+_maxTransactionActive-1]之間的事務。
完整代碼見最后。
(4)繼續往下,看看ack方法。
@Override public void ack(Object msgId) {//獲取某個事務的狀態TransactionAttempt tx = (TransactionAttempt) msgId;TransactionStatus status = _activeTx.get(tx.getTransactionId());if(status!=null && tx.equals(status.attempt)) {//如果當前狀態是PROCESSING,則改為PROCESSEDif(status.status==AttemptStatus.PROCESSING) {status.status = AttemptStatus.PROCESSED;} else if(status.status==AttemptStatus.COMMITTING) {//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,并發送$success流。_activeTx.remove(tx.getTransactionId());_attemptIds.remove(tx.getTransactionId());_collector.emit(SUCCESS_STREAM_ID, new Values(tx));_currTransaction = nextTransactionId(tx.getTransactionId());for(TransactionalState state: _states) {state.setData(CURRENT_TX, _currTransaction); }}//由于有些事務狀態已經改變,需要重新調用sync()繼續后續處理,或者發送新tuple。sync();} }(5)還有fail方法和declareOutputFileds方法。
@Override public void fail(Object msgId) {TransactionAttempt tx = (TransactionAttempt) msgId;TransactionStatus stored = _activeTx.remove(tx.getTransactionId());if(stored!=null && tx.equals(stored.attempt)) {_activeTx.tailMap(tx.getTransactionId()).clear();sync();} }@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {// in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,// when it sees the earlier txid it should know to emit nothingdeclarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx")); }2、TridentSpoutCoordinator
TridentSpoutCoordinator接收來自MasterBatchCoordinator的$success流與$batch流,并通過調用用戶代碼,實現真正的邏輯。此外還向TridentSpoutExecuter發送$batch流,以觸發后者開始真正發送業務數據流。
(1)TridentSpoutCoordinator是一個bolt
public class TridentSpoutCoordinator implements IBasicBolt(2)在創建TridentSpoutCoordinator時,需要傳遞一個ITridentSpout對象,
public TridentSpoutCoordinator(String id, ITridentSpout spout) {_spout = spout;_id = id;}然后使用這個對象來獲取到用戶定義的Coordinator:
_coord = _spout.getCoordinator(_id, conf, context);(3)_state和_underlyingState保存了zk中的元數據信息
_underlyingState = TransactionalState.newCoordinatorState(conf, _id); _state = new RotatingTransactionalState(_underlyingState, META_DIR);(4)在execute方法中,TridentSpoutCoordinator接收$success流與$batch流,先看看$success流:
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) { _state.cleanupBefore(attempt.getTransactionId()); _coord.success(attempt.getTransactionId()); }即接收到$success流時,調用用戶定義的Coordinator中的success方法。同時還清理了zk中的數據。
(5)再看看$batch流
當收到$batch流流時,初始化一個事務并將其發送出去。由于在trident中消息有可能是重放的,因此需要prevMeta。注意,trident是在bolt中初始化一個事務的。
3、TridentSpoutExecutor
TridentSpoutExecutor接收來自TridentSpoutCoordinator的消息流,包括$commit,$success與$batch流,前面2個分別調用emmitter的commit與success方法,$batch則調用emmitter的emitBatch方法,開始向外發送業務數據。
對于分區類型的spout,有可能是OpaquePartitionedTridentSpoutExecutor等分區類型的executor。
(1) TridentSpoutExecutor與是一個bolt
publicclassTridentSpoutExecutorimplementsITridentBatchBolt(2)核心的execute方法
@Override public void execute(BatchInfo info, Tuple input) {// there won't be a BatchInfo for the success streamTransactionAttempt attempt = (TransactionAttempt) input.getValue(0);if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);_activeBatches.remove(attempt.getTransactionId());} else {throw new FailedException("Received commit for different transaction attempt");}} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {// valid to delete before what's been committed since // those batches will never be accessed again_activeBatches.headMap(attempt.getTransactionId()).clear();_emitter.success(attempt);} else { _collector.setBatch(info.batchId);//發送業務消息_emitter.emitBatch(attempt, input.getValue(1), _collector);_activeBatches.put(attempt.getTransactionId(), attempt);} }三、bolt
(一)概述
1、組件的基本關系
(1)trident拓撲最終會轉化為一個spout和多個bolt,每個bolt對應一個SubTopologyBolt,它通過TridentBoltExecutor適配成一個bolt。而每個SubTopologyBOlt則由很多節點組成,具體點說這個節點包括(Stream|Node)2部分,注意,Node不是Stream自身的成員變量,而是一個具體的處理節點。Stream定義了哪些數據流,Node定義和如何進行操作,Node包含了一個ProjectedProccessor等處理器,用于定義如何進行數據處理。
(2)一個SubTopologyBOlt包含多個Group,但大多數情況下是一個Group。看TridentTopology#genBoltIds()的代碼。在一個SubTopologyBolt中,含有多個節點組是可能的。例如在含有DRPC的Topology中,查詢操作也存儲操作可以被分配到同一個SubTopologyBolt中。于是該bolt可能收到來自2個節點組的消息。
(3)一個Group有多個Node。符合一定條件的Node會被merge()成一個Group,每個Node表示一個操作。
(4)每個Node與一個Stream一一對應。注意Stream不是指端到端的完整流,而是每一個步驟的處理對象,所有的Stream組合起來才形成完整的流。看Stream的成員變量。
(5)每個Node可能有多個父stream,但多個的情況只在merge()調用multiReduce()時使用。每個Stream與node之間創建一條邊。見TridentTopology#addSourceNode()方法。
2、用戶視角與源碼視角
在用戶角度來看,他通過newStream(),each(),filter()待方案對Stream進行操作。而在代碼角度,這些操作會被轉化為各種Node節點,它些節點組合成一個SubTopologyBolt,然后經過TridentBoltExecutor適配后成為一個bolt。
從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用于Stream上的各種Operation。在實現層面來看,無論是stream,還是后續的operation都會轉變成為各個Node,這些Node之間的關系通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。
說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關系的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。
TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行于一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的并發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。
關于jgrapht的更多信息,請參考其官方網站 http://jgrapht.org
========================================================
在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。
一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞
(二)基礎類
1、Stream
Stream主要定義了數據流的各種操作,如each(),pproject()等。
(1)成員變量
Node _node; TridentTopology _topology; String _name;三個成員變量:
* Node對象,這表明Stream與Node是一一對應的,每個節點對應一個Stream對象。
* name:這個Stream的名稱,也等于是這這個Node的名稱。
* TridentTopology: 這個Stram所屬的拓撲,使用這個變量,可以調用addSourceNode()等方法。
其中_node變量被使用很少。
(2)projectionValidation()
這個方法用于檢查是否對一個不存在的field進行了操作。
private void projectionValidation(Fields projFields) {if (projFields == null) {return;}Fields allFields = this.getOutputFields();for (String field : projFields) {if (!allFields.contains(field)) {throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields fields: <" + allFields + ">");}} }stream中定義了定義了各種各樣的trident操作,下面分別介紹
(3)project()
public Stream project(Fields keepFields) {projectionValidation(keepFields);return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields))); }首先檢查一下需要project的field是否存在。然后就在TridentTopology中新增一個節點。
第一個參數就是Stream自身,第二個參數是一個Node的子類–ProcessorNode。創建ProcessorNode時,最后一個參數ProjectedProcessor用于指定如何對流進行操作。
addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這里的節點不是source這個Stream自身的成員變量_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。
2、Node SpoutNode PartitionNode ProcessorNode
(1)Node表示拓撲中的一個節點,后面3個均是其子類。事實上拓撲中的節點均用于產生數據或者對數據進行處理。一個拓撲有多個spout/bolt,每個spout/bolt有一個或者多個Group,我個Group有多個Node。
詳細分析見書。
3、Group
節點組是構建SubTopologyBolt的基礎,也是Topology中執行優化的基本操作單元,Trident會通過不斷的合并節點組來達到最優處理的目的。Group中包含了一組連通的節點。
(1)成員變量
public final Set<Node> nodes = new HashSet<>(); private final DirectedGraph<Node, IndexedEdge> graph; private final String id = UUID.randomUUID().toString();nodes表示節點組中含有的節點。
graph表示拓撲的有向圖。(是整個拓撲的構成的圖)
id用于唯一標識一個group。
(2)構造方法
public Group(DirectedGraph graph, List<Node> nodes) {this.graph = graph;this.nodes.addAll(nodes); }初始狀態時,每個Group只有一個Node.
public Group(DirectedGraph graph, Node n) {this(graph, Arrays.asList(n)); }將2個Group合成一個新的Group。
public Group(Group g1, Group g2) {this.graph = g1.graph;nodes.addAll(g1.nodes);nodes.addAll(g2.nodes); }(3)outgoingNodes()
通過遍歷組中節點的方式來獲取該節點組所有節點的子節點,這些子節點可能屬于該節點組,也可能屬于其它節點組。
public Set<Node> outgoingNodes() {Set<Node> ret = new HashSet<>();for(Node n: nodes) {ret.addAll(TridentUtils.getChildren(graph, n));}return ret; }(4)incommingNodes()
用于獲取該節點組中所有節點的父節點,這些父節點可能屬于該節點組,也可能屬于其它節點組。
4、GraphGrouper
GraphGrouper提供了對節點組進行操作及合并的基本方法。
(1)成員變量
final DirectedGraph<Node, IndexedEdge> graph; final Set<Group> currGroups; final Map<Node, Group> groupIndex = new HashMap<>();graph:與Group相同,即這個拓撲的整個圖。
currGroups:當前graph對應的節點組。節點組之間是沒有交集的。
groupIndex:是一個反向索引,用于快速查詢每個節點所在的節點組。
(2)構造方法
public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {this.graph = graph;this.currGroups = new LinkedHashSet<>(initialGroups);reindex(); }就是為上面幾個變量進行初始化。
(3)reindex()
public void reindex() {groupIndex.clear();for(Group g: currGroups) {for(Node n: g.nodes) {groupIndex.put(n, g);}} }根據currGroups的內容重構groupIndex。
(4)nodeGroup()
public Group nodeGroup(Node n) {return groupIndex.get(n); }查詢某個node屬于哪個group。
(5)outgoingGroups()
計算節點組與哪些節點組之間存在有向邊,即2個節點組是相連的。其基本算法是遍歷每一個節點的子節點,若該子節點所在的節點組與自身不同,則獲得子節點所在的節點組。
public Collection<Group> outgoingGroups(Group g) {Set<Group> ret = new HashSet<>();for(Node n: g.outgoingNodes()) {Group other = nodeGroup(n);if(other==null || !other.equals(g)) {ret.add(other); }}return ret; }(6)incomingGroups()
用于獲取該節點組的父節點組,算法與上面類似。
public Collection<Group> incomingGroups(Group g) {Set<Group> ret = new HashSet<>();for(Node n: g.incomingNodes()) {Group other = nodeGroup(n);if(other==null || !other.equals(g)) {ret.add(other); }}return ret; }(7)merge()
合并2個節點組。
private void merge(Group g1, Group g2) {Group newGroup = new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);} }(8)mergeFully
這個方法是GraphGrouper的核心算法,它用來計算何時可以對2個節點組進行合并。基本思想是:如果一個節點組只有一個父節點組,那么將這個節點組與父節點組合并;如果一個節點組只有一個子節點組,那么將子節點組與自身節點組合并。反復進行這個過程。
public void mergeFully() {boolean somethingHappened = true;while(somethingHappened) {somethingHappened = false;for(Group g: currGroups) {Collection<Group> outgoingGroups = outgoingGroups(g);if(outgoingGroups.size()==1) {Group out = outgoingGroups.iterator().next();if(out!=null) {merge(g, out);somethingHappened = true;break;}}Collection<Group> incomingGroups = incomingGroups(g);if(incomingGroups.size()==1) {Group in = incomingGroups.iterator().next();if(in!=null) {merge(g, in);somethingHappened = true;break;}} }} }四、在TridentTopologyBuilder中設置Spout、bolt
(一)參考內容
http://www.cnblogs.com/hseagle/p/3490635.html
TridentTopology是storm提供的高層使用接口,常見的一些SQL中的操作tridenttopology提供的api中都有類似的影射。
從TridentTopology到vanilla topology(普通的topology)由三個層次組
成:
1. 面向最終用戶的概念stream, operation
2. 利用planner將tridenttopology轉換成vanilla topology
3. 執行vanilla topology
從TridentTopology到基本的Topology有三層,下圖是一個全局視圖。
從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用于Stream上的各種Operation。在實現層面來看,無論是stream,還是后續的operation都會轉變成為各個Node,這些Node之間的關系通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。
說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關系的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。
TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行于一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的并發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。
關于jgrapht的更多信息,請參考其官方網站 http://jgrapht.org
========================================================
在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。
一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞。
=====================================
1、【待完善】通過上面的分析,一個Spout是準備好了,但如何將它加載到拓撲中,并開始真正的數據流:
(1)在TridentTopology中調用newStream方法,將spout節點加入拓撲。
(2)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息
2、拓撲創建的總體流程
(1)在用戶代碼中創建TridentTopology對象
(2)在用戶代碼中指定spout節點和bolt節點
比如:
(3)在用戶代碼中創建拓撲
topology.build();(4)topology.build()會調用TridentTopologyBuilder#buildTopology()
(5)用戶代碼中提交拓撲
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));(一)概述
(二)基礎類
1、GlobalStreamId
這是由trift生成的類,有2個核心成員變量
public GlobalStreamId(String componentId,String streamId)分別記錄了某個component的ID與其對應的streamId,如
"$mastercoord-" + batchGroup MasterBatchCoordinator.BATCH_STREAM_ID表示這個component會消費這個stream的消息。
(三)TridentTopology
主要流程:
(1)創建各種各樣的節點,包括spout/bolt
(2)spout全部放到一個set中
(3)bolt的每一個節點放入一個group中
(4)對group進行各種的merge操作(如g1的所有輸出均到g2,則將它們合并)
(5)直到剩余少量的mergeGroup,作為bolt
(6)TridentTopologyBuilder.buildTopology()對這些spout/mergeGroup進行分組配置。
1、生成bolt的名稱:genBoltIds
genBoltIds用于為bolt生成一個唯一的id,它使用字母b開頭,然后是一個數字id,接著是group的名稱,然后是第2個id, 第2個group的名稱….。而group的名稱是由這個group包含的Node名稱組成的。
private static Map<Group, String> genBoltIds(Collection<Group> groups) {Map<Group, String> ret = new HashMap<>();int ctr = 0;for(Group g: groups) {if(!isSpoutGroup(g)) {List<String> name = new ArrayList<>();name.add("b");name.add("" + ctr);String groupName = getGroupName(g);if(groupName!=null && !groupName.isEmpty()) {name.add(getGroupName(g)); }ret.put(g, Utils.join(name, "-"));ctr++;}}return ret; }private static String getGroupName(Group g) {TreeMap<Integer, String> sortedNames = new TreeMap<>();for(Node n: g.nodes) {if(n.name!=null) {sortedNames.put(n.creationIndex, n.name);}}List<String> names = new ArrayList<>();String prevName = null;for(String n: sortedNames.values()) {if(prevName==null || !n.equals(prevName)) {prevName = n;names.add(n);}}return Utils.join(names, "-"); }2、添加節點:addNode()
protected Stream addNode(Node n) {registerNode(n);return new Stream(this, n.name, n); }這個方法很簡單,而且,它只在newStream()及newDRPCStream中調用,很明顯這是用于提供一個新的數據源的。而下面的addSourceNode()是用于在bolt中添加下一個處理節點的。
3、添加節點:addSourceNode()
創建一個新節點,指定新節點的父節點(可能多個)。指定多個sources的情況只在merge()方法中被調用multiReduce()時調用。因此這里只關注一個source的情形。
protected Stream addSourcedNode(Stream source, Node newNode) {return addSourcedNode(Arrays.asList(source), newNode); } protected Stream addSourcedNode(List<Stream> sources, Node newNode) {registerSourcedNode(sources, newNode);return new Stream(this, newNode.name, newNode); }addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這里的節點不是source這個Stream自身的成員變量_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));除了注冊新節點 registerNode(newNode)以外,還在每個stream和節點間創建一條邊。
protected void registerSourcedNode(List<Stream> sources, Node newNode) {registerNode(newNode);int streamIndex = 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex++;} }向圖中添加一個節點。然后若節點中的stateInfo成員不為空,則將該節點放入與存儲序號(StateId)相對應的哈希表_colocate中。_colocate變量將所有訪問同一存儲的節點關聯在一起,并將他們放在一個Bolt中執行。
protected void registerNode(Node n) {_graph.addVertex(n);if(n.stateInfo!=null) {String id = n.stateInfo.id;if(!_colocate.containsKey(id)) {_colocate.put(id, new ArrayList());}_colocate.get(id).add(n);} }(四)TridentTopologyBuilder
總結
以上是生活随笔為你收集整理的Storm源码分析之四: Trident源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Builder 模式
- 下一篇: 遍历HashMap的最佳方法