Storm Trident API
在Storm Trident中有五種操作類(lèi)型
- Apply Locally:本地操作,所有操作應(yīng)用在本地節(jié)點(diǎn)數(shù)據(jù)上,不會(huì)產(chǎn)生網(wǎng)絡(luò)傳輸??? ?
- Repartitioning:數(shù)據(jù)流重定向,單純的改變數(shù)據(jù)流向,不會(huì)改變數(shù)據(jù)內(nèi)容,這部分會(huì)有網(wǎng)絡(luò)傳輸
- Aggragation:聚合操作,會(huì)有網(wǎng)絡(luò)傳輸
- Grouped streams上的操作
- Merge和Join
一Apply Locally
1.functions函數(shù)操作
函數(shù)的作用是接收一個(gè)tuple(需指定接收tuple的哪個(gè)字段),輸出0個(gè)或多個(gè)tuples。輸出的新字段值會(huì)被追加到原始輸入tuple的后面,如果一個(gè)function不輸出tuple,那就意味這這個(gè)tuple被過(guò)濾掉了,例如下面的例子:
1 class AddAndSubFuction extends BaseFunction{ 2 3 public void execute(TridentTuple tuple, TridentCollector collector) { 4 int res1 = tuple.getInteger(0); 5 int res2 = tuple.getInteger(1); 6 int sub = res1 > res2 ? res1 - res2 : res2 - res1; 7 collector.emit(new Values(res1+res2,sub)); 8 } 9 }?
2.Filter過(guò)濾操作
Filters很簡(jiǎn)單,接收一個(gè)tuple,并決定是否保留這個(gè)tuple,例如
1 class ScoreFilter extends BaseFilter{ 2 3 public boolean isKeep(TridentTuple tuple) { 4 return tuple.getInteger(0) >= 60; 5 } 6 }上述Filter過(guò)濾調(diào)成績(jī)小于60的tuple.
3.partitionAggregate
PartitionAggregate的作用對(duì)每個(gè)Partition中的tuple進(jìn)行聚合,與前面的函數(shù)在原tuple后面追加數(shù)據(jù)不同,PartitionAggregate的輸出會(huì)直接替換掉輸入的tuple,僅數(shù)據(jù)PartitionAggregate中發(fā)射的tuple。
TridentAPI提供了三個(gè)聚合器接口:CombinerAggregator,ReducerAggregator,Aggregator
我們先來(lái)看一看CombinerAggregator,CombinerAggregator接口的定義如下:
?
CombinerAggregator接口只返回一個(gè)tuple,并且這個(gè)tuple也只包含一個(gè)field。init方法會(huì)先執(zhí)行,它負(fù)責(zé)預(yù)處理每一個(gè)接收到的tuple,然后再執(zhí)行combine函數(shù)來(lái)計(jì)算收到的tuples直到最后一個(gè)tuple到達(dá),當(dāng)所有tuple處理完時(shí),CombinerAggregator會(huì)發(fā)射zero函數(shù)的輸出,比如CombinerAggregator的實(shí)現(xiàn)類(lèi)Count的定義如下:
?
當(dāng)你使用aggregate?方法代替PartitionAggregate時(shí),CombinerAggregator的好處就體現(xiàn)出來(lái)了,因?yàn)門(mén)rident會(huì)自動(dòng)優(yōu)化計(jì)算,在網(wǎng)絡(luò)傳輸tuples之前做局部聚合。
我們?cè)賮?lái)看一下ReducerAggregator,ReducerAggregator的定義如下:
?
ReducerAggregator通過(guò)init方法提供一個(gè)初始值,然后為輸入的每個(gè)tuple迭代這個(gè)值,最終產(chǎn)生一個(gè)唯一的tuple并輸出,定義一個(gè)實(shí)例如下:
?
最后看一下通用的聚合器Aggregator,它的定義如下:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); } Aggregator接口可以發(fā)射含任意數(shù)量屬性的任意數(shù)據(jù)量的tuples,并且可以在執(zhí)行過(guò)程中的任何時(shí)候發(fā)射:
init:在處理數(shù)據(jù)之前被調(diào)用,它的返回值會(huì)作為一個(gè)狀態(tài)值傳遞給aggregate和complete方法
aggregate:用來(lái)處理每一個(gè)輸入的tuple,它可以更新?tīng)顟B(tài)值也可以發(fā)射tuple
complete:當(dāng)所有tuple都被處理完成后被調(diào)用
有時(shí)候我們需要執(zhí)行多個(gè)聚合器,這在Trident中稱(chēng)為chaining
4.projection投影操作
投影操作的作用是僅僅保留stream指定字段的數(shù)據(jù),和關(guān)系數(shù)據(jù)庫(kù)中投影的概念類(lèi)似
二Repartitioning重定向操作
重定向操作是如何在各個(gè)任務(wù)間對(duì)tuples進(jìn)行分區(qū)。分區(qū)的數(shù)量也有可能改變重定向的結(jié)果。重定向需要網(wǎng)絡(luò)傳輸,下面介紹下重定向函數(shù):
?
三Aggragation聚合操作
Trident有aggregate和 persistentAggregate方法來(lái)做聚合操作。aggregate是獨(dú)立的運(yùn)行在Stream的每個(gè)Batch上的,而persistentAggregate則是運(yùn)行在Stream的所有Batch上并把運(yùn)算結(jié)果存儲(chǔ)在state source中。 運(yùn)行aggregate方法做全局聚合。當(dāng)你用到 ReducerAggregator或Aggregator時(shí),Stream首先被重定向到一個(gè)分區(qū)中,然后其中的聚合函數(shù)便在這個(gè)分區(qū)上運(yùn)行。當(dāng)你用到CombinerAggregator時(shí),Trident會(huì)首先在每個(gè)分區(qū)上做局部聚合,然后把局部聚合后的結(jié)果重定向到一個(gè)分區(qū),因此使用CombinerAggregator會(huì)更高效,可能的話我們需要優(yōu)先考慮使用它。
四Grouped streams
GroupBy操作是根據(jù)特定的字段對(duì)流進(jìn)行重定向的,還有,在一個(gè)分區(qū)內(nèi)部,每個(gè)相同字段的tuple也會(huì)被Group到一起。如果你在grouped Stream上面運(yùn)行aggregators,聚合操作會(huì)運(yùn)行在每個(gè)Group中而不是整個(gè)Batch。persistentAggregate也能運(yùn)行在GroupedSteam上,不過(guò)結(jié)果會(huì)被保存在MapState中,其中的key便是分組的字段。 當(dāng)然,aggregators在GroupedStreams上也可以串聯(lián)。
五Merge和Join
api的最后一部分便是如何把各種流匯聚到一起。最簡(jiǎn)單的方式就是把這些流匯聚成一個(gè)流。我們可以這么做:
topology.merge(stream1, stream2, stream3);?
另一種合并流的方式就是join。一個(gè)標(biāo)準(zhǔn)的join就像是一個(gè)sql,必須有標(biāo)準(zhǔn)的輸入,因此,join只針對(duì)符合條件的Stream。join應(yīng)用在來(lái)自Spout的每一個(gè)小Batch中。join時(shí)候的tuple會(huì)包含:
1.join的字段,如Stream1中的key和Stream2中的x
2.所有非join的字段,根據(jù)傳入join方法的順序,a和b分別代表steam1的val1和val2,c代表Stream2的val1
當(dāng)join的是來(lái)源于不同Spout的stream時(shí),這些Spout在發(fā)射數(shù)據(jù)時(shí)需要同步,一個(gè)Batch所包含的tuple會(huì)來(lái)自各個(gè)Spout。
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/senlinyang/p/8081447.html
總結(jié)
以上是生活随笔為你收集整理的Storm Trident API的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 第一周冲刺_周三总结
- 下一篇: 随机验证码 php