Introduction to Storm Principles
@(STORM)[storm, big data]
- Introduction to Storm Principles
  - I. Principles
    - Why use Storm
    - 1. Use cases
    - 2. Cluster-related concepts
    - 3. Topology-related concepts
  - II. Configuration
  - III. Parallelism
    - (1) The four dimensions along which topology parallelism can be set
    - (2) How to set parallelism
    - (3) Example
    - (4) Adjusting parallelism dynamically
  - IV. Grouping
  - V. Reliability
    - (1) Spout
    - (2) Bolt
I. Principles
Why use Storm?
**Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.** Storm is simple, can be used with any programming language, and is a lot of fun to use!
Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.
**Storm integrates with the queueing and database technologies you already use.** A Storm topology consumes streams of data and processes those streams in arbitrarily complex ways, repartitioning the streams between each stage of the computation however needed. Read more in the tutorial.
Storm: a distributed real-time computation system.
1. Use cases
Stream processing: Storm can process messages that keep streaming in and write the results to some store.
Distributed RPC: because Storm's processing components are distributed and the processing latency is extremely low, it can also be used as a general-purpose distributed RPC framework. (In fact, our search engine is itself a distributed RPC system.)
2. Cluster-related concepts
(1) Nimbus: responsible for resource allocation and task scheduling.
(2) Supervisor: accepts the tasks assigned by Nimbus and starts/stops the worker processes it manages.
(3) Worker: the process that runs the actual processing-component logic.
(4) Task: each spout/bolt thread inside a worker is called a task. Since Storm 0.8, a task no longer maps to a physical thread; tasks of the same spout/bolt may share one physical thread, which is called an executor.
3. Topology-related concepts
(1) Topology: a real-time application running in Storm, so called because the message flow between components forms a logical topology.
(2) Spout: the component that produces the source data stream in a topology. A spout usually reads from an external data source and turns the data into the topology's internal source tuples. The spout is an active role: its interface has a nextTuple() method that the Storm framework calls over and over, and the user only needs to generate source data inside it.
(3) Bolt: the component that receives data and processes it in a topology. A bolt can filter, apply functions, join, write to a database, or perform any other operation. The bolt is a passive role: its interface has an execute(Tuple input) method that is invoked whenever a message arrives, and the user performs whatever processing they want inside it. (A minimal sketch of both interfaces follows this list.)
(4) Tuple: the basic unit of one message transfer. It would naturally be a key-value map, but because the field names of the tuples passed between components are defined in advance, a tuple only needs its values filled in order, so it is effectively a list of values.
(5) Stream: an unbounded sequence of tuples forms a stream.
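As a rough sketch of these two interfaces (the sentence-splitting example, class names, and field names here are illustrative, not from the original), a spout's nextTuple() and a bolt's execute(Tuple) could look like this:

```java
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// A spout is active: the framework keeps calling nextTuple() and the user emits source data in it.
class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {"the cow jumped over the moon", "an apple a day"};
    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // emit one sentence per call; a real spout would usually read from an external source
        this.collector.emit(new Values(sentences[index]));
        index = (index + 1) % sentences.length;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

// A bolt is passive: execute(Tuple) is called for every tuple it receives.
class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // any processing goes here: filtering, functions, joins, writing to a database, ...
        for (String word : input.getStringByField("sentence").split(" ")) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

The spout keeps emitting because the framework keeps calling nextTuple(); the bolt only reacts when execute() is handed a tuple.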
II. Configuration
The complete default configuration file defaults.yaml is shown below; to change anything, override it in storm.yaml. The important parameters are:
1. storm.zookeeper.servers: which ZooKeeper cluster to use
2. nimbus.host: which machine Nimbus runs on

```yaml
nimbus.host: "gdc-nn01-test"
```

3. supervisor.slots.ports: the ports on which a supervisor runs workers. Each port can run one worker, so the number of configured ports is the number of slots each supervisor has (i.e. how many workers it can run).

```yaml
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.local.dir: "/home/hadoop/storm/data"
```

4. JVM settings

```yaml
nimbus.childopts: "-Xmx4096m"
supervisor.childopts: "-Xmx4096m"
worker.childopts: "-Xmx3072m"
```

Besides these, there are also ui.childopts and logviewer.childopts.
Appendix: the complete default configuration file, defaults.yaml:

```yaml
########### These all have default values as shown
########### Additional configuration goes into storm.yaml

java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"

### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
    - "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"

### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"

### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"

logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.appender.name: "A1"

drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"

transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null

### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
supervisor.childopts: "-Xmx256m"
# how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
# how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
# how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
# how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1

# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 # 5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout (default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

# If the Netty messaging layer is busy (netty internal buffer not writable), the Netty client will try to batch messages as much as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush messages as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144

# We check with this interval whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.workers: 1
topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 # batched
topology.executor.send.buffer.size: 1024 # individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.classpath: null
topology.environment: null

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
```

III. Parallelism
(1) The parallelism of a Storm topology can be set along the following four dimensions:
1. node (server): the number of supervisor servers in the Storm cluster.
2. worker (JVM process): the total number of worker processes in the whole topology; these are distributed evenly at random across the nodes.
3. executor (thread): the total number of threads for a given spout or bolt; these threads are distributed evenly at random across the workers.
4. task (spout/bolt instance): a task is an instance of a spout or bolt, whose nextTuple() and execute() methods are called by executor threads. Unless explicitly configured otherwise, Storm assigns one task per executor; if more tasks are set, a single thread holds several spout/bolt instances.
Note: all of the above are totals that get distributed evenly over their hosts; you are not configuring a per-host number of processes or threads. See the example below.
Further notes on executors vs. tasks:
The number of tasks is the number of spout objects that get created, that each have their own distinct sets of tuples that are emitted, need to be acked, etc. The number of executors is the number of OS threads (potentially across more than 1 machine) that get created to service these spout objects. Usually there is 1 executor for each task, but you may want to create more tasks than executors if you think you will want to rebalance in the future.
(2) How to set parallelism
1. node: buy machines and add them to the cluster...
2. worker: Config#setNumWorkers(), or the TOPOLOGY_WORKERS configuration item
3. executor: the last (parallelism hint) argument of TopologyBuilder#setSpout()/#setBolt()
4. task: ComponentConfigurationDeclarer#setNumTasks()
(3) Example
```java
// build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);  // 5 executors
builder.setBolt("filter-bolt", new FilterBolt(), 3)
       .shuffleGrouping("kafka-reader");                         // 3 executors
builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
       .shuffleGrouping("filter-bolt");                          // 3 executors
builder.setBolt("hdfs-bolt", hdfsBolt, 2)
       .shuffleGrouping("log-splitter");                         // 2 executors

// submit the topology
Config conf = new Config();
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.setNumWorkers(3);                                           // 3 workers
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
```

1. conf.setNumWorkers(3) sets the number of worker processes to 3. Assuming the cluster has 3 nodes, each node will run one worker.
2. The executor counts are:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
That is 13 executors in total, and these 13 executors will be randomly distributed across the workers.
Note: this code reads its source messages from Kafka, and the topic is configured with 5 partitions in Kafka, which is why the spout's executor count is set to 5 here.
3. This example never sets the number of tasks explicitly, i.e. it uses the default of one task per executor. If you do need to set it, you can chain a setNumTasks() call.
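For instance, a sketch along these lines (the original does not say which component the 5 tasks were set on, so filter-bolt, which has 3 executors in the example above, is used purely for illustration):

```java
// 3 executors (threads) but 5 tasks: some executors will run two FilterBolt instances
builder.setBolt("filter-bolt", new FilterBolt(), 3)
       .setNumTasks(5)
       .shuffleGrouping("kafka-reader");
```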
These 5 tasks will then be distributed among the 3 executors.
(4) Adjusting parallelism dynamically
There are 2 ways to adjust the parallelism of a Storm topology:
1. kill the topology → modify the code → recompile → resubmit
2. adjust it dynamically
The first method is far too inconvenient: sometimes a topology simply cannot be killed on demand, and if you add a few machines, do you really have to kill every topology and modify its code?
Storm therefore supports dynamic adjustment, which can be done in 2 ways:
1. Via the UI: open the topology's page and click rebalance; the topology's status will show rebalancing. This only redistributes the existing processes and threads across the machines, so it is suitable when machines are added or removed; it cannot change the number of workers, executors, and so on.
2. Via the CLI: storm rebalance
For example:
Set the topology's worker count to 7, and the executor counts of filter-bolt and hdfs-bolt to 6 and 8 respectively.
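A command along the following lines should achieve that (mytopology is a placeholder for the actual topology name; -n sets the worker count and -e sets a component's executor count):

```
storm rebalance mytopology -n 7 -e filter-bolt=6 -e hdfs-bolt=8
```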
While this runs, the topology's status shows rebalancing; once the adjustment completes, you can see that the worker counts on the 3 machines are 3, 2 and 2.
IV. Grouping
Storm uses groupings to specify how data flows, mainly which stream each bolt consumes and how it consumes it.
Storm has 7 built-in groupings and also provides CustomStreamGrouping for defining custom groupings. A combined sketch of how several of these groupings are declared follows the list below.
1. Shuffle grouping (shuffleGrouping)
Tuples are distributed randomly across the bolt's tasks, so each task receives roughly the same number of tuples.
2. Fields grouping (fieldsGrouping)
Tuples are grouped by the specified field(s): tuples with the same field value are sent to the same task, while tuples with different values may go to different tasks.
3. All grouping (allGrouping), also known as broadcast grouping
Every tuple is sent to every task; use with caution.
4. Global grouping (globalGrouping)
All tuples are sent to a single task, the one with the lowest task id. Setting a parallelism for the receiving component is meaningless under this grouping, and it can easily become a bottleneck.
5. None grouping (noneGrouping)
Reserved for future use; currently it behaves the same as shuffle grouping.
6. Direct grouping (directGrouping)
The producer calls emitDirect() to decide which task of the consumer receives a tuple; it can only be used on streams that have been declared as direct streams.
7. Local-or-shuffle grouping (localOrShuffleGrouping)
If the receiving bolt has one or more tasks in the same worker process, tuples are sent to those tasks first; otherwise it behaves like shuffle grouping. Compared with shuffle grouping, it reduces network transfer and therefore improves performance.
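As a sketch of how several of these groupings are wired up (reusing the component names from the parallelism example; WordCountBolt and MetricsBolt are made-up classes for illustration, and imports are omitted):

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);

// shuffle grouping: tuples are spread randomly and evenly over filter-bolt's tasks
builder.setBolt("filter-bolt", new FilterBolt(), 3)
       .shuffleGrouping("kafka-reader");

// fields grouping: tuples with the same "word" value always reach the same task
builder.setBolt("word-count-bolt", new WordCountBolt(), 4)
       .fieldsGrouping("filter-bolt", new Fields("word"));

// all grouping: every task of metrics-bolt receives a copy of every tuple
builder.setBolt("metrics-bolt", new MetricsBolt(), 2)
       .allGrouping("filter-bolt");

// local-or-shuffle grouping: prefer tasks living in the same worker process
builder.setBolt("hdfs-bolt", hdfsBolt, 2)
       .localOrShuffleGrouping("word-count-bolt");
```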
V. Reliability
Reliability: every tuple emitted by a spout must be acked by every node in its tuple tree; otherwise it will keep being re-emitted.
There are 2 causes of re-emission:
(1) fail() is called;
(2) a timeout with no response.
For a complete reliability example, see the chapter 1 v4 code of Storm Blueprints (or its P22), or the example on P102 of 从零开始学Storm.
The key steps are as follows:
(1) Spout
1. Create a map that records the id and contents of every tuple that has been emitted; this is the list of tuples awaiting acknowledgement.

```java
private ConcurrentHashMap<UUID, Values> pending;
```

2. When emitting a tuple, pass an extra argument that identifies the tuple (its message id), and put the tuple into the map to await acknowledgement.

```java
UUID msgId = UUID.randomUUID();
this.pending.put(msgId, values);
this.collector.emit(values, msgId);
```

3. Define the ack and fail methods.
The ack method removes the tuple from the map.
The fail method re-emits the tuple:

```java
this.collector.emit(this.pending.get(msgId), msgId);
```

Tuples that get no response are re-emitted periodically. A consolidated spout sketch follows.
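Putting the three spout-side steps together, a minimal sketch (the class name and hard-coded sentence are illustrative; a real spout would read from an external source):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

class ReliableSentenceSpout extends BaseRichSpout {
    private ConcurrentHashMap<UUID, Values> pending;   // tuples emitted but not yet acked
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values("the cow jumped over the moon"); // placeholder source data
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);      // remember the tuple until it is acked
        this.collector.emit(values, msgId);   // emit with a message id so storm can track it
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);           // fully processed: forget it
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);   // failed or timed out: re-emit
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```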
(2) Bolt
Every bolt that processes the tuple needs to add the following:
1. When emitting, add an anchor argument that ties the output to the tuple being responded to (see the sketch after these steps).
2. Acknowledge that the received tuple has been processed:
```java
this.collector.ack(tuple);
```
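A corresponding bolt-side sketch (again with illustrative class and field names), showing the anchored emit followed by the ack:

```java
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

class ReliableSplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        for (String word : tuple.getStringByField("sentence").split(" ")) {
            // 1. anchor: the first argument ties the new tuple to the incoming one,
            //    so a downstream failure causes the spout's fail() to be called
            this.collector.emit(tuple, new Values(word));
        }
        // 2. ack: tell storm this bolt has finished processing the incoming tuple
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```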