【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计
maven先安裝好。
以下講storm-starter的使用。
1、從github下載官方的storm-starter例子包,是maven工程,
? ? ? 地址?https://github.com/nathanmarz/storm-starter
2、把文件解壓復制到workspace目錄下,用cmd命令行,在該文件目錄下運行mvn eclipse:eclipse,生成eclipse所用的文件,使得maven工程變成eclipse可用的工程。
3、導入到eclipse。新建源碼文件夾lesson。把上一節storm入門案例工程的lesson包,整個復制到storm-starter-master的lesson源碼文件夾下。
4、選中項目右鍵,Run as maven package,用maven打包。在target文件夾下有2個jar包。
storm-starter-0.0.1-SNAPSHOT.jar是不含依賴,只含有工程代碼,較小。
storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar 包含依賴,較大,通常需要由依賴的包。
5、官方提供的例子
?
分組策略(Stream Grouping)
stream grouping 用來定義一個 stream 應該如何分配給 Bolts 上面的多個Tasks,也就是分配給Bolts上面的多個Executors(多線程,并發度)。
1、Storm 里面有 6種類型的 stream grouping:
注:1)、2)、5)最常用,其他基本不用。
單線程等于All Grouping
1)Shuffle Grouping:隨機分組,隨機派發 stream 里面的 tuple,保證每個 bolt 接收到的 tuple 數目大致相同。通過輪詢實現,保證平均分配。
2)Fields Grouping:按字段分組,比如按 userid 來分組,具有同樣 userid 的 tuple 會被分到相同的 bolts,而不同的 userid 則會被分配到不同的 bolts。
3)All Grouping:廣播發送,對于每一個 tuple,所有的 bolts 都會收到。
4)Global Grouping:全局分組,這個 tuple 被分配到 storm 中的一個 bolt 的其中一個 task。再具體一點就是分配給 id 值最低的那個 task。
5)None Grouping:不分組,這個分組的意思是說 stream 不關心到底誰會收到它的 tuple。目前這種分組和 Shuffle Grouping 是一樣的效果,但是多線程下不平均分配。
6)Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個 task 處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息 tuple 必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的 task 的 id (OutputCollector.emit 方法也會返回 task 的 id)。
7)Local or Shuffle Grouping:如果目標 bolt 有一個或者多個 task 在同一個工作進程中,tuple 將會被隨機發送給這些 tasks。否則,和普通的 Shuffle Grouping 行為一致。
2、測試
1)Shuffle Grouping 輪循的方式。
在lesson的Main.java改代碼。把bolt并發數改為2,也就是bolt會有2個線程。
// bolt的方法有3個參數,// bolt的id(String類型),實例,并發數。大量數據場景并行數設置大一些// bolt的數據來源于spout,名稱要和上文setSpout的id一致,否則不能獲取到數據// shuffle發射規則,后續詳講topoBuilder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");lesson的MyBolt.java改代碼。打印的內容增加當前的線程名。
if (null != valStr) {num++;System.out.println(Thread.currentThread().getName() + ", lines : " + num + ", sessionId: " + valStr.split("\t")[1]);}控制臺打印的內容,顯示有2個bolt線程,每個線程接收到的個數相同。每個線程打印出來的總行數相加,才等于track.log文件的總行數。
Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-50-bolt, lines : 2, sessionId: 5C3FBA728FD7D264B80769B23 Thread-22-bolt, lines : 3, sessionId: 5D16C1F5191CF9371Y32B58CF Thread-50-bolt, lines : 3, sessionId: 5C16BC4MB91B85661FE22F413 . . . Thread-22-bolt, lines : 23, sessionId: 5GFBAT3D3100A7A7255027A70 Thread-50-bolt, lines : 23, sessionId: 5D16C1EB1C7A751AE03201C3F Thread-50-bolt, lines : 24, sessionId: 5B16C0F7215109AG43528BA2D Thread-22-bolt, lines : 24, sessionId: 5N16C2FE51E5619C2A1244215 Thread-22-bolt, lines : 25, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-50-bolt, lines : 25, sessionId: 5C3FBA728FD7D264B80769B232)spout的并發數為2,bolt并發數為1.
topoBuilder.setSpout("spout", new MySpout(), 2); topoBuilder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");控制臺打印的內容顯示,只有1個bolt線程,符合預期。但是bolt讀取到的總行數是track.log文件行數的2倍。這是因為spout有2個線程,所以track.log被讀取了2次。說明把文件當做spout數據來源是,shout的線程數只能是1.
Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-23-bolt, lines : 2, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-23-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-23-bolt, lines : 4, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-23-bolt, lines : 5, sessionId: 5D16C7A886E2P2AE3EA29FC3E . . . Thread-23-bolt, lines : 96, sessionId: 5N16C2FE51E5619C2A1244215 Thread-23-bolt, lines : 97, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-23-bolt, lines : 98, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-23-bolt, lines : 99, sessionId: 5C3FBA728FD7D264B80769B23 Thread-23-bolt, lines : 100, sessionId: 5C3FBA728FD7D264B80769B233)Non Grouping
spout并發數為1,bolt并發數為2.
topoBuilder.setSpout("spout", new MySpout(), 1); topoBuilder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");控制臺打印內容如下,bolt有2個線程,線程名為Thread-23-bolt的計數器是20,線程名為Thread-50-bolt的計數器是30。說明在non grouping模式下,是不平均分配的。
Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-23-bolt, lines : 3, sessionId: 5C3FBA728FD7D264B80769B23 Thread-23-bolt, lines : 4, sessionId: 5D16C1F5191CF9371Y32B58CF Thread-50-bolt, lines : 2, sessionId: 5C16BC4MB91B85661FE22F413 . . . Thread-23-bolt, lines : 17, sessionId: 5D16C1EB1C7A751AE03201C3F Thread-23-bolt, lines : 18, sessionId: 5B16C0F7215109AG43528BA2D Thread-50-bolt, lines : 30, sessionId: 5N16C2FE51E5619C2A1244215 Thread-23-bolt, lines : 19, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-23-bolt, lines : 20, sessionId: 5C3FBA728FD7D264B80769B234)Fields Grouping 策略,
? 回顧 fields grouping 策略的作用:
(1)過濾,從源端(spout或上一級bolt)多輸出Fields中選擇某些field;
(2)相同的tuple會分發給同一個Executor或task處理。
典型場景:去重操作,join(企業用得不多,需要用到2個數據源,且2個數據源要同時,不能相差太久)
1個spout,2個bolt。
topoBuilder.setSpout("spout", new MySpout(), 1); // Field grouping有2個參數。第一個是spout名稱,第二個是field名稱 topoBuilder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("log"));效果和Non Grouping差不多。
Thread-50-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-50-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-50-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-50-bolt, lines : 4, sessionId: 5C3FBA728FD7D264B80769B23 Thread-50-bolt, lines : 5, sessionId: 5D16C1F5191CF9371Y32B58CF . . . Thread-50-bolt, lines : 26, sessionId: 5B16C0F7215109AG43528BA2D Thread-21-bolt, lines : 22, sessionId: 5N16C2FE51E5619C2A1244215 Thread-21-bolt, lines : 23, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-50-bolt, lines : 27, sessionId: 5C3FBA728FD7D264B80769B235)All grouping策略模式
1個spout,2個bolt。
topoBuilder.setSpout("spout", new MySpout(), 1); // all grouping 廣播方式 topoBuilder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");spout會把每個數據分發給每一個下級的bolt,每個bolt線程獲取到的行數都是一樣的。開發時廣播方式不常用。
Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-49-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-49-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E . . . Thread-23-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-49-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-23-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23 Thread-49-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B236)Global Grouping 全局分組
1的spout,2個bolt。
topoBuilder.setSpout("spout", new MySpout(), 1); // Global Grouping 全局分組 topoBuilder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");控制臺打印的內容,有2個線程,名稱分別為Thread-22-bolt 和 Thread-50-bolt。但是只有序號小的有接收到數據,序號大的沒有接收到數據。Global Grouping 全局分組是把數據分配給id值最低的task。
[Thread-22-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(5) [Thread-50-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(6) . . . Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8 Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E Thread-22-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E . . . Thread-22-bolt, lines : 47, sessionId: 5B16C0F7215109AG43528BA2D Thread-22-bolt, lines : 48, sessionId: 5N16C2FE51E5619C2A1244215 Thread-22-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D Thread-22-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23?
思考:讀取文件案例思考
? ? ? 示例中用的是storm讀取文件,把文件作為數據源,在企業中很少見。storm是分布式應用,數據會分發到每一臺supervisor執行,讀本地文件只在一臺機器上。
1)Spout數據源可以是數據庫、文件、MQ(比如:Kafka)。
2)數據源是數據庫:只適合讀取數據庫的配置文件,但不能讀取增量數據。
3)數據源是文件:只適合測試、講課用(因為集群是分布式集群),其他無用。
4)企業產生的 log 文件處理步驟:
??(1)讀出內容寫 入MQ
??(2)Storm 再處理
讀文件案例說明:
1)分布式應用無法讀文件;
2)spout無法并發讀,開并發會重復讀。
?
并發度場景分析
場景分析:
單線程下:加減乘除(其實什么都可以做),和任何類進行操作。
多線程下:可以做局部加減乘除,不適合做全部加減乘除。
多線程下適合:
a、局部加減乘除
b、做處理類Operate,如split
c、持久化,如入DB
?
?
官方案例
1、統計單詞?WordCountTopology,在storm-starter-master工程的目錄如下。
為了便于理解,對官方的案例進行改寫。在源碼文件夾lesson下,創建包WordCount,復制WordCountTopology.java到WordCount包。
2、程序分析:
(1)有1個spout,有2個bolt。
(2)MyRandomSentenceSpout多次發送數據,nextTuple函數的實現是,每次發送的數據相同。
(3)MySplit,接收到spout的數據,數據都是字符串,對字符串進行分割。
(4)WordCount,接收從split發射的數據,都是單個字符,統計每個字符的個數。
3、各個程序代碼和運行結果
(1)主程序WordCountTopology,用于統計單詞個數的bolt程序WordCount
package WordCount;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.starter.spout.RandomSentenceSpout;import java.util.HashMap; import java.util.Map;/*** This topology demonstrates Storm's stream groupings and multilang capabilities.*/ public class WordCountTopology {public static class SplitSentence extends ShellBolt implements IRichBolt {public SplitSentence() {super("python", "splitsentence.py");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}// 統計每個單詞出現的次數public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);//打印出當前線程名稱,單詞 和 個數System.out.println(Thread.currentThread().getName() + ", word = " + word + ", count = " + count);collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new MyRandomSentenceSpout(), 1);builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());// Thread.sleep(10000); // // cluster.shutdown();}} }(2)spout程序,MyRandomSentenceSpout。定義一個字符串數組,{ "a b c d", "e f g h", "i j k l"},為了便于觀察,所以每個字符都不重復。每個字符串的字符由空格隔開,每個字符串逐個發送。
package WordCount;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map; import java.util.Random;public class MyRandomSentenceSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;SpoutOutputCollector _collector;Random _rand;// 單詞字符串,由空格隔開String[] sentences = new String[]{ "a b c d", "e f g h", "i j k l"};@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = new Random();}@Overridepublic void nextTuple() {for (String sentence : sentences) {_collector.emit(new Values(sentence));}// 睡眠10秒鐘Utils.sleep(10 * 1000);}@Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}(3)bolt程序,MySplit,接收字符串,分割成單個單詞。
package WordCount;import java.util.Map;import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.FailedException; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;/*** IBasic開頭的不需要寫ack方法。會自動調用ack方法* @description bolt,分割單詞. * @author whiteshark* @date 2019年6月30日 下午11:46:02*/ public class MySplit implements IBasicBolt{private String pattern;public MySplit(String pattern) {this.pattern = pattern;}/*** 每個bolt 和 spout 最好序列化,免得開高并發出錯*/private static final long serialVersionUID = 1L;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {String sentence = input.getString(0);if (null != sentence){// 字符串由空格隔開,用split分割成單個字符for (String word : sentence.split(pattern)) {collector.emit(new Values(word));}}// IBasic開頭的不需要寫ack方法。執行成功會自動調用ack方法// 如果拋出FailedException異常,失敗了也會通知} catch (FailedException e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}(4)運行主程序WordCountTopology,在控制臺有打印bolt線程名稱,單詞,單詞個數。spout運行了2次,所有發送了2次數據。可以看到,由于在bolt程序WordCount使用了fields,每個線程兩次處理的字符都相同。
驗證了上文在Field Grouping策略中提到的,相同的tuple會分發給同一個Executor或task處理。
Thread-16-count, word = e, count = 1 Thread-16-count, word = h, count = 1 Thread-16-count, word = k, count = 1 Thread-16-count, word = b, count = 1 Thread-18-count, word = f, count = 1 Thread-18-count, word = i, count = 1 Thread-18-count, word = l, count = 1 Thread-18-count, word = c, count = 1 Thread-20-count, word = g, count = 1 Thread-20-count, word = j, count = 1 Thread-20-count, word = a, count = 1 Thread-20-count, word = d, count = 1Thread-16-count, word = b, count = 2 Thread-16-count, word = e, count = 2 Thread-16-count, word = h, count = 2 Thread-16-count, word = k, count = 2 Thread-18-count, word = f, count = 2 Thread-18-count, word = c, count = 2 Thread-18-count, word = i, count = 2 Thread-18-count, word = l, count = 2 Thread-20-count, word = a, count = 2 Thread-20-count, word = g, count = 2 Thread-20-count, word = d, count = 2 Thread-20-count, word = j, count = 2?
并發度
在Storm中,一個task可以簡單的理解為在集群某節點上運行的一個spout或者bolt實例。在集群運行運行中,topology主要有四個組成部分:他們從低到高分別是:task(bolt/spout實例)、Executor(線程)、Workers(JVM虛擬機)、Nodes(服務器)
各個部分的含義如下:
(1)Nodes(服務器):是指配置在一個Storm集群中的服務器,會執行topology的一部分運算。一個Storm集群可以包括一個或者多個工作node。
(2)Workers(JVM虛擬機):是指一個node節點服務器上相互獨立運行的JVM進程。每一個node可以配置運行一個或者多個worker。一個topology會分配到一個或者多個worker上運行。
(3)Executor(線程):是指一個worker的JVM進程中運行的Java線程。多個task可以指派給同一個executor來執行。除非是明確指定,Storm默認會給每一個executor分配一個task。
(4)Task(bolt/spout實例):task是spout和bolt的實例,里面的nextTuple()和execute()方法會被executors線程調用執行。
?
并發度:用戶指定一個任務,可以被多個線程執行,并發度的數量等于線程 executor 的數量。
task 就是具體的處理邏輯對象,一個 executor 線程可以執行一個或多個 tasks,但一般默認每個 executor 只執行一個 task,所以我們往往認為 task 就是執行線程,其實不是。
task 代表最大并發度,一個 component 的 task 數是不會改變的,但是一個 componet 的 executer 數目是會發生變化的(storm rebalance 命令),task 數 >= executor 數,executor 數代表實際并發數。
?
結構圖如下:
WordCountTopology 統計單詞的案例,包含的的spout和bolt如下,
(1)spout,類名為SentenceSpout,產生字符串。
(2)bolt,類名為SplitSentence,分割字符串為單詞。
(3)bolt,類名為WordCount,統計單詞。
(4)bolt,類名為ReportBolt,報告單詞統計。
設置不同的線程數和任務數,看并發圖
(1)默認情況下,每個 spout / bolt 的并發度(executor)是1,任務(task)也是1。
builder.setSpout(SENTENCE_SPOUT_ID, spout); // SentenceSpout --> SplitSentenceBolt builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID); // SplitSentenceBolt --> WordCountBolt builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); // WordCountBolt --> ReportBolt builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);并發圖如下。唯一的并發機制出現在線程級,每個任務在同一個JVM的不同線程中運行。如何增加并發度以充分利用硬件能力?讓我們來增加分配給topology 的 worker 和 executer 的數量。
(2)把 SentenceSpout 的并發度設置為2,worker不變。
//這個2指的是有兩個executor,雖然沒有顯示指定task的數量, //1個executor至少有1個task。因為executor為2,默認task也就是2 builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);并發圖如下。SentenceSpout 有2個線程,每個線程有1個任務。
(3)配置worker數量為2,SplitSentence設置為4個task和2個executor。WordCount設置為4個executor。
Config config = new Config(); config.setNumWorkers(2); builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID); // SplitSentenceBolt --> WordCountBolt builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));并發圖如下:
?
?
網站瀏覽量和用戶數統計
PV-UV案例需求分析
網站最常用的2個指標:
PV(page views):count(session_id)
UV(user views):count(distinct session_id)
多線程下,注意線程安全問題。
一、PV統計
方案分析
如下是否可行?
1、定義static long pv,Synchronized控制累計操作。
Synchronized和Lock在單JVM下有效,但在多JVM下無效。
可行的兩個方案:
1、shuffleGrouping下,pv * executor并發數。比較簡單,但只能局限于shuffleGrouping,且會有中間數據。
2、bolt1進行多并發局部匯總,bolt2單線程進行全局匯總。這種方式可行,推薦這種方式。
?線程安全:多線程處理的結果和單線程一致,就是線程安全。否則不安全。
?
案例代碼
準備數據:track.log文件有50行數據。每行有3列,分別為網站,sessionId,時間。列與列由tab隔開。如下圖。
www.taobao.com 5CFBA5BD76BACF436ACA9DCC8 2019-06-29 11:01:20 www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 08:01:36 www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 10:51:27(1)程序的啟動類。1個spout,4個bolt1,1個匯總的sumbolt。
package visits;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder;/*** 程序的啟動類* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/ public class Main {public static void main(String[] args) {// 1.創建topology, 拓撲對象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 設置spout,bolt// setSpout方法的3個參數分別為,// spout的id(string類型),實例,并發數。大量數據場景并行數設置大一些topoBuilder.setSpout("spout", new MySpout(), 1);// shuffle Grouping 分組topoBuilder.setBolt("bolt", new PVBolt1(), 4).shuffleGrouping("spout");// 匯總topoBuilder.setBolt("sumbolt", new PVSumBolt(), 1).shuffleGrouping("bolt");// 3. 設置works個數Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3個參數分別為:拓撲名稱,stormconfig配置,拓撲實例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}} }(2)spout,從track.log讀取每一行,發送到下一級bolt。
package visits;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/*** * @description Spout,讀取數據,一行一行的發送到下一級處理* @author whiteshark* @date 2019年6月30日 上午9:28:17*/ public class MySpout implements IRichSpout{private static final long serialVersionUID = 1L;private FileInputStream fis;private InputStreamReader isr;private BufferedReader br;private String str = null;SpoutOutputCollector collector = null;@Overridepublic void nextTuple() {try {while ((str = this.br.readLine()) != null) {/*在這可以對數據進行加工或過濾*/// 發射數據collector.emit(new Values(str));// 為了在控制臺觀察打印出來的數據,這里暫停Thread.sleep(500);}} catch (Exception e) {e.printStackTrace();}}// 在提交作業時,會把storm.yaml的所有項讀取到Map中,在open方法中如果修改map的值,// 會覆蓋原來storm.yaml所定義的值,@Overridepublic void open(Map conf, TopologyContext content, SpoutOutputCollector collector) {this.collector = collector;try {this.fis = new FileInputStream("track.log");this.isr = new InputStreamReader(fis, "UTF-8");this.br = new BufferedReader(isr);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 定義發射數據的格式declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}// 下一級成功應答ack,打印內容@Overridepublic void ack(Object msgId) {System.out.println("spout ack: " + msgId.toString());}@Overridepublic void activate() {// TODO Auto-generated method stub}// 資源關閉@Overridepublic void close() {try {br.close();isr.close();fis.close();} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deactivate() {// TODO Auto-generated method stub}// 下一級失敗應答ack,打印內容@Overridepublic void fail(Object msgId) {System.out.println("spout fail: " + msgId.toString());} }(3)bolt1,在main中設置有4個線程。接收從spout發射的每一行,分割成3個字段。并統計第二個字段(sessionId)的個數,sessionId的個數和當前線程序號,一起傳遞到sumBolt做最后的匯總。
package visits;import java.util.Map;import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;public class PVBolt1 implements IRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}String logString = null;String sessionId = null;Integer pv = 0;@Overridepublic void execute(Tuple input) {logString = input.getString(0);sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一個值是當前線程ID,第二個值是瀏覽次數collector.emit(new Values(Thread.currentThread().getId(), pv));System.out.println(Thread.currentThread().getName() + ", pv = " + pv);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;} }(4)PVSumBolt,做最后的匯總。從上一級接收線程序號和sessionId個數,保存和更新到map中。遍歷map,累加所有的個數。得出總的個數。
package visits;import java.util.HashMap; import java.util.Map;import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt{private static final long serialVersionUID = 1L;Map<Long, Integer> counts = new HashMap<Long, Integer>();@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {}@Overridepublic void execute(Tuple input) {Long threadId = input.getLong(0);Integer pv = input.getInteger(1);counts.put(threadId, pv);long wordSum =0;// 獲取總數,遍歷counts的values,進行累加for (Map.Entry<Long, Integer> count : counts.entrySet()) {wordSum += count.getValue();}System.out.println(Thread.currentThread().getName() + ", wordSum = " + wordSum);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;} }(5)輸出結果:可以看出,有4個bolt1線程,只有1個sumbolt匯總結果。
Thread-26-bolt, pv = 1 Thread-57-sumbolt, wordSum = 1 Thread-53-bolt, pv = 1 Thread-57-sumbolt, wordSum = 2 Thread-23-bolt, pv = 1 Thread-57-sumbolt, wordSum = 3 Thread-55-bolt, pv = 1 Thread-57-sumbolt, wordSum = 4 . . . Thread-53-bolt, pv = 11 Thread-57-sumbolt, wordSum = 41 Thread-23-bolt, pv = 11 Thread-57-sumbolt, wordSum = 42 Thread-55-bolt, pv = 11 Thread-57-sumbolt, wordSum = 43 Thread-26-bolt, pv = 11 Thread-57-sumbolt, wordSum = 44 Thread-23-bolt, pv = 12 Thread-57-sumbolt, wordSum = 45 Thread-55-bolt, pv = 12 Thread-57-sumbolt, wordSum = 46 Thread-26-bolt, pv = 12 Thread-57-sumbolt, wordSum = 47 Thread-53-bolt, pv = 12 Thread-57-sumbolt, wordSum = 48 Thread-26-bolt, pv = 13 Thread-57-sumbolt, wordSum = 49 Thread-53-bolt, pv = 13 Thread-57-sumbolt, wordSum = 50?
?
PV-UV案例優化引入Zookeeper鎖控制線程操作
匯總型方案:
1、在shuffleGrouping下,pv(單線程結果)*Executer并發數,一個Executer默認一個task,如果設置Task數大于1,公式應該是 pv(單線程結果)*Task數。同一個Executer下task的線程ID相同,taskId不同。
優點:簡單、計算量小
缺點:存在有一點誤差,但大部分場景能接受。
優化:
案例PVBolt中每個Task都會輸出一個匯總值,實際只需要一個Task輸出匯總值。利用Zookeeper鎖來做到只有一個Task輸出匯總值,而且每5秒輸出一次。
2、bolt1進行多次并發局部匯總,bolt2單線程進行全局匯總。
優點:
(1)計算絕對準確;
(2)如果用fieldGrouping可以得到中間值,如單個user的訪問PV(訪問深度,也是有用的指標)
缺點:計算量稍大,且多一個Bolt。
?
預處理:現在虛擬機hadoop-senior和hadoop-senior02啟動zookeeper集群。先創建"/lock”目錄,再創建"/lock/storm”目錄,保存的值均為空。
?
案例代碼:
(1)SourceSpout用于產生源數據。每次產生100行字符串,每行字符串的格式均為:域名 + "\t" + sessionId + "\t" +時間
package lock;import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/*** * @description 生成數據* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/ public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登錄的網站是taobaoString hosts = "www.taobao.com";//每次登錄的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登錄的時間String[] times = {"2019-06-29 08:01:36", "2019-06-29 08:11:37", "2019-06-29 08:31:38", "2019-06-29 09:23:07", "2019-06-29 10:51:27", "2019-06-29 10:51:56","2019-06-29 11:01:07", "2019-06-29 11:01:20", "2019-06-29 11:45:30","2019-06-29 12:31:49", "2019-06-29 12:41:51", "2019-06-29 12:51:37", "2019-06-29 13:11:27", "2019-06-29 13:20:40", "2019-06-29 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}(2)PVBolt,在prepare中,會創建一個zookeeper 臨時目錄 "/lock/storm/pv",保存的值為 IP + ":" + taskId,所以只有一個任務能創建成功該臨時目錄。在execute方法中,每次讀取一行字符串,取sessionId累加。如果是創建zookeeper目錄的任務task,每隔5秒會輸出總的sessionId個數。
package lock;import java.net.InetAddress; import java.util.Map;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple;public class PVBolt implements IRichBolt{private static final long serialVersionUID = 1L;//在zk中創建鎖的路徑public static final String zkPath = "/lock/storm/pv";ZooKeeper zKeeper = null;String lockData = null;OutputCollector collector;String logString = null;String sessionId = null;Integer pv = 0;long beginTime = System.currentTimeMillis();long endTime = 0;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) { // this.collector = collector;try {System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 1");// 新建zk客戶端到zk集群。3秒連不到集群算超時. ip地址是虛擬機的地址,2臺虛擬機都有裝zk,并且已經啟動,存在目錄"/lock/storm"zKeeper = new ZooKeeper("192.168.178.131:2181,192.168.178.132:2181", 20000, new Watcher(){@Overridepublic void process (WatchedEvent event) {System.out.println("event : " + event.getType());}});System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 2");//如果沒有連接到集群,休眠1秒,讓其重連。如果沒有連接成功,一直等待while (zKeeper.getState() != ZooKeeper.States.CONNECTED) {Thread.sleep(1000);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 3");//獲得本機的ipInetAddress address = InetAddress.getLocalHost();//保存在zk路徑中的值,IP + ":" + taskIdlockData = address.getHostAddress() + ":" + context.getThisTaskId();System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 4");// 只是判斷目錄是否存在,不放監控watchif (null == zKeeper.exists(zkPath, false)) {//如果不存在該目錄,創建一個臨時目錄節點zKeeper.create(zkPath, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - create " + zkPath);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 5");} catch (Exception e) {try {zKeeper.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}@Overridepublic void execute(Tuple input) {try {logString = input.getString(0);if (logString != null) {endTime = System.currentTimeMillis();sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一個值是當前線程ID,第二個值是瀏覽次數 // collector.emit(new Values(Thread.currentThread().getId(), pv));// 每5秒打印一次if (endTime - beginTime >= 5000) {System.err.println(lockData + "============================="); // System.out.println("lockData = " + new String(zKeeper.getData(zkPath, false, null)));if (lockData.equals(new String(zKeeper.getData(zkPath, false, null)))) {System.out.println("pv ====================== " + pv * 4);}beginTime = System.currentTimeMillis();}}} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) { // declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;} }(3)PVTopo,設置spout并發數為1,bolt并發數為4,并且策略為shuffle Grouping,使得數據均勻分配到每個bolt。
package lock;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder;/*** 程序的啟動類* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/ public class PVTopo {public static void main(String[] args) {// 1.創建topology, 拓撲對象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 設置spout,bolt// setSpout方法的3個參數分別為,// spout的id(string類型),實例,并發數。大量數據場景并行數設置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分組,并發數為4topoBuilder.setBolt("bolt", new PVBolt(), 4).shuffleGrouping("spout");// 3. 設置works個數Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3個參數分別為:拓撲名稱,stormconfig配置,拓撲實例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}} }輸入的結果為:
192.168.43.105:6============================= 192.168.43.105:8============================= 192.168.43.105:5============================= pv ====================== 16 192.168.43.105:7============================= 192.168.43.105:8============================= 192.168.43.105:6============================= 192.168.43.105:7============================= 192.168.43.105:5============================= pv ====================== 44 192.168.43.105:8============================= 192.168.43.105:6============================= 192.168.43.105:7============================= 192.168.43.105:5============================= pv ====================== 72 192.168.43.105:8============================= 192.168.43.105:6============================= 192.168.43.105:7============================= 192.168.43.105:5============================= pv ====================== 100如果程序不關閉,在zookeeper能看到臨時目錄"/lock/storm/pv"保存的值(IP + ":" + taskId):
[zk: localhost:2181(CONNECTED) 18] get /lock/storm/pv 192.168.43.105:5 cZxid = 0x2000000050 ctime = Tue Jul 30 00:02:39 CST 2019 mZxid = 0x2000000050 mtime = Tue Jul 30 00:02:39 CST 2019 pZxid = 0x2000000050 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x16c3e672803000f dataLength = 16 numChildren = 0?
打包發布到虛擬機storm集群運行
上面的案例是在本地機子上運行。接下來要在storm集群運行。
1、工程打包,工程右鍵 Run AS,點擊Maven install,最終控制臺顯示“BUILD SUCCESS”,說明打包成功。
在target目錄下,有2個包,名稱帶有depedencies的是有依賴,需要的是這個包。
用Filezilla 把包發送到虛擬機hadoop-senior的目錄/opt/datas/stormjars下
2、啟動虛擬機的storm集群。
先啟動hadoop-senior虛擬機和hadoop-senior02虛擬機上的zookeeper。
在hadoop-senior虛擬機啟動storm作為主節點,并啟動ui。nohup是后臺啟動,關閉窗口不會停止該storm節點
nohup ./storm nimbus & nohup ./storm ui &在hadoop-senior02虛擬機啟動storm作為從節點。
nohup ./storm supervisor &在瀏覽器http://hadoop-senior:8081,可以看到集群的監控界面
3、提交拓撲任務
在hadoop-senior的storm安裝目錄下,進入bin目錄。輸入命令提交拓撲任務到集群
./storm jar /opt/datas/stormjars/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar lock.PVTopo PVTopo包是在/opt/datas/stormjars下,所以要加上前綴。后面帶有2個參數,參數1是類名,類名前綴要加上具體的包,參數2是拓撲名,拓撲名必須唯一。這里參數1是lock.PVTopo,參數2是PVTopo。
輸入提交拓撲任務的命令后,截取部分控制臺的輸出腳本,可以看到已經完成提交。
在監控界面可以看到,新增一個拓撲,名稱為PVTopo,狀態為active。
點擊拓撲名稱,進入查看拓撲詳情,可以看到有1個spout,4個bolt。spout發送了2060條數據,4個bolt總共接收了2060條數據。
4、查看運行結果
點擊bolt進入查看bolt詳情,4個bolt都在hadoop-senior02虛擬機的storm上運行,端口是work占用。我們在bolt程序中,有輸出總的sessionid個數,有4個bolt,只有1個bolt有輸出總數。如果要查看每一個bolt的log文件,這樣要耗時。
我們在zookeeper中,保存在"/lock/storm/pv"的值是 IP + taskId,所以找到taskid,就知道在哪個虛擬機和端口。
從圖片中可以看到,有輸出sessionid總數的taskid是6,輸出的log日志在hadoop-senior02虛擬機logs目錄下,端口為6707.
因為4個bolt都是在hadoop-senior02虛擬機上運行,所以在hadoop-senior02虛擬機storm目錄logs目錄下,可以看到各個bolt的log文件。查看logs目錄下的文件列表。4個bolt對應的log文件。每個bolt的log文件由端口號來識別。
有輸出sessionid總數的log是 worker-6706.log,查看文件,有輸出總數。上文中,soput發送和bolt接收的數量都遠大于這個數,是因為發送的有空字符串。sessionid統計的是非空。
5、停止拓撲任務
有2中方法。分別是監控界面操作,命令行操作。
(1)界面操作(沒有權限控制,只要能登陸進這個界面就能操作,所以不推薦),所有點deactivate,使任務停止(并沒有立即停止,而是處理完正在運行中的程序),此時任務狀態為inactivate。再點擊kill,任務停止運行。
此時任務狀態為inactivate,inactivate的下一個操作可以是activate,或者kill。顧名思義,activate是使得任務繼續運行,kill是殺死任務,使得任務完全停止。
點擊kill,完全停止任務。狀態變為killed
(2)命令行操作,在storm的bin目錄下,輸入命令
./storm kill PVTopo?
?
案例升級 計算網站UV(去重計算模式)
方案分析
1、把sessionId放入set實現自動去重,set.size()獲得UV
可行的方案(類似wordcount的計算去重word總數):
bolt1通過fieldGrouping進行多線程局部匯總,下一級bolt2進行單線程保存sessionId和count數到map且進行遍歷,可以得到:
PV,UV,訪問深度(每個sessionId的瀏覽數)
簡單、快速,但比較耗內存,要求集群資源內存多。
適用于中小企業,不適合大企業。
適用于小數據量,如訂單。
2、no-sql分布式數據庫,如HBase
通過rowkey實現去重,統計行數得到去重后的sessionId總數。
適用于大數據量,如統計流量。
?
storm的局限性:
storm應用場景廣泛,但能做的復雜度有限,通常為匯總型。
對源數據做預處理,寫入數據庫。
?
下文的案例采用第一種方案。
程序分析:
(1)DateFmt,工具類,用于把長日期字符串,轉化為短日期字符串(只含有年月日);
(2)SourceSpout ,spout類,1個線程,用于產生源數據。每次產生100行字符串,每行字符串的格式均為:域名 + "\t" + sessionId + "\t" +時間;
(3)FmtLogBolt,bolt類,4個線程,接收spout的數據,截取日期和sessionId發送到下一級;
(4)DeepVisitBolt,bolt類,4個線程,局部匯總,接收FmtLogBolt的數據,把接收到的日期和sessionId拼接成以下形式的字符串:日期+ "_" + sessionId。定義map,把日期+ "_" + sessionId作為key,每次進來相同的key,value加1;把日期+ "_" + sessionId,以及對應的value發送到下一級;
(5)UVSumBolt,bolt類,1個線程,全局匯總,接收DeepVisitBolt的數據。統計PV:用戶的瀏覽數(對用戶不去重),UV:用戶的瀏覽數(對用戶去重)
(6)UVTopo,topology類,定義spout,bolt類。
案例代碼:
(1)DateFmt,工具類
package user_visit;import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date;/*** 工具類* @author silva**/ public class DateFmt {public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";public static final String DATE_SHORT = "yyyy-MM-dd";public static SimpleDateFormat sdf = new SimpleDateFormat(DATE_SHORT);// 返回指定格式的字符串public static String getCountDate(String date, String pattern) {SimpleDateFormat sdf = new SimpleDateFormat(pattern);Calendar cal = Calendar.getInstance();if (null != date) {try {cal.setTime(sdf.parse(date));} catch (Exception e) {e.printStackTrace();}}return sdf.format(cal.getTime());}// 字符串轉化為datepublic static Date parseDate(String dateStr) throws Exception {return sdf.parse(dateStr);}public static void main(String[] args) {String dateStr = "2019-06-29 13:11:27";System.out.println("date = " + getCountDate(dateStr, DATE_SHORT));} }(2)SourceSpout ,spout類,1個線程,用于產生源數據。日期要和運行程序為同一個日期,否則最后的出來的結果是0.
package user_visit;import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;/*** * @description 生成數據* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/ public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登錄的網站是taobaoString hosts = "www.taobao.com";//每次登錄的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登錄的時間String[] times = {"2019-08-04 08:01:36", "2019-08-04 08:11:37", "2019-08-04 08:31:38", "2019-08-04 09:23:07", "2019-08-04 10:51:27", "2019-08-04 10:51:56","2019-08-04 11:01:07", "2019-08-04 11:01:20", "2019-08-04 11:45:30","2019-08-04 12:31:49", "2019-08-04 12:41:51", "2019-08-04 12:51:37", "2019-08-04 13:11:27", "2019-08-04 13:20:40", "2019-08-04 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}(3)FmtLogBolt,bolt類,4個線程
package user_visit;import java.util.Map;import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;public class FmtLogBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private String eachLog = null;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date", "sessionId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {eachLog = input.getString(0);if (null != eachLog && eachLog.length() > 0) {// 分別發送日期,sessionIdcollector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2], DateFmt.DATE_SHORT), eachLog.split("\t")[1]));}}@Overridepublic void cleanup() {// TODO Auto-generated method stub} }(4)DeepVisitBolt,bolt類,4個線程
package user_visit;import java.util.HashMap; import java.util.Map;import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;/*** 局部匯總* @author silva**/ public class DeepVisitBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date_sessionId", "count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String dateStr = input.getString(0);String sessionId = input.getString(1);Integer count = counts.get(dateStr + "_" + sessionId);if (null == count) {count = 0;}count++;counts.put(dateStr + "_" + sessionId, count);// 發送到下一級,做全局匯總collector.emit(new Values(dateStr + "_" + sessionId, count));}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}(5)UVSumBolt,bolt類,1個線程
package user_visit;import java.util.HashMap; import java.util.Map;import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple;/** 全局匯總*/ public class UVSumBolt implements IBasicBolt{private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<String, Integer>();private String curDate = null;private long beginTime;@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {long PV = 0; // 總數long UV = 0; // 去重后的個數long endTime = System.currentTimeMillis();String dateSessionId = input.getString(0);Integer count = input.getInteger(1);// 如果不是以當前日期開頭,并且比當前日期大,說明已經跨天到第二天// 需要把map清空,curDate 設置為新的日期if (!dateSessionId.startsWith(curDate) && DateFmt.parseDate(dateSessionId.split("_")[0]).after(DateFmt.parseDate(curDate))) {curDate = dateSessionId.split("_")[0];counts.clear();}counts.put(dateSessionId, count);if (endTime - beginTime > 1 * 1000) {for (Map.Entry<String, Integer> map : counts.entrySet()) {if (map.getKey().startsWith(curDate)) { // 只統計今天的數據,過濾非今天的數據UV++; // 用戶總數+1PV += map.getValue(); // 瀏覽數累加}}System.out.println("UV = " + UV + ", PV = " + PV);}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// 預處理,設置為當天時間,格式為 yyyy-MM-ddcurDate = DateFmt.getCountDate(null, DateFmt.DATE_SHORT);beginTime = System.currentTimeMillis();} }(6)UVTopo,topology類,定義spout,bolt類。
package user_visit;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields;/*** 程序的啟動類* @description * @author whiteshark* @date 2019年8月2日 下午1:03:02*/ public class UVTopo {public static void main(String[] args) {// 1.創建topology, 拓撲對象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 設置spout,bolt// setSpout方法的3個參數分別為,// spout的id(string類型),實例,并發數。大量數據場景并行數設置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分組,并發數為4topoBuilder.setBolt("fmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");// fields Grouping 分組,并發數為4。根據 date + sessionId 分組,相同 date + sessionId 的被放到同一個bolt線程處理topoBuilder.setBolt("deepVisitBolt", new DeepVisitBolt(), 4).fieldsGrouping("fmtLogBolt", new Fields("date", "sessionId"));// shuffle Grouping 分組,并發數為1topoBuilder.setBolt("sumBolt", new UVSumBolt(), 1).shuffleGrouping("deepVisitBolt");// 3. 設置works個數Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3個參數分別為:拓撲名稱,stormconfig配置,拓撲實例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}} }運行程序打印的結果。用戶的瀏覽數為100次,有12個不同的用戶訪問。與spout發送的數據一致。
. . . UV = 12, PV = 97 UV = 12, PV = 98 UV = 12, PV = 99 UV = 12, PV = 100?
總結
以上是生活随笔為你收集整理的【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TableViewCell下载的问题
- 下一篇: 基于Springboot的超市订单管理系