sparkstreaming 读取mysql_SparkStreaming读取Kafka的两种方式
本文主要從以下幾個方面介紹SparkStreaming讀取Kafka的兩種方式:
一、SparkStreaming簡介
二、Kafka簡介
三、Redis簡介(可用于保存歷史數據或偏移量數據)
四、SparkStreaming讀取Kafka數據的兩種方式
五、演示Demo
一、SparkStreaming簡介
可以參考這篇文章:SparkStreaming 詳解
二、Kafka簡介
可以參考這篇文章:Kafka(分布式發布訂閱消息系統) 簡介
三、Redis簡介
可以參考這篇文章:Redis簡介
四、SparkStreaming讀取Kafka數據的兩種方式
spark streaming提供了兩種獲取方式,一種是利用接收器(receiver)和kafaka的高層API實現。
一種是不利用接收器,直接用kafka底層的API來實現(spark1.3以后引入)。
1、reciver鏈接方式(有些問題,開發中不采用這種方式)
- 用KafkaUtils.createDstream創建鏈接。Receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。
- Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。
- 在executor上會有receiver從kafka接收數據并存儲在Spark executor中,在到了batch時間后觸發job去處理接收到的數據,1個receiver占用1個core使用wal預寫機制,因為需要使用hdfs等存儲,因此會降低性能。
receiver方式
基于Receiver方式存在的問題:
- 啟用WAL機制,每次處理之前需要將該batch內的數據備份到checkpoint目錄中,這降低了數據處理效率,同時加重了Receiver的壓力;另外由于數據備份機制,會受到負載影響,負載一高就會出現延遲的風險,導致應用崩潰。
- 采用MEMORY_AND_DISK_SER降低對內存的要求,但是在一定程度上影響了計算的速度。
- 單Receiver內存。由于Receiver是屬于Executor的一部分,為了提高吞吐量,提高Receiver的內存。但是在每次batch計算中,參與計算的batch并不會使用這么多內存,導致資源嚴重浪費。
- 提高并行度,采用多個Receiver來保存kafka的數據。Receiver讀取數據是異步的,不會參與計算。如果提高了并行度來平衡吞吐量很不劃算。
- Receiver和計算的Executor是異步的,在遇到網絡等因素時,會導致計算出現延遲,計算隊列一直在增加,而Receiver一直在接收數據,這非常容易導致程序崩潰。
- 在程序失敗恢復時,有可能出現數據部分落地,但是程序失敗,未更新offsets的情況,這會導致數據重復消費。
2、Direct直連方式(開發中使用的方式)
- 使用KafkaUtils.createDirectStream創建鏈接。這種方式定期從kafka的topic下對應的partition中查詢最新偏移量,并在每個批次中根據相應的定義的偏移范圍進行處理。Spark通過調用kafka簡單的消費者API讀取一定范圍的數據。
- Direct方式是直接連接kafka分區來獲取數據。從每個分區直接讀取數據大大提高了并行能力Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況當然也可以自己手動維護,把offset存在mysql、redis中所以基于Direct模式可以在開發中使用,且借助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次
基于Direct方式的優勢:
- 簡化并行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對他們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,并且會并行從kafka中讀取數據。所以在kafka partition和RDD partition之間,有一一對應的關系。
- 高性能:如果要保證數據零丟失,在基于Receiver的方式中,需要開啟WAL機制。這種方式其實效率很低,因為數據實際被復制了兩份,kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基于Direct的方式,不依賴于Receiver,不需要開啟WAL機制,只要kafka中做了數據的復制,那么就可以通過kafka的副本進行恢復。
- 強一致語義:基于Receiver的方式,使用kafka的高階API來在Zookeeper中保存消費過的offset。這是消費kafka數據的傳統方式。這種方式配合WAL機制,可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和Zookeeper之間可能是不同步的。基于Direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據時消費一次且僅消費一次。
- 降低資源:Direct不需要Receiver,其申請的Executors全部參與到計算任務中;而Receiver則需要專門的Receivers來讀取kafka數據且不參與計算。因此相同的資源申請,Direct能夠支持更大的業務。Receiver與其他Executor是異步的,并持續不斷接收數據,對于小業務量的場景還好,如果遇到大業務量時,需要提高Receiver的內存,但是參與計算的Executor并不需要那么多的內存,而Direct因為沒有Receiver,而是在計算的時候讀取數據,然后直接計算,所以對內存的要求很低。
- 魯棒性更好:基于Receiver方式需要Receiver來異步持續不斷的讀取數據,因此遇到網絡、存儲負載等因素,導致實時任務出現堆積,但Receiver卻還在持續讀取數據,此種情況容易導致計算崩潰。Direct則沒有這種顧慮,其Driver在觸發batch計算任務時,才會讀取數據并計算,隊列出現堆積并不不會引起程序的失敗。
基于Direct方式的不足:
- Direct方式需要采用checkpoint或者第三方存儲來維護offset,而不是像Receiver那樣,通過Zookeeper來維護offsets,提高了用戶的開發成本。
- 基于Receiver方式指定topic指定consumer的消費情況均能夠通過Zookeeper來監控,而Direct則沒有這么便利,如果想做監控并可視化,則需要投入人力開發。
五、演示Demo
1、reciver鏈接方式
package xxximport org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * Receiver鏈接方式 */object KafkaWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" val groupId = "g1" val topic = Map[String, Int]("test1" -> 1) //創建DStream,需要KafkaDStream val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic, StorageLevel.MEMORY_AND_DISK_SER) //對數據進行處理 //Kafak的ReceiverInputDStream[(String, String)]里面裝的是一個元組(key是寫入的key,value是實際寫入的內容) val lines: DStream[String] = data.map(_._2) //對DSteam進行操作,操作這個抽象(代理,描述),就像操作一個本地的集合一樣,類似于RDD val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //打印結果(Action) reduced.print() //啟動sparksteaming程序 ssc.start() //等待優雅的退出 ssc.awaitTermination() }}2、直連方式(用zookeeper存儲偏移量)
步驟:
準備zookeeper集群存儲讀取到額kafka數據的每個分區的偏移量
調用KafkaUtils.createDirectStream建立直連鏈接
讀取zookeeper集群中的已經存儲的每個數據分區地偏移量,根據該偏移量繼續讀取數據。或者從頭(當前)位置讀取數據
調用kafkaStream.transform遍歷每個RDD,獲取該RDD對應數據的偏移量
對RDD進行操作,并將zookeeper中保存的數據偏移量進行更新
package sparkStreamingAndKafkaimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}/** * 直連方式,用zookeeper存偏移量 */object KafkaDirection1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group1" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創建stream時使用的topic名字集合,sparkStreaming可同時消費多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創建一個ZKGroupTopicDirs對象,其實就是指定往zookeeper中寫入數據的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準備kafka參數 val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產生的數據讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數據,并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數據(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區所對應的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進行 transform,最終 kafka 的數據都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數據的規則 //通過KafkaUtils創建直連的DStream(fromOffsets參數的作用是:按照前面計算好了的偏移量繼續消費數據) // 泛型參數說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數據類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當于從頭讀 //如果未保存,根據 kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() //從kafka讀取的消息,DStream的Transform方法可以將當前批次的RDD獲取出來 //該transform方法計算獲取到當前批次RDD,然后將RDD的偏移量取出來,然后在將RDD返回到DStream val transform: DStream[(String, String)] = kafkaStream.transform { rdd => //得到該 RDD對應 kafka 的消息的 offset //該RDD是一個KafkaRDD,可以獲得它的偏移量的范圍 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 偏移量范圍 rdd // 不對RDD進行操作,再放回DStream } // DStream 是RDD的工廠,每隔一段時間產生一個RDD val messages: DStream[String] = transform.map(_._2) //依次迭代DStream中的RDD messages.foreachRDD { rdd => // foreachRDD,每隔一段時間產生一個RDD rdd.foreachPartition(partition => // foreachPartition 每個分區一個連接鏈接 partition.foreach(x => { // foreach 分區中的每條數據 println(x) }) ) // 更新偏移量offset for (o但是,在這個方案中,為了獲取偏移量需要遍歷RDD,后續又要遍歷RDD操作RDD,代碼冗余
3、直連方式(獲取數據偏移量的同時處理數據)
package xxximport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}/** * 直連方式,用zookeeper存偏移量(獲取偏移量的同時,對數據進行操作) */object kafkaDirection2 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group3" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創建stream時使用的topic名字集合,sparkStreaming可同時消費多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創建一個ZKGroupTopicDirs對象,其實就是指定往zookeeper中寫入數據的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準備kafka參數 val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產生的數據讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數據,并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數據(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區所對應的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進行 transform,最終 kafka 的數據都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數據的規則 //通過KafkaUtils創建直連的DStream(fromOffsets參數的作用是:按照前面計算好了的偏移量繼續消費數據) // 泛型參數說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數據類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當于從頭讀 //如果未保存,根據 kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() // 獲取偏移量的同時處理數據 // 直連方式只有在kakaDstream中的RDD才能獲取偏移量,那么就不能調用DStream的Transformation // 所以只能在KafkaStream中調用foreachRDD,獲取RDD的偏移量,然后就是對RDD進行操作了 //依次迭代DStream中的RDD // 如果使用直連方式進行累加數據,就需要在外部的數據庫中進行累加(用kay-value的內存數據庫,NoSQL型數據庫 Redis) kafkaStream.foreachRDD { kafkaRDD =>{ // 只有kafkaRDD可以強轉成HashOffSetRanges,并獲取偏移量 val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = kafkaRDD.map(_._2) val words: RDD[String] = lines.flatMap(u => { u.split(" ") }) val wordsAndOne: RDD[(String, Int)] = words.map(word => { (word, 1) }) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => { a + b }) reduced.foreach(println) // 更新偏移量offset for (o但是該方案,無法獲取歷史數據。這里統計到的wordcount只是某一時間片內對應數據的統計結果,并不包含歷史數據。
4、直連方式,zookeeper存儲偏移量數據,redis存儲歷史數據。
redis的連接池:
package xxximport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}object JedisConnectePool { val config = new JedisPoolConfig() //最大連接數, config.setMaxTotal(20) //最大空閑連接數, config.setMaxIdle(10) //當調用borrow Object方法時,是否進行有效性檢查 --> config.setTestOnBorrow(true) //10000代表超時時間(10秒) val pool = new JedisPool(config, "192.168.247.8", 6379, 10000, "123") def getConnection():Jedis={ pool.getResource }}package xxximport jedis.JedisConnectionPoolimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import redis.clients.jedis.Jedis/** * 直連方式,在獲取RDD偏移量的同時操作偏移量,并且能夠wordcount統計時包含歷史統計數據 */object kafkaDirection3 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group2" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創建stream時使用的topic名字集合,sparkStreaming可同時消費多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創建一個ZKGroupTopicDirs對象,其實就是指定往zookeeper中寫入數據的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準備kafka參數 val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產生的數據讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數據,并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數據(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區所對應的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進行 transform,最終 kafka 的數據都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數據的規則 //通過KafkaUtils創建直連的DStream(fromOffsets參數的作用是:按照前面計算好了的偏移量繼續消費數據) // 泛型參數說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數據類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當于從頭讀 //如果未保存,根據 kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() // 直連方式只有在kakaDstream中的RDD才能獲取偏移量,那么就不能調用DStream的Transformation // 所以只能在KafkaStream中調用foreachRDD,獲取RDD的偏移量,然后就是對RDD進行操作了 //依次迭代DStream中的RDD // 如果使用直連方式進行累加數據,就需要在外部的數據庫中進行累加(用kay-value的內存數據庫,NoSQL型數據庫 Redis) kafkaStream.foreachRDD { kafkaRDD =>{ // 只有kafkaRDD可以強轉成HashOffSetRanges,并獲取偏移量 val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = kafkaRDD.map(_._2) val words: RDD[String] = lines.flatMap(u => { u.split(" ") }) val wordsAndOne: RDD[(String, Int)] = words.map(word => { (word, 1) }) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => { a + b }) val stated: RDD[(String, Int)] = reduced.map(u => { // 獲取redis存放的歷史統計數據 val conn: Jedis = JedisConnectionPool.getConnection() val str: String = conn.get(u._1) var num = 0 if(str != null){ num = str.toInt } val value: Int = u._2 val value1: Int = num+value // 更新redis中的統計數據 conn.set(u._1, value1.toString) conn.close() (u._1, value1) }) stated.foreach(println) // 更新偏移量offset for (o 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的sparkstreaming 读取mysql_SparkStreaming读取Kafka的两种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ue查找文件中每行第二个单词_UI设计和
- 下一篇: python矩阵对角化_numpy创建单