Trident API 概览
Trident API 概覽
? 在網上看到了很多有TRIDENT相關API的翻譯,看來看去,總覺得沒有說清楚很多東西,所以自己結合使用的經驗翻譯了一篇出來;翻譯完以后,也發現
在自己的翻譯中也有很多地方是表達不清楚的··不過多少感覺有些個人的理解編織在里面了。大俠們勿噴哈!
原文地址:http://storm.apache.org/releases/1.1.0/Trident-API-Overview.html
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在Trident中核心的數據模式是“流”,以一系列batch的方式進行處理。一個流分區在集群中不同的節點上,并且對于流的操作在各個分區上并行執行。
?
在Trident中有五類操作:
1.在本地的分區上執行的操作,不會引起網絡傳輸;
2.對一個流進行重新分區但并不改變流中的內容(會有網絡傳輸);
3.將網絡傳輸作為操作的一部分的聚合操作;
4.在分組后的流上進行的操作;
5.合并和鏈接(原文:Merges and joins)
?
本地分區操作
本地分區操作不引起網絡傳輸,獨立運行于每一個批量分區中
Function:
?
一個函數接收一批輸入字段并且發送零個或者更多的tuple來作為輸出。輸出tuple的字段被追加到原始的輸入tuple的字段后面。假如一個函數不發送任何的tuple,原始輸入的
tuple就會被過濾掉。否則,原始輸入tuple中的字段會被包含在每一個輸出tuple中。假如你有一個像下面一樣的函數:
?
public class MyFunction extends BaseFunction {
????public void execute(TridentTuple tuple, TridentCollector collector) {
????????for(int i=0; i < tuple.getInteger(0); i++) {//獲取原始輸入tuple中的第一個字段的值,???//然后i從0開始,如果i小于這個值,那么發送一個新的tuple;
//tuple中顯示的發送一個i的值
????????????collector.emit(new Values(i));
????????}
????}
}
現在假設你在mystream(Trident的一個拓撲)變量中有一個流,包含的字段有["a","b","c"],并且有如下的3個tuple要經過該函數:
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
假如你運行如下代碼:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))//輸入流中使用 "b"這個字段的值,輸出流中申明一個"d"字段的值
附加解釋
?運行過程如下: ????????????????????????????????????????????????????????????????????????????????????????????????????????????
首先[1, 2, 3]進去了函數中,取到"b"字段中的值為2,那么可以發送兩個tuple,其中 "d"字段的值分別為0 ,1 同時由于原始輸入tuple中的字段會被保留,所以輸出的兩個tuple為如下格式:[1,2 ,3 ,0] [1,2,3,1];同理然后[4,1,6]進入函數,輸出流為[4,1,6,0] ????????????????????????????????????????????????????????????????????????????????最后[3,0,8]進入函數,由于不滿足循環條件,沒有輸出tuple;所以[3,0,8]被直接過濾掉了。 ???????????????????????????????????????
進過該函數處理的輸出tuple擁有一下字段 ["a", "b", "c", "d"],輸出的tuple看起來是這個樣子的:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
?
Filter:
?
過濾器接收一個tuple,并決定是否保留該tuple。假設你有這樣一個過濾器:
?
public class MyFilter extends BaseFilter {
????public boolean isKeep(TridentTuple tuple) {//返回true就會被保留,返回false就不會被保留了
????????return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;//判斷條件是第一個字段值為1,第二個字段值為2的tuple才會被保留下來
????}
}
現在假設你有一些擁有 ["a", "b", "c"]這些字段的tuple,他們的值如下:
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
那么如果你運行如下代碼:
mystream.filter(new MyFilter())
經過處理后的輸出結果tuple就會變成下面這樣:
[1, 2, 3]
?
map和flatMap(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)
?
?map返回一個由提供的mapping函數對原始輸入流作用后的結果所組成的流(注:意味著原來字段對應的值被替換掉了),這種操作可以應用于一個流轉換為另一個流(一對一);
?
?舉個例子,假如這里有一個單詞流,并且你想把這個單詞流中的值轉換為大寫的方式,你就可以像下面這樣定義一個mapping函數了:
???
???注:文中所提到的單詞流,個人理解應該是這樣的 ["world"] -----> [a],[b],[c]
??
public class UpperCase extends MapFunction {
?@Override
?public Values execute(TridentTuple input) {
???return new Values(input.getString(0).toUpperCase());
?}
}
?
這個mapping函數可以被應用于流上,來產生一個把原始輸入流中的單詞轉換為大寫形式的新流;
?
mystream.map(new UpperCase())
注:個人理解------處理后的結果是 ["world"]------> [A],[B],[C]
?flatMap和map很相似,但是擁有將一個流轉換為多個流的能力(一對多),然后把生成的元素平壓到一個新的流中。(注:這句話怎么理解呢?額,不好表達清楚,有厲害的可以幫忙翻譯翻譯;我有一些個人的理解,但是還沒想好怎么組織語言)
?
?舉個例子,有一個句子的流,而且你想你想把這個句子的流轉換為單詞的流,那么你就需要像下面這樣來定義flatMap函數:
?
public class Split extends FlatMapFunction {
??@Override
??public Iterable<Values> execute(TridentTuple input) {//其實函數看起來很簡單
????List<Values> valuesList = new ArrayList<>();
????for (String word : input.getString(0).split(" ")) {
??????valuesList.add(new Values(word));
????}
????return valuesList;
??}
}
?
?這個flatMap函數可以作用于一個句子流,然后生成一個單詞流:
?
mystream.flatMap(new Split())
?
?當然這些操作完全支持鏈式調用,那么你就可以通過如下的方式來將一個句子流轉換一個大寫單詞流:
?
mystream.flatMap(new Split()).map(new UpperCase())
?
?如果你不把輸出字段作為參數傳遞給map和faltMap,map和faltMap會把輸入字段作為輸出字段使用
?
?假如你想用新的輸出字段來替換舊的輸入字段,那么你可以像下面這樣在調用方法的時候,增加一個Fields參數
?
mystream.map(new UpperCase(), new Fields("uppercased"))
?輸出流會忽略輸入流中的字段,并只保留 "uppercased"這個字段。flatMap同理,例子如下:
mystream.flatMap(new Split(), new Fields("word"))
?
?
?
Peek:(這個函數比較新,在比較舊的storm版本中沒有這個函數)
?
?peek用來對流中流過的每一個Trident tuple做一些額外的操作,這個功能在debug中會很有用,當tuple經過管道中的某個特定點的時候你可以觀察到這些tuple。
?
?舉個例子,下面的代碼將會在單詞被轉換為大寫的結果傳遞給groupBy之前打印他們:
?
mystream.flatMap(new Split()).map(new UpperCase())
?.peek(new Consumer() {
??????@Override public void accept(TridentTuple input) {//這個函數中,你只能獲得tuple,然后用這個tuple的數據做一些事情,
//比如打印出來看一看,發送個電子郵件什么的.但是你不可能對流產生任何的影響
?????????System.out.println(input.getString(0)); } })
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
?
?
?
min 和 minBy?:(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)
?
?min和minBy可以返回一個trident流中一個分區中一批tuple中的最小值;
?
?假如,一個trident流中包含字段["device-id","count"]并且以分區的方式發送流;
?
?Partition 0: ??????
?[123, 2]
?[113, 54]
?[23, ?28]
?[237, 37]
?[12, ?23]
?[62, ?17]
?[98, ?42]
?
?Partition 1:
?[64, ?18]
?[72, ?54]
?[2, ??28]
?[742, 71]
?[98, ?45]
?[62, ?12]
?[19, ?174]
?
?Partition 2:
?[27, ?94]
?[82, ?23]
?[9, ??86]
?[74, ?37]
?[51, ?49]
?[37, ?98]
?
?
?當binBy操作像下面這樣應用與上面的流中的tuple上時,結果是在每個分區上發送count是最小值的tuple。
?
mystream.minBy(new Fields("count"))
?
?上面代碼在3個分區中運行的結果是:
?
?Partition 0:
?[123, 2]
?
?Partition 1:
?[62, ?12]
?
?Partition 2:
?[82, ?23]
?
?你可以在public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)和public Stream min(Comparator<TridentTuple> comparator)方法中查看其他min和minBy操作;
?下面的例子演示了這些API是如何使用不同的比較器來找出一批tuple中的最小值的:
?
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
?
TridentTopology topology = new TridentTopology();
????????
Stream vehiclesStream = topology.newStream("spout1", spout).each(allFields,new Debug("##### vehicles"));
Stream slowVehiclesStream =vehiclesStream .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple..each(vehicleField, new Debug("#### slowest vehicle"));
vehiclesStream.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple..each(vehicleField, new Debug("#### least efficient vehicle"));
?
//這兩個類的地址在:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
????????@Override
????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {
????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
????????????return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
????????}
???????}
static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
@Override
????????public int compare(TridentTuple tuple1, TridentTuple tuple2) {
????????????Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
????????????Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
????????????return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
????????}
????}
?
?
?
max和maxBy:(這兩個函數比較新,在比較舊的storm版本中沒有這兩個函數)
?
?max 和 maxBy 可以返回一個trident流中一個分區中一批tuple中的最大值;
?
?假如,一個trident流中包含字段["device-id","count"]并且以分區的方式發送流;
?
Partition 0: ??????
[123, 2]
[113, 54]
[23, ?28]
[237, 37]
[12, ?23]
[62, ?17]
[98, ?42]
?
Partition 1:
[64, ?18]
[72, ?54]
[2, ??28]
[742, 71]
[98, ?45]
[62, ?12]
[19, ?174]
?
Partition 2:
[27, ?94]
[82, ?23]
[9, ??86]
[74, ?37]
[51, ?49]
[37, ?98]
?當maxBy操作像下面這樣應用與上面的流中的tuple上時,結果是在每個分區上發送count是最大值的tuple。
?
mystream.maxBy(new Fields("count"))
?
??上面代碼在3個分區中運行的結果是:
??
??Partition 0:
??[113, 54]
?
??Partition 1:
??[19, ?174]
?
??Partition 2:
??[37, ?98]
??
??
??你可以在public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)和public Stream max(Comparator<TridentTuple> comparator)方法中查看其他max和maxBy操作;
??下面的例子演示了這些API是如何使用不同的比較器來找出一批tuple中的最大值的:
??
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
?
TridentTopology topology = new TridentTopology();
?
Stream vehiclesStream = topology.newStream("spout1", spout). each(allFields, new Debug("##### vehicles"));
?
????????vehiclesStream
????????????????.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
????????????????.each(vehicleField, new Debug("#### fastest vehicle"))
????????????????.project(driverField)
????????????????.each(driverField, new Debug("##### fastest driver"));
?
????????vehiclesStream
????????????????.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
????????????????.each(vehicleField, new Debug("#### most efficient vehicle"));
?
########兩個比較器和min、minBy中用到的比較器是一樣的
?
?
?
Windowing:
?
Trident可以處理在同一個window中的一批一批的tuple,并且將匯總結果發送到下一個操作。這里有兩種類型的window,分別是Tumblingwindow和SlidingWindow;兩者都支持基于處理時間的或者是基于tuple的個數
?的Window劃分。
?
?Tumbling window:
?
?tuple被基于處理時間或者tuple的count值,分配在一個單獨的Window中;任何的tuple都只可能屬于一個Window。
?
????/**
?????* 返回一個包含tuple個數為windowCount的tummbling window中每一個tuple的匯總結果所組成的流
?????*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
?
????/**
?????* 返回一個時間跨度為windowDuration的tummbling window中每一個tuple的匯總結果所組成的流
?????*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
?
?
?Sliding window:
?
?Tuple被分組到各個window中,并且window每隔一定的時間間隔進行一次滑動。一個tuple可以屬于一個或者多個window。
?
????/**
?????* 數為windowCount的sliding window中每一個tuple的匯總結果所組成的流,并將sliding window向后滑動slideCount
?????*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
?
????/**
?????* 返回一個時間跨度為window向后滑動windowDuration的sliding window中每一個tuple的匯總結果所組成的流,并將sliding window向后滑動windowDuration
?????*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
?
?
Common windowing API:
?下面是接受任何被支持的windowing configuration的公共windowing API:
?
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)
?
windowConfig 可以是下面這幾種類型:
1.SlidingCountWindow.of(int windowCount, int slidingCount)
2.SlidingDurationWindow.of(BaseWindowedBolt.Duration?windowDuration,BaseWindowedBolt.Duration slidingDuration)
3.TumblingCountWindow.of(int windowLength)
4.TumblingCountWindow.of(int windowLength)
?
Trident windowing APIS 需要WindowsStoreFactory 來保存接收到的tuple和匯總值;現在已經提供的一個基礎的工廠是基于hbase的HBaseWindowsStoreFactory;它可以被擴展,用來支持不同場景的應用。
HBaseWindowStoreFactory 的例子如下:
?
// window-state table should already be created with cf:tuples column(要在hbase中提前建立好一個表叫window-state,并且已經在cf列族中添加了tuples列)
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
????
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
????????????new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
????????????new Values("how many apples can you eat"), new Values("to be or not to be the person"));
????spout.setCycle(true);
?
TridentTopology topology = new TridentTopology();
?
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word"))
?????????????????????????.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
????????????.peek(new Consumer() {
????????????????@Override
????????????????public void accept(TridentTuple input) {
????????????????????LOG.info("Received tuple: [{}]", input);
????????????????}
????????????});
?
????StormTopology stormTopology = ?topology.build();
?
partitionAggregate:
?
?partitionAggregate 在每一個批次的tuple的每一個分區上運行一個函數,和function(第一個介紹的那個)不同,partitionAggregate 處理后發送的tuple覆蓋了他所接收到的tuple;來看看這個例子:
?
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
?
?假設輸入的流中的tuple包含字段 ["a", "b"],并且有一下這樣幾個分區的tuple流入了sum函數:
?
Partition 0:
["a", 1]
["b", 2]
?
Partition 1:
["a", 3]
["c", 8]
?
Partition 2:
["e", 1]
["d", 9]
["d", 10]
?
?
?然后輸出的流中只包含一個字段'sum',tuple的內容如下:
?
Partition 0:
[3]
?
Partition 1:
[11]
?
Partition 2:
[20]
?
?這里有三種不同的接口用來定義聚合器:CombinerAggregator, ReducerAggregator, 和 Aggregator.
?
?CombinerAggregator接口定義的內容如下:
?
public interface CombinerAggregator<T> extends Serializable {
????T init(TridentTuple tuple);
????T combine(T val1, T val2);
????T zero();
}
?
?一個CombinerAggregator只能返回一個tuple并且該tuple只有一個字段。CombinerAggregator在每一個輸入的tuple上都會運行init方法來初始化值,然后使用combine方法來combine所有的值,直到只剩下一個
?值為止。如果分區內沒有任何tuple,CombinerAggregator就發送zero方法產生的值。例如,這是Count的實現:
?
public class Count implements CombinerAggregator<Long> {
????public Long init(TridentTuple tuple) {
????????return 1L;
????}
?
????public Long combine(Long val1, Long val2) {
????????return val1 + val2;
????}
?
????public Long zero() {
????????return 0L;
????}
}
?
?
?當在aggregate方法中而不是在partitionAggregate方法中使用CombinerAggregators 的時候,你就能感受到它的好處了。在aggregate方法中,Trident會通過在網絡之間傳遞tuple之前進行局部分區聚合的
方式來優化計算。
?
?
?ReducerAggregator接口的定義如下:
?
public interface ReducerAggregator<T> extends Serializable {
????T init();
????T reduce(T curr, TridentTuple tuple);
}
?ReducerAggregator在初始化的時候設置一個初始值,然后迭代每一個輸入的tuple的value來產生一個只有一個值的單一tuple來作為輸出。例如,下面是使用ReducerAggregator實現的Count函數:
?
?
public class Count implements ReducerAggregator<Long> {
????public Long init() {
????????return 0L;
????}
?
????public Long reduce(Long curr, TridentTuple tuple) {
????????return curr + 1;
????}
}
?
?
ReducerAggregator也可以被使用在persistentAggregate方法中,稍后你將會看到。
?
?最最通用的聚合接口就是Aggregator了,接口定義如下:
?
public interface Aggregator<T> extends Operation {
????T init(Object batchId, TridentCollector collector);
????void aggregate(T state, TridentTuple tuple, TridentCollector collector);
????void complete(T state, TridentCollector collector);
}
?
Aggregator們可以發送帶有任意數量字段的任意數量的tuple。在其方法執行的任何地方都可以發送tuple,Aggregator們按照如下的方式來執行:
?
1.初始化方法在處理一個batch之前被調用,返回結果是一個用來表示聚合狀態的對象,并且會被傳遞給aggregate和complete方法中。
2.aggregate 方法在每一個batch分區的tuple上運行,這個方法可以更新初始化的那個狀態對象,并可以選擇性地發送一些消息。
3.complete 方法在batch分區上的所有tuple都被Aggregator處理后調用。
?
下面的例子演示如何使用Aggregator來實現一個Count:
?
public class CountAgg extends BaseAggregator<CountState> {
????static class CountState {
????????long count = 0;
????}
?
????public CountState init(Object batchId, TridentCollector collector) {
????????return new CountState();
????}
?
????public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
????????state.count+=1;
????}
?
????public void complete(CountState state, TridentCollector collector) {
????????collector.emit(new Values(state.count));
????}
}
?
有時候你想同時執行很多個aggregator,這種方式被稱為鏈式調用,可以像下面這樣使用:
mystream.chainedAgg()
????????.partitionAggregate(new Count(), new Fields("count"))
????????.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
????????.chainEnd()
?
?上面的代碼會在每一個分區上運行Count和Sum聚合器,輸出將會只有一個tuple,并包含 ["count", "sum"]字段。
?
?
?
stateQuery 和 partitionPersist:
?
?stateQuery 和 partitionPersist 分別可以查詢和更新作為數據資源的state;相關介紹在Trident state doc中。
?
?
projection:
?projection方法作用在流上后,可以使流中只包含方法中指定的字段;假如你有一個包含字段["a", "b", "c", "d"]的流,然后你運行下面的方法:
?
mystream.project(new Fields("b", "d"))
?
?那么輸出的流中就會只包含字段 ["b", "d"]。
?
?
Repartitioning operations:
?
?Repartitioning operations(重分區操作)運行一個可以改變tuple在各個任務之間是如何分區的函數,該函數的運行結果可能會改變分區的數目;例如,并行處理數大于重新分區后的分區數的時候。
重分區操作需要進行網絡傳輸,下面是提供的重分區函數:
?
?1.shuffle:隨機分配tuple到所有的目標分區中;
?2.broadcast:每一個tuple都會被重復的發送到每一個目標分區中;這在DRPC操作用很有用,例如:你需要在每一個分區的data上進行stateQuery的時候
?3.partitionBy:該函數接收一批字段,然后根據這批字段進行分區。這批字段會被進行哈希運算然后根據分區個數取模,然后根據運算結果進行分區。該函數保證相同一批字段的tuple一定會去到同一個分區
中。
?4.global:所有的tuple都被發送到同一個分區中。流中所有的batch都會選擇同一個分區。
?5.batchGlobal:在batch中的所有tuple都會進入同一個分區,但是不同的batch中的tuple可能會進入到不同的分區中。
?6.partition:該函數接收一個本地化的分區方法,本地化的分區方法需要實現org.apache.storm.grouping.CustomStreamGrouping。
?
?
Aggregation operations(集合操作):
?
?Trident有aggregate 和persistentAggregate 方法來提供在一個流上進行聚合操作;aggregate 獨立地運行在流中的每一個batch上,persistentAggregate 會運行在流中的所有batch上,并且會把結果
保存在state中。
?
?運行aggregate方法會在流上進行全局的聚合。當你使用ReducerAggregator 和ReducerAggregator 的時候,首先流會被重新分組到一個單獨的分區中,然后分區函數在這個單獨的分區中運行;然而當你
使用CombinerAggregator的時候,Trident首先會在每一個分區上進行聚合,然后把每個分區的聚合結果重新分區到一個獨立的分區中,然后在完成網絡傳輸后完成全局聚合操作。CombinerAggregator比較
高效,你應該盡量的使用它。
?
?這里有一個例子展示如何使用aggregate 來獲得某個batch中的全局count:
?
mystream.aggregate(new Count(), new Fields("count"))
?
?像partitionAggregate一樣,aggregate中的聚合器可以以鏈式的方式進行調用;然而,如果你把一個CombinerAggregator 和一個不是CombinerAggregator 的聚合器鏈在一起后,storm就無法進行在每個分區
中預先進行聚合操作的優化了。
?
?你可以在Trident state doc中查看persistentAggregate的使用方式。
?
?
Operations on grouped streams(在分組流上的操作)
?
?groupBy 操作根據特定的字段運行一個partitionBy 操作來對流進行重新分區,然后在每一個分區中,把特定字段相同的tuple放到一個組中。下面是一個示例圖:
?
?
?
?
?如果你在一個分組后的流上運行aggregators ,那么聚合操作會在每一個組中運行,而不是在每個batch中運行。persistentAggregate 也可以運行在一個分組后的流上,在這種情況下
聚合后的結果會被保存在一個 MapState中,該 MapState使用用來分組的字段作為key。在Trident state doc中你可以找到更多答案。
?
?和普通的流一樣,運行在分組后的流上的aggregators 也可以進行鏈式調用。
?
?
Merges and joins:
?
?
?API的最后一部分就是把不同的流結合在一起,最最簡單的結合流的方式就是把幾個不同的流合并到同一個流中。你可以通過merge 方法(像下面這樣)來達到目的:
?
topology.merge(stream1, stream2, stream3);
?
?Trident會用第一個流的字段來重新命名其他合并的流的字段,在作為新的輸出流的字段。
?
?另一種合結合流的方式就是join操作,現在來看一個標準的join操作,就像在SQL中的join操作一樣,join要求輸入是有限的,所以對于無限地不停地發送的流是不起作用的。在Trident
中的join操作僅僅作用于每一個有spout發出的很小的batch中;
?
?下面的例子在包含字段["key", "val1", "val2"] 的流和包含字段["x", "val1"]的另一個流上進行join操作:
?
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
?
?
?上面的代碼中,把stream1和stream2通過key和xjoin在一起,Trident要求輸出流中所有的輸出字段都要起名字,因為在輸入流中可能會用重復的字段名稱。由join操作發出的tuple
會包含如下內容:
?
1.首先是鏈接字段的列表。在這里key等同以stream1中的key也等同于stream2中的x;
2.然后就是所有流中沒有進行join的字段,這些字段按照傳遞進來的順序排序;在這個例子中,a=stream1.val1,b=stream1.val2,c=stream2.val1.
?當來自不同的spout的流和并的時候,這些spout會在發送batch上進行同步。也就是說一個要處理的batch會包含所有的參與的spout所發送的tuple。
?你也許會好奇,該如何實現一個類似"windowed join"的操作,也就是說,來自一方的tuple和來自另一方的最近一小時的tuple進行join操作。
?要實現這樣的功能,你需要利用partitionPersist 和stateQuery,最近一小時tuple會被保存并且循環迭代在一個state中,以join操作的field作為key。然后stateQuery 將會通過join的字段查詢state中的數據來進行join操作。
總結
以上是生活随笔為你收集整理的Trident API 概览的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 蓝桥杯第六届省赛JAVA真题----打印
- 下一篇: 提权真的很难吗?