Trident State译文
Trident State?譯文
Trident針對狀態化的數據源的讀寫進行了一流的分裝。State可以包含在拓撲中-例如,保存在內存中,有HDFS提供備份-也可以保存在一個外部的數據庫中,像Memcached和Cassandra。針對以上的所有情況,Trident的API都是一樣的。
?
為了保證state在更新的過程中遇到重試或者失敗處理時任然能夠具有冪等性,storm采取了必要的容錯。也就是說,storm能夠做到每一條消息僅且僅被處理一次。
?
在進行state更新操作的時候,可以選擇不同等級的容錯方式;在看這些容錯方式之前,讓我們來用一個例子說明如何保證僅且僅被處理一次的語意。假設你正在某個流中進行累加的聚合操作,并且準備把聚合的結果保存在數據庫中。 現在你在數據庫中保存了一個值來表示累加的結果,每處理一個tuple你就對數據庫中的值進行一次累加操作。
?
當失敗處理發生的時候,tuples就會被重放。這就給state的更新操作(還有任何會帶來副作用的操作)帶來了問題--你將無法確定你是否已經基于這個被重發的tuple對state成功地進行了更新操作。也許你還從來沒有處理過這個tuple,在這種情況下你就需要對數據庫中的值進行一次累加操作。也許你已經成功處理過這個tuple并且對數據庫中的值進行過了一次累加操作,但是這個tuple在你更新state之后的某個環節出錯了;在這種情況下,你在接收到這個tuple的時候就不應該對數據庫中的值進行累加操作了。也可能這個tuple曾經出現過,但是在對數據庫中的值進行累加的時候出錯了,在這種情況下你需要對數據庫中的值進行累加操作。
?
僅僅在數據庫中保存累加的值,你永遠無法確定這個tuple是否已經被處理過了。所以你需要更多的信息來幫助你做出正確的決定。Trident提供了一下的語義來幫助用戶獲得僅且僅被處理一次的語義。
?
1.所有的tuple都是一小批一小批的發送的(以batch的方式發送)。
2.每一個批量的tuple都會被賦予一個唯一的"transaction id" (txid);加入該批tuple被重播,那么該批tuple仍然保持相同的txid。
3.State的更新在各個批次的tuple之間是有序的,也就是說,只有第2批成功更新以后,第3批才會執行對state的更新操作。
?
?
有了這些保障,你自己的state就能夠檢測到某一批tuple是否被處理過,并選擇正確的方式來更新state。你到底要采取什么樣的方式來更新state依賴于輸入的spout也就是每一個批量的tuple所提供的一致性語義。Storm提供三種容錯級別的soput:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。同樣的storm也提供了三種容錯級別的state:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。讓我們來看看每一種事務類型的spout,以及通過每種spout你所能獲得的容錯方式。
?
?
?
Transactional spouts
記住,Trident總是一小批一小批的處理tuple,并且每一個批次有一個唯一的事務ID。Spout的特性有其鎖提供的保障措施決定;事務型的spout具有一下特性:
?
1.?一個txid所對應的batch永遠是相同的。同一個txid的重放的batch永遠和之前該txid所對應的batch相同。
2.?不同batch中的tuple之間不會存在交集(一個tuple不是屬于這個batch,就是屬于另一個batch,永遠不能同時屬于兩個以上的batch)。
3.?每一個tuple都一定會在一個batch中被發送(沒有任何一個tuple被遺漏)。
?
?
這是一種很容易理解的spout類型,一個流被劃分成固定的批次,并且永遠不會改變。Storm提供了一個針對kafka的事務型spout。
?
你也許會問:為什么我們不總是使用transactional spout?這很容易理解。一個原因是并不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬于一個topic的來自于所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足?transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,并且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。
?
這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對于丟失源節點這種情況是容錯的,仍然能夠幫你達到有且只有一次處理的語義。后面會對這種spout有所介紹。
?
(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的;現在的kafka已經完全支持了,所以,上文中所說的當一個節點掛掉以后TransactionalTridentKafkaSpout無法正常工作的情況也就不存在了,也正是因為這樣,大部分時間都選擇使用了TransactionalTridentKafkaSpout,個人感覺在使用kafka的時候"opaque transactional" spouts確實沒有存在的意義)
?
?
在討論?"opaque transactional" spouts之前,讓我們先來看看你該如何為transactional spout設計一個具有僅且僅處理一次的state。這個state的類型被稱為"transactional state"?,它利用任何txid都永遠對應與相同一個批次的tuple的特性。
?
假設你的拓撲是用來統計單詞個數的,并且你將要把統計結果保存在一個key-value數據庫中。Key肯定就是對應的單詞了,值當然就是統計結果。你已經看到只是存儲一個數量是不足以知道你是否已經處理過一個batch的。所以,你需要將txid和統計結果一起保存在值中。那么,當你需要更新統計結果的時候,你只需要比較一下數據庫中的txid和當前batch的txid是否相同;如果相同,你就直接跳過更新操作--因為有強順序的保障,你可以肯定數據庫中的值已經包含了當前batch。如果不相同,你就修改統計結果。這個邏輯之所以能說的通是因為batch的txid永遠不會改變,并且batch之間有序地對state進行更新操作。
?
?
用一個例子來說明這個邏輯為什么行得通,假如你發送了一個txid=3的batch,該batch中包含一下的tuple:
?
[“man”]
[“man”]
[“dog”]
?
?
假設現在數據庫中保存這如下的key-value數據:
?
man => [count=3,txid=1]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
?
和man相關聯的txid是1;由于當前的batch的txid是3,那么你就可以肯定這批tuple中man 的值還沒有累加到數據庫中。所以你可以給man的count累加2,并且更新txid為3。然而,dog對應的txid在數據庫中和當前batch中 一樣,所以你可以肯定對于dog來說當前batch中的值已經在數據庫中增加過了。那么就選擇跳過更新。在該batch更新后,數據庫中的數據如下所示:
?
man => [count=5,txid=3]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
?
接下來我們一起再來看看 opaque transactional spout以及怎樣去為這種spout設計相應的state。
?
?
Opaque transactional spouts
?
opaque transactional spout并不能保證每一個txid永遠對應一個相同的batch,opaque transactional spout擁有如下特性:
?
1.?每一個tuple都只會在一個batch中執行通過。也就是說,一個tuple在某一個batch處理失敗了,該tuple可能在之后的另一個新的batch中處理成功。
?
?
OpaqueTridentKafkaSpout就是一個擁有該特性的spout,該spout允許kafka節點掛掉。每當OpaqueTridentKafkaSpout要發送一個新的batch的時候,它將會從上一個batch所成功發送的tuple的結束點開始發送,這就保證了沒有tuple會被遺漏掉,也保證了一個tuple不會被多個batch成功處理。
?
在使用opaque transactional spouts的時候,再使用和transactional spout相同的處理方式:判斷數據庫中存放的txid和當前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。
?
你能做的就是在數據庫中保存更多的狀態;除了保存值和txid以外,你還需要保存更新前的值(previous value)。讓我們還是用上面的例子來說明這個邏輯。假定你當前batch中的對應count是“2”, 并且我們需要進行一次狀態更新。而當前數據庫中存儲的信息如下:
?
{
??????value = 4,
??prevValue = 1,
??txid = 2
}
?
假設當前的txid為3,和數據庫中的txid(2)不同。在這種情況下,你把“preValue”設置為“value”,然后將value增加2,并更新txid為3。操作過后的數據庫內容變成了下面的樣子:
?
{
??value = 6,
??prevValue = 4,
??txid = 3
}
?
再假設當前的txid為2,和數據庫中的txid(2)相同。這時你可以確定數據庫中的“value”被之前擁有相同txid的batch更新過,但是之前的batch和現在的batch內容可能不同了。所以你要做的是讓“value”的值等于“preValue”加2,操作過后的數據庫內容變成了下面的樣子:
{
??value = 3,
??prevValue = 1,
??txid = 2
}
?
--------------------------------------------------------------------------------------------------------------------------
注:這里理解起來可能有些晦澀,舉個例子吧。
?
假設一個batch的大小為3,有下面這么多tuple要進行累加:
[dog] [dog]?[man] [man] [man]
?
假設數據庫中現在的信息為:
?
dog =>{value=2,prevValue=1,txid=1}
man =>{value=3,prevValue=1,txid=1}
?
然后發送一個txid為2的batch {[dog] [dog] [man]}
?
然后進行保存操作,
man 成功保存,但是dog保存的時候發生了錯誤,所以數據庫中的信息變成了
?
dog =>{value=2,prevValue=1,txid=1}
man =>{value=4,prevValue=3,txid=2}
?
那么失敗了,就會有batch的重發,恰好這是負責發送第一個 [dog]的kafka節點壞掉了,batch無法獲得第一個[dog]了,那么就只能從第二個dog開始發了,所以發送的batch的txid依然為2,內容為{[dog] [man] [man]}
?
到這里,dog 的兩個txid不同,更新;但是man txid相同了,所以用prevValue+2來更新value;從這里應該可以看出,為什么是這樣做了。
?
更新后的結果如下:
dog =>{value=3,prevValue=2,txid=2}
man =>{value=5,prevValue=3,txid=2}
?
---------------------------------------------------------------------------------------------------------------------------------
因為Trident保證了batch之間的強順序性,因此這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。并且由于opaque transactional spout確保在各個batch之間是沒有共同成員的,每個tuple只會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。
?
?
?
Non-transactional spouts
?
Non-transactional spout(非事務spout)不提供任何的保障。所以在tuple處理失敗后不進行重發的情況下,一個tuple可能是最多被處理一次的。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實現有且只有一次被成功處理的語義的。
?
?
Summary of spout and state types
?
這個圖展示了哪些spout和state的組合能夠實現有且只有一次被成功處理的語義:
?
?
?
?
Opaque transactional state有著最為強大的容錯性。但是這是以存儲更多的信息作為代價的。Transactional states 需要存儲較少的狀態信息,但是僅能和 transactional spouts協同工作. 最后, non-transactional state所需要存儲的信息最少,但是卻不能實現有且只有一次被成功處理的語義。
State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更適合你。
?
?
?
?
?
State APIs
在前面你已經看到了一些用來實現僅且僅執行一次語義的復雜方法,有一個關于Trident的好消息就是,Trident把所有容錯的邏輯都在state內部實現了。那么作為一個用戶,你就從比較txid,保存多余的值到數據庫中,或者任何像它們兩個那樣的苦差事中脫離了出來。你只需要像下面這樣寫代碼就可以了:
?
?
TridentTopology topology = new TridentTopology(); ???????
TridentState wordCounts =
??????topology.newStream("spout1", spout)
????????.each(new Fields("sentence"), new Split(), new Fields("word"))
????????.groupBy(new Fields("word"))
????????.persistentAggregate(MemcachedState.opaque(serverLocations),new Count(),new Fields("count")) //重點就是這句了,這里其實使用了mapState,用來做批量的聚合結果的保//存 ??????????????
????????.parallelismHint(6);
?
?
所有管理opaque transactional state的必要邏輯都在MemcachedState.opaque方法內部實現了。另外,更新操作是批量進行的,以減少對數據庫的壓力。
?
基礎的state接口只有兩個方法:
?
public interface State {
????void beginCommit(Long txid); // can be null for things like partitionPersist occurring off //a DRPC stream(放生在DRPC流中的partitionPersist操作中,txid可能為空)
????void commit(Long txid);
}
?
?
在這個接口所提供的兩個方法中,你可以知道什么時候開了對state的更新操作,什么時候完成了對state的更新操作,在每個方法中你都能夠獲得txid。Trident對你的state是如何工作的沒有做出任何的假設(也就是說,你要自己寫更新和查詢方法)。
?
加入你自己有一套數據庫,并且希望通過Trident來在其中更新、查詢用戶的位置信息。那么你自己實現的state中就要自己去寫更新和查詢的方法了:
?
public class LocationDB implements State {
????public void beginCommit(Long txid) { ???
????}
?
????public void commit(Long txid) { ???
????}
?
????public void setLocation(long userId, String location) {
??????// code to access database and set location
?//自己寫的向數據庫中保存用戶位置信息的方法,這個方法會在你自己實現的
//BaseStateUpdater中調用(呵呵,自己實現然后自己調用)
????}
?
????public String getLocation(long userId) {
??????// code to get location from database
??????????//自己寫的從數據庫中查找用戶位置信息的方法,這個方法會在你自己實現的
//BaseQueryFunction中調用(也是自己實現自己調用)
????}}
?
?
然后你就要實現一個Trdient定義的StateFactory?,使你能夠在Trient的task中創建你自己的state。下面是為LocationDB?實現的StateFactory:
?
public class LocationDBFactory implements StateFactory {
???public State makeState(Map conf, int partitionIndex, int numPartitions) {
??????return new LocationDB();
???} }
?
?
Trident提供了QueryFunction?用來對state進行查詢,提供了StateUpdater?用來對state進行更新操作。讓我們來寫一個QueryLocation的操作,該操作從LocationDB中查詢用戶的位置信息。首先來看看那你該如何在拓撲中使用QueryLocation操作。假設你的拓撲接收一個用戶的id的輸入流。
?
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
?
//.stateQuey就是查詢了,第一個參數指定了要查詢的state(這個state使用LocationDBFactory來創建的,這就是為什么要為你的state建立一個stateFactory了,因為你無法在Trident的API中直接new你的state,你只能new stateFactory,然后Trident會調用其中的makeState方法來創建state);第二個參數就是輸入的流的字段,這里把userId輸入到操作中;第三個參數就是你自己實現的QueryFunction?用來執行查詢操作;第四個參數是輸出字段。
?
?
好了,現在可以來看看如何來實現一個自己的QueryFunction?了。
?
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
????
???public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
//查詢的方法,下面的代碼都是要自己寫的
????????List<String> ret = new ArrayList();
????????for(TridentTuple input: inputs) {
????????????ret.add(state.getLocation(input.getLong(0)));//每次查詢一個,效率不高
????????}
????????return ret;//這個ret的類型是你自己定義好的泛型(在類的開始處)
//返回的ret會循環調用下面的execute方法來發送每一個location
????}
?
????public void execute(TridentTuple tuple, String location, TridentCollector collector) {
//發送輸出數據的方法,輸出字段的定義在上面已經完成了,說白了還是一個bolt節點 ps:在新的版本中String location已經變成了一個List了,也就是ret一次都傳進來了,在execute方法中進行遍歷
????????collector.emit(new Values(location));
????} ???
}
?
?
QueryFunction的執行分為兩步:第一步,Trident會收集一個batch的輸入數據然后把他們傳遞給batchRetrieve。在這個例子中,batchRetrieve會接收到很多的用戶ID。BatchRetrieve方法需要返回和接收到的batch中的tuple的數量相同的一個list數據。List中的第一個元素對應第一個tuple查詢的結果,第二個元素對應第二個tuple查詢的結果,以此類推。
?
也許你會看出上面的代碼中沒有利用Trident所提供的batch的優勢,因為它每次只從LocationDB?中查詢一條數據。所以可以把LocationDB?向下面這樣優化一下:
public class LocationDB implements State {
????public void beginCommit(Long txid) { ???
????}
?
????public void commit(Long txid) { ???
????}
?
????public void setLocationsBulk(List<Long> userIds, List<String> locations) {
??????// set locations in bulk批量進行更新
????}
?
????public List<String> bulkGetLocations(List<Long> userIds) {
??????// get locations in bulk批量進行查詢
}}
有了上面優化后的LocationDB?,那么QueryLocation?就也需要修改一下了:
?
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
????public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
????????List<Long> userIds = new ArrayList<Long>();
????????for(TridentTuple input: inputs) {
????????????userIds.add(input.getLong(0));
????????}
????????return state.bulkGetLocations(userIds);//一次查一批...
????}
?
????public void execute(TridentTuple tuple, String location, TridentCollector collector) {
????????collector.emit(new Values(location));
????} ???}
?
?
將QueryLocation?修改為上面的樣子以后,就可以大大減少對數據庫的請求了。
?
?
查詢說完了,下面就是如何來更新state了。你要利用StateUpdater?接口來實現自己的目的。下面是例子:
?
?
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
????public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {//很簡單
????????List<Long> ids = new ArrayList<Long>();
????????List<String> locations = new ArrayList<String>();
????????for(TridentTuple t: tuples) {
????????????ids.add(t.getLong(0));
????????????locations.add(t.getString(1));
????????}
????????state.setLocationsBulk(ids, locations);
????}}
?
有了上面的代碼,你就可以像下面這樣在Trident中來更新state了
?
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
第一個參數就是LocationDB對應的stateFactory;第二個參數是輸入的流的字段;第三個就是上面寫的更新操作了。
?
?
partitionPersist 操作會更新一個State。其內部是將?State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。
在這段代碼中,只是簡單的從輸入的tuple中提取處userid和對應的location,并一起更新到State中。
partitionPersist 會返回一個TridentState對象來表示被這個Trident topoloy更新過的locationDB。 然后你就可以使用這個state在topology的任何地方進行查詢操作了。
?
同時,你也可以看到我們傳了一個TridentCollector給StateUpdaters。 emit到這個collector的tuple就會去往一個新的stream。在這個例子中,我們并沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新數據庫中的某個count,你可以emit更新的count到這個新的stream。然后你可以通過調用TridentState#newValuesStream方法來訪問這個新的stream來進行其他的處理。
?
persistentAggregate
?
persistentAggregate是另一個用來更新state的方法,?你在之前的word count例子中應該已經見過了,如下:
?
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
??????topology.newStream("spout1", spout)
????????.each(new Fields("sentence"), new Split(), new Fields("word"))
????????.groupBy(new Fields("word"))
????????.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
?
persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎么去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實現了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合后的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:
?
public interface ?MapState<T> extends State {
????List<T> multiGet(List<List<Object>> keys);
????List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
????void multiPut(List<List<Object>> keys, List<T> vals);}
?
?
?
?
當你在一個未經過group的stream上面進行聚合的話,Trident會期待你的state實現Snapshottable接口:
?
public interface ?Snapshottable<T> extends State {
????T get();
????T update(ValueUpdater updater);
????void set(T o);
}
?
MemoryMapState?和?MemcachedState?都實現了上面的2個接口。(自己寫的mapState也會實現上面的兩個接口)
?
Implementing Map States
在Trident中實現MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實現了所有相關的邏輯,包括容錯的邏輯。你只需要將一個IBackingMap 的實現提供給這些類就可以了。IBackingMap接口看上去如下所示:
?
public interface IBackingMap<T> {
????List<T> multiGet(List<List<Object>> keys);
????void multiPut(List<List<Object>> keys, List<T> vals);
}
?
OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps只是簡單的把從Topology獲取的object傳遞給multiPut。
?
Trident還提供了一種CachedMap類來進行自動的LRU 緩存。
?
另外,Trident 提供了?SnapshottableMap?類將一個MapState 轉換成一個 Snapshottable 對象.(用來對沒有進行group by 的流進行全局匯總)
?
大家可以看看?MemcachedState的實現,從而學習一下怎樣將這些工具組合在一起形成一個高性能的MapState實現。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。
?
?
Ps:翻譯的內容就這么多了,其實網上翻譯的很多,但是看了以后并不能給很多新手帶來一些幫助(原文寫的太高深了)。努力翻譯了一下,但是還是覺得有很多沒有說清楚,下面會抽時間把storm官方提供的 hbase相關的trident state的源代碼解讀一下,我覺得只有解讀一下這個源代碼,才會讓人更加清晰 state當地怎么用,以及如何寫自己的state。
?
總結
以上是生活随笔為你收集整理的Trident State译文的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Chrome读音
- 下一篇: Spring中AOP的使用