Kafka:Kafka核心概念
1 消息系統簡介
1.1 為什么要用消息系統 ?
解耦 各位系統之間通過消息系統這個統一的接口交換數據,無須了解彼此的存在;
冗余 部分消息系統具有消息持久化能力,可規避消息處理前丟失的風險;
靈活性和消除峰值 在訪問量劇增的情況下,應用仍然需要繼續發揮作用,使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰;(節省資源)
可恢復性 系統中部分組件失效并不會影響整個系統,它恢復后仍然可從消息系統中獲取并處理數據;
順序保障 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性;
異步通信 在不需要立即處理請求的場景下,可以將請求放入消息系統,合適的時候再處理。
1.2 有哪些消息系統 ?
RabbitMQ Erlang編寫,支持多協議 AMQP,XMPP,SMTP,STOMP。支持負載均衡、數據持久化。同時支持Peer-to-Peer和發布/訂閱模式;
Redis 基于Key-Value對的NoSQL數據庫,同時支持MQ功能,可做輕量級隊列服務使用。就入隊操作而言, Redis對短消息(小于10KB)的性能比RabbitMQ好,長消息的性能比RabbitMQ差;
ZeroMQ 輕量級,不需要單獨的消息服務器或中間件,應用程序本身扮演該角色,Peer-to-Peer。它實質上是 一個庫,需要開發人員自己組合多種技術,使用復雜度高;
ActiveMQ JMS實現,Peer-to-Peer,支持持久化、XA事務;
MetaQ/RocketMQ 純Java實現,發布/訂閱消息系統,支持本地事務和XA分布式事務;
Kafka 高性能跨語言的分布式發布/訂閱消息系統,數據持久化,全分布式,同時支持實時在線處理和離線數據處理。Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。
1.3 Kafka設計目標是什么?
高吞吐率 在廉價的商用機器上單機可支持每秒100萬條消息的讀寫;
消息持久化 所有消息均被持久化到磁盤,無消息丟失,支持消息重放;
完全分布式 Producer,Broker,Consumer均支持水平擴展,同時適應在線流處理和離線批處理。
2 Kafka簡介和架構
2.1 kafka架構
kafka是生產者生產消息、kafka集群、消費者獲取消息這樣一種架構,如下圖:
注意,還有zookeeper圖中未畫出。
2.2 kafka核心概念
(1)消息
消息是kafka中最基本的數據單元,其ProducerRecord如下所示:
public class ProducerRecord<K, V> {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;//略... }其中key/value是producer發送數據時指定,key的主要作用是根據一定的策略,將此消息路由到指定的Partition中,這樣可以保證同一key的消息全部寫入同一分區中(key可以為null)。
(2)Topic & 分區 & Log
Topic:存儲消息的邏輯概念,可以看作是一個消息集合。每個Topic可以有多個生產者向其中push消息,也可以任意多個消費者消費消息。
Partition:每個Topic可以劃分成多個分區,同一Topic下的不同分區包含的消息是不同的。一個消息被添加到Topic時,會分配唯一的一個offset,Kafka通過offset保證消息在分區內時順序的。即:Kafka保證一個分區內的消息是有序的;同一Topic的多個分區的消息,Kafka并不保證其順序性,如下圖:
注:同一Topic的不同分區會分配在不同Broker上,分區是Kafka水平擴展性的基礎。
Log:分區在邏輯上對應一個Log,當生產者將消息寫入分區時,實際就是寫入到對應的Log中。
Log是邏輯概念,對應到磁盤中的一個文件夾。Log是由多個Segment組成的,每個Segment對應一個日志文件和索引文件,注意Segment的大小是由限制的,當超過限制后會產生新的Segment。注意的是:Kafka采取的是順序磁盤IO,所以只允許向最新的Segment追加數據。索引文件采用稀疏索引的方式,運行時會將其映射到內存,提高索引速度。
(3)Broker
一個單獨的Kafka server就是一個Broker,主要工作是接收生產者發送的消息,分配offset,之后保存到磁盤中;同時,接收消費者、其他Broker的請求,根據請求類型進行相應處理并返回響應。
(4)Producer
主要工作是生產消息,將消息按照一定的規則推送到Topic的分區中。如:根據消息key的Hash值選擇分區、或者按序輪詢全部分區。
(5)Consumer
主要工作是從Topic拉取消息進行消費。某個消費者消費Partition的哪個位置(offset)是由Consumer自己維護的。
這么設計的目的:
- 避免KafkaServer端維護消費者消費位置的開銷;
- 防止KafkaSever端出現延遲或者消費狀態丟失時,影響大量的Consumer;
- 提高了Consumer的靈活性,Consumer可以修改消費位置對某些消息反復消費。
(6)Consumer Group
Kafka中可以讓多個Consumer組成一個 Consumer Group(下面簡稱CG),一個Consumer只能屬于一個CG。Kafka保證其訂閱的Topic的每個Partition只會分配給此CG的一個消費者進行處理。如果不同CG訂閱了同一個Topic,CG之間是不會互相影響的。
CG可以實現**“獨占”和“廣播”**模式的消息處理。
“獨占”:即實現一個消息只被一個消費者消費的效果,則將每個Consumer單獨放入一個CG中。
“廣播”:即實現一個消息被多個消費者消費的效果,則將所有消費者放在一個CG中。
Kafka還通過CG實現了Consumer的水平擴展和故障轉移。
“水平擴展”:如上圖,當Consumer3能力不足以處理兩個分區時,可以向CG添加一個Consumer4,并觸發Rebalance重新分配分區與消費者的對應關系,實現水平擴展,是Consumer4對Partition3進行消費。
“故障轉移”:若此時,Consumer4宕機了,CG又會重新分配分區,Consumer3將會接管Consumer4的分區。
注意:CG中的消費者數量不是越多越好,當 CG消費者數量 > 分區數量 時,將會造成消費者的浪費。
(7)副本
Kafka對消息進行了冗余備份,每個分區可以有多個副本,每個副本包含的消息是一樣的。(同一時刻,副本之間其實并不完全一樣)
每個分區的副本集合有兩種角色:一個leader副本、多個follower副本。kafka在不同的場景下會采用不同的選舉策略。所有的讀寫請求都由選舉出的leader提供服務,其他都作為follower副本,如下圖所示:
follower副本僅僅只是把leader副本數據拉取到本地后,同步更新到自己的Log中。
一般情況下,同一分區的多個副本是被分到不同Broker上的,這樣當leader所在的Broker宕機后,可以重新選舉新的leader繼續對外提供服務。
(8)保留策略 & 日志壓縮
無論消費者是否消費過消息,Kafka為了保證磁盤不被占滿,會配置相應的“保留策略”,以實現周期性地刪除陳舊的消息。
kafka有2種保留策略:
kafka會啟動一個后臺線程,定期檢查是否有可以刪除的消息。“保留策略”可以有全局配置,也可以針對某個Topic覆蓋全局配置。
“日志壓縮”:有些場景下,用戶只關心key對應的最新value值,這是就可以開啟其日志壓縮功能,會啟動一個線程,定期將相同key的消息合并,只保留最新的value。如下所示:
(9)Cluster & Controller
多個Broker構成一個Cluster(集群)對外提供服務,每個集群會選取一個Broker來擔任Controller。
Controller職責:管理分區的狀態、管理每個分區的副本狀態、監聽Zookeeper中數據的變化等工作。
其他Broker:監聽Controller Leader的狀態。
當Controller出現故障時會重新選取Controller Leader。
(10)ISR集合
ISR是In-Sync Replica的縮寫,ISR集合表示的是 **目前“可用”(alive)**且 消息量與Leader相差不多的副本集合。ISR集合中的副本必須滿足下面兩個條件:
每個分區的leader副本會維護此分區的ISR集合,會將違反上面兩個條件的副本踢出ISR集合外。
(11)HW & LEO
HW(HightWatermark,水位線)標記了一個特殊的offset,消費者處理消息的時候,HW之后的消息對于消費者是不可見的。HW也是由leader副本管理的。
Kafka官網將HW之前的消息狀態稱為“commit”,此時就算leader副本損壞了,也不會造成HW之前的數據丟失。當ISR集合中全部的Follower副本都拉取HW指定消息進行同步后,Leader副本會遞增HW。
LEO(Log End Offset)是所有副本都會有的一個offset標記,它指向當前副本的最后一個消息的offset。
現在考慮kafka為什么要這樣設計?
在分布式存儲中,冗余備份一般有兩種方案:同步復制 和 異步復制。
**同步復制:**要求所有Follower副本全部復制完,這條消息才會被認為提交成功。此時若有一個副本出現故障,會導致HW無法完成遞增,消息無法提交,故障的Follower副本就會拖慢系統性能,甚至造成不可用。
**異步復制:**Leader副本收到生產者推送的消息,就會認為消息提交成功。Follower副本異步地從Leader副本中同步消息,這可能會造成Follower副本的消息量總是遠遠落后于Leader副本。
**Kafka怎么解決的?**kafka權衡了上述兩種策略,引入了ISR集合的概念,當Follower副本延遲過高時,Follower副本被踢出ISR集合,使得消息依然能快速被提交。
- 可以通過從ISR集合中踢出高延遲的Follower副本,避免高延遲副本影響集群性能;
- 當Leader副本宕機時,kafka會優先將ISR集合中的Follower副本選舉為Leader副本,新副本包含了HW之前的全部消息,從而避免消息丟失。
注意:Follower副本更新消息時采用的是批量寫磁盤,加速了磁盤IO,極大減少了Follower與Leader的差距。
2.3 zookeeper在kafka的作用
其在Kafka的作用有:
-
Broker注冊
Zookeeper上會有一個專門用來進行Broker服務器列表記錄的節點:/brokers/ids。每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬于自己的節點,如/brokers/ids/[0…N]。不同的Broker必須使用不同的Broker ID進行注冊,創建完節點后,每個Broker就會將自己的IP地址和端口信息記錄到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
-
Topic注冊
在Kafka中,同一個Topic的消息會被分成多個分區并將其分布在多個Broker上,這些分區信息及與Broker的對應關系也都是由Zookeeper在維護,由專門的節點來記錄,如:/borkers/topics。Broker服務器啟動后,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID并寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker服務器,對于"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
-
生產者負載均衡
由于同一個Topic消息會被分區并將其分布在多個Broker上,因此,生產者需要將消息合理地發送到這些分布式的Broker上,那么如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。
(1) 四層負載均衡,根據生產者的IP地址和端口來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然后該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多于其他生產者的話,那么會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。
(2) 使用Zookeeper進行負載均衡,由于每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker服務器列表的變更,這樣就可以實現動態的負載均衡機制。
-
消費者負載均衡
與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker服務器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。
-
記錄 分區 與 消費者組 的關系
在Kafka中,規定了每個消息分區 只能被同組的一個消費者進行消費,因此,需要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一個 消息分區 的標識,節點內容就是該 消息分區 上 消費者的Consumer ID。
-
offset的記錄
在消費者對指定消息分區進行消息消費的過程中,需要定時地將分區消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費后,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節點內容就是Offset的值。
-
消費者注冊
消費者服務器在初始化啟動時加入消費者分組的步驟如下
注冊到消費者分組。每個消費者服務器啟動時,都會到Zookeeper的指定節點下創建一個屬于自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建后,消費者就會將自己訂閱的Topic信息寫入該臨時節點。
對 消費者分組 中的 消費者 的變化注冊監聽。每個 消費者 都需要關注所屬 消費者分組 中其他消費者服務器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。
對Broker服務器變化注冊監聽。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker服務器列表發生變化,那么就根據具體情況來決定是否需要進行消費者負載均衡。
進行消費者負載均衡。為了讓同一個Topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區分配的過程,通常,對于一個消費者分組,如果組內的消費者服務器發生變更或Broker服務器發生變更,會發出消費者負載均衡。
Kafka的zookeeper存儲結構如下:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-tYv7k0ac-1578199803523)(assets/zookeeper在kafka的作用-1534754742260.png)]
2.4 kafka高性能的原因
(1)高效使用磁盤
-
Kafka的整個設計中,Partition相當于一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些數據,并且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。
-
Kafka順序存寫數據,故刪除時刪除對應的Segment(物理文件,disk),避免對文件的隨機寫操作。
-
充分利用了頁緩存PageCache。
-
支持多DIsk Drive。Broker的log.dirs配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs里。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。
(2)零拷貝技術
Kafka中存在大量的網絡數據持久化到磁盤(Producer到Broker)和磁盤文件通過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。對比傳統模式的拷貝來看看kafka如何實現零拷貝
傳統模式下的四次拷貝與四次上下文切換
以將磁盤文件通過網絡發送為例。傳統模式下,一般使用如下偽代碼所示的方法先將文件數據讀入內存,然后通過Socket將內存中的數據發送出去。
buffer = File.read Socket.send(buffer)這一過程實際上發生了四次數據拷貝。首先通過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),然后應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接著用戶程序通過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最后通過DMA拷貝將數據拷貝到NIC Buffer(網卡緩沖)。同時,還伴隨著四次上下文切換,如下圖所示。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-TDx7TZNY-1578199803524)(assets/BIO 四次拷貝 四次上下文切換.png)]
sendfile和transferTo實現零拷貝
Linux 2.4+內核通過sendfile系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer后,直接通過DMA(Direct Memory Access,直接內存存取)拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。零拷貝過程如下圖所示。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-iXtFuD1p-1578199803524)(assets/BIO 零拷貝 兩次上下文切換.png)]
從具體實現來看,Kafka的數據傳輸通過TransportLayer來完成,其子類PlaintextTransportLayer通過Java NIO的FileChannel的transferTo和transferFrom方法實現零拷貝,如下所示。
@Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {return fileChannel.transferTo(position, count, socketChannel); }注: transferTo和transferFrom并不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則并不能通過這兩個方法本身實現零拷貝。
(3)減少網絡開銷
批處理
批處理是一種常用的用于提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。
Kafka 0.8.1及以前的Producer區分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage作為參數,一次發送一條消息。另一種是接受一批KeyedMessage作為參數,一次性發送多條消息。而對于異步發送而言,無論是使用哪個send方法,實現上都不會立即將消息發送給Broker,而是先存到內部的隊列中,直到消息條數達到閾值或者達到指定的Timeout才真正的將消息發送出去,從而實現了消息的批量發送。
Kafka 0.8.2開始支持新的Producer API,將同步Producer和異步Producer結合。雖然從send接口來看,一次只能發送一個ProducerRecord,而不能像之前版本的send方法一樣接受消息列表,但是send方法并非立即將消息發送出去,而是通過batch.size和linger.ms控制實際發送頻率,從而實現批量發送。
由于每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協議本身的一些內容(稱為Overhead),所以將多條消息合并到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。
數據壓縮降低網絡負載
Kafka從0.7開始,即支持將數據壓縮后再傳輸給Broker。除了可以將每條消息單獨壓縮然后傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一起壓縮后傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。
Broker接收消息后,并不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch到數據后再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網絡傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了Consumer與Broker間的網絡傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
(4)高效的序列化方式
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。
參考:
https://www.jianshu.com/p/a036405f989c
https://www.jianshu.com/p/eb75372df00a
總結
以上是生活随笔為你收集整理的Kafka:Kafka核心概念的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka:分布式消息队列的抽象模型
- 下一篇: Kafka:Zero-Copy零拷贝