Sending data with Flume (avro, Java): a summary of using Flume to send data to kafka, hdfs, hive, http, netcat, and other modes
1. Source in http mode, sink in logger mode: print the data to the console.
The conf file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# receive data sent over HTTP
a1.sources.r1.type = http
# hostname or IP address of the machine running Flume
a1.sources.r1.bind = hadoop-master
# listening port
a1.sources.r1.port = 9000
#a1.sources.r1.fileHeader = true
# Describe the sink
# print the events to the console
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume with:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
Output like the following indicates that Flume started successfully.
895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
Open another terminal and send data via HTTP POST:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000
hadoop-master is the hostname bound in the Flume configuration and 9000 is the bound port.
The window running Flume then shows the following:
2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
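The same event can also be sent from a Java program instead of curl. A minimal sketch, assuming only the JDK's built-in HttpURLConnection and the http.conf agent above listening on hadoop-master:9000 (the class name HttpSourceClient is just an illustrative choice):
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
// Minimal sketch: POST one event to the Flume HTTP source in the JSON format
// its default handler expects (a JSON array of {headers, body} objects).
public class HttpSourceClient {
    public static void main(String[] args) throws Exception {
        String payload = "[{\"headers\":{\"timestampe\":\"1234567\",\"host\":\"master\"},"
                + "\"body\":\"badou flume\"}]";
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://hadoop-master:9000").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        // A 200 response means the event was accepted; it then appears in the logger sink output.
        System.out.println("HTTP source responded with " + conn.getResponseCode());
        conn.disconnect();
    }
}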
2. Source in netcat mode (the netcat source here speaks TCP), sink in logger mode: print the data to the console.
The conf file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
# hostname or IP address to bind
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console
Then, in another terminal, send data with telnet:
The command is: telnet hadoop-master 44444
[root@hadoop-master ~]# telnet hadoop-master 44444
Trying 192.168.194.6...
Connected to hadoop-master.
Escape character is '^]'.
The output above means the connection to Flume succeeded. Then type:
12213213213
OK
12321313
OK
Flume then receives the corresponding events:
2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
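Instead of telnet, the same lines can be pushed from Java over a plain TCP socket. A minimal sketch, assuming the netcat source above is listening on hadoop-master:44444 (NetcatSourceClient is an illustrative name):
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
// Minimal sketch: every newline-terminated line written to the socket
// becomes one event for the netcat source.
public class NetcatSourceClient {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("hadoop-master", 44444);
             PrintWriter writer = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
            writer.println("12213213213");
            writer.println("12321313");
        }
    }
}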
3. Source in netcat/http mode, sink in hdfs mode: store the data in HDFS.
The conf file, named hdfs.conf, is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# location in HDFS where the files are stored
a1.sinks.k1.hdfs.path = hdfs:/flume/events
# prefix for the file names
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# file format: DataStream writes the incoming events as plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the target directory in HDFS:
hadoop fs -mkdir /flume/events
Start Flume:
bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Send data to Flume via telnet:
telnet hadoop-master 44444
Then type:
aaaaaaaa
bbbbbbb
ccccccccc
dddddddddd
List the files in HDFS with hadoop fs -ls /flume/events/; the /flume/events directory contains the following files:
-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
View the contents of events-.1528900112216 with hadoop fs -cat /flume/events/events-.1528900112216:
aaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbb
ccccccccccccccccccc
dddddddddddddddd
eeeeeeeeeeeeeeeeeee
fffffffffffffffffffffff
gggggggggggggggggg
hhhhhhhhhhhhhhhhhhhhhhh
iiiiiiiiiiiiiiiiiii
jjjjjjjjjjjjjjjjjjj
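The same check can also be done from Java through the Hadoop FileSystem API. A minimal sketch, assuming the hadoop-client dependency is on the classpath and fs.defaultFS points at this cluster (the hard-coded file name simply mirrors the cat command above):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Minimal sketch: list the files written by the HDFS sink and print one of them,
// the Java equivalent of the hadoop fs -ls / -cat commands above.
public class ReadFlumeEvents {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path("/flume/events"))) {
            System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path("/flume/events/events-.1528900112216")), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}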
For http mode, change netcat to http in hdfs.conf, and replace the telnet step with:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444
The HDFS files then contain the content sent by the command above: badou flume.
4. Source in netcat/http mode, sink in hive mode: store the data in Hive, partitioned.
The configuration, named hive.conf, is as follows:
# Name the components on this agent
a1.sources = r1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083
# Hive database name
a1.sinks.k1.hive.database=default
a1.sinks.k1.hive.table=flume_user1
a1.sinks.k1.serializer=DELIMITED
# With a netcat source the partition value can only be set statically, because netcat
# carries no field values that could be mapped to the partition; here age is fixed to 3.
a1.sinks.k1.hive.partition=3
# With an http/json source the partition value can be set dynamically, because the
# age value can be carried in the event headers.
#a1.sinks.k1.hive.partition=%{age}
a1.sinks.k1.serializer.delimiter=" "
a1.sinks.k1.serializer.serdeSeparator=' '
a1.sinks.k1.serializer.fieldnames=user_id,user_name
a1.sinks.k1.hive.txnsPerBatchAsk = 10
a1.sinks.k1.hive.batchSize = 1500
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the table in Hive:
create table flume_user1(
user_id int
,user_name string
)
partitioned by(age int)
clustered by (user_id) into 2 buckets
stored as orc
Add the following to hive-site.xml:
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
  <description>password to use against metastore database</description>
</property>
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
Copy the three jars from the hcatalog/share/hcatalog directory under the Hive installation root into Flume's lib directory.
Run Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
Open a new window and start the metastore service:
hive --service metastore &
Open another client and connect to Flume with telnet:
telnet hadoop-master 44444
Then type:
1 1
3 3
Hive then shows the following two rows:
flume_user1.user_id flume_user1.user_name flume_user1.age
1 1 3
3 3 3
age is the value 3 set in hive.conf.
Now switch the Flume source to http mode so that the Hive partition value is passed in dynamically.
In hive.conf, change
a1.sources.r1.type = netcat to a1.sources.r1.type = http
and a1.sinks.k1.hive.partition=3 to a1.sinks.k1.hive.partition=%{age}.
Then start Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
In the newly opened window, send data to Flume over HTTP:
curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444
Hive then shows the following data:
flume_user1.user_id flume_user1.user_name flume_user1.age
11 ligongong 109
This shows that when data is sent to Hive over HTTP, the partition field travels in the event headers while the other fields travel in the body, with columns separated by the delimiter defined in hive.conf.
5. Using avro mode: print the data to the console.
Data is transferred between different agents using avro mode (an avro sink on the sending agent paired with an avro source on the receiving agent).
Two servers are needed to demonstrate avro: hadoop-master and hadoop-slave2.
hadoop-master runs agent2, whose sink is avro and whose target host is hadoop-slave2. The Flume conf file on hadoop-master, named push.conf, is as follows:
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= netcat
a2.sources.r1.bind= hadoop-master
a2.sources.r1.port = 44444
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe the sink
# the sink type is avro
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
# hostname of the server the sink sends data to
a2.sinks.k1.hostname= hadoop-slave2
# port on the destination server
a2.sinks.k1.port= 44444
hadoop-slave2 runs agent1, whose source is avro. The Flume configuration, named pull.conf, is as follows:
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= hadoop-slave2
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
Start Flume on hadoop-slave2 first, then on hadoop-master. The order matters; otherwise you get an error like: org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
Start Flume on hadoop-slave2:
bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
Start Flume on hadoop-master:
bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console
Open a new window and connect to hadoop-master with telnet:
telnet hadoop-master 44444
Then send 11111aaaa.
The console on hadoop-slave2 then shows the 11111aaaa that was just sent:
2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
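An external Java program can also deliver events directly to the avro source on hadoop-slave2, which is what the "flume avro java" part of the title refers to. A minimal sketch, assuming the flume-ng-sdk artifact is on the classpath (AvroSourceClient is an illustrative name); when it runs, the logger sink on hadoop-slave2 should print the event just like the telnet example above:
import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
// Minimal sketch: send one event to the avro source with the Flume RPC client.
public class AvroSourceClient {
    public static void main(String[] args) {
        RpcClient client = RpcClientFactory.getDefaultInstance("hadoop-slave2", 44444);
        try {
            Event event = EventBuilder.withBody("11111aaaa", StandardCharsets.UTF_8);
            client.append(event);  // blocks until the source acknowledges the event
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}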
6. Send data to Kafka through Flume, then store the data from Kafka into HDFS and Hive.
Start ZooKeeper on hadoop-master, hadoop-slave1, and hadoop-slave2.
Then start Kafka; from the Kafka installation directory, run:
./bin/kafka-server-start.sh config/server.properties &
Create a topic in Kafka:
bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
List the topics in Kafka:
bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
Start a Kafka console consumer:
./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
Configure the Flume conf file: set the source type to exec, the sink to org.apache.flume.sink.kafka.KafkaSink, and the Kafka topic to the flume_kafka topic created above. The full configuration is:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# source type exec runs a shell command
a1.sources.r1.type = exec
# the command the source runs
a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
# the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=hadoop-master:9092
# Kafka topic
a1.sinks.k1.topic=flume_kafka
# serialization class
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume with this configuration. Whenever new data is appended to /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt, Flume loads it into Kafka, where it is consumed by the console consumer started above.
Inspecting /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt shows data such as:
131,dry pasta
131,dry pasta
132,beauty
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
134,specialty wines champagnes
134,specialty wines champagnes
134,specialty wines champagnes
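As a quick end-to-end check from Java, a minimal sketch that appends one more record in the same id,description format to the tailed file (the path is taken from the exec source configuration above; the appended line reuses an existing record purely as an example):
import java.io.FileWriter;
import java.io.PrintWriter;
// Minimal sketch: append one record to the file tailed by the exec source.
// The new line should then show up in the kafka-console-consumer started earlier.
public class ExecSourceFeeder {
    public static void main(String[] args) throws Exception {
        String path = "/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt";
        try (PrintWriter writer = new PrintWriter(new FileWriter(path, true))) {
            writer.println("131,dry pasta");
        }
    }
}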