集群管理工具KafkaAdminClient——原理与示例
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/
前言
一般情況下,我們都習慣使用Kafka中bin目錄下的腳本工具來管理查看Kafka,但是有些時候需要將某些管理查看的功能集成到系統(比如Kafka Manager)中,那么就需要調用一些API來直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務端代碼,采用Scala編寫)下的AdminClient和AdminUtils來實現部分的集群管理操作,比如筆者之前在Kafka解析之topic創建(1)和Kafka解析之topic創建(2)兩篇文章中所講解的Topic的創建就用到了AdminUtils類。而在Kafka0.11.0.0版本之后,又多了一個AdminClient,這個是在kafka-client包下的,這是一個抽象類,具體的實現是org.apache.kafka.clients.admin.KafkaAdminClient,這個就是本文所要陳述的重點了。
功能與原理介紹
在Kafka官網中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):
其內部原理是使用Kafka自定義的一套二進制協議來實現,詳細可以參見Kafka協議。主要實現步驟:
和協議有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。
示例
下面就以創建Topic來舉一個簡單的KafkaAdminClient的使用案例,【代碼清單1】:
private static final String NEW_TOPIC = "topic-test2"; private static final String brokerUrl = "localhost:9092";private static AdminClient adminClient;@BeforeClass public static void beforeClass(){Properties properties = new Properties();properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);adminClient = AdminClient.create(properties); }@AfterClass public static void afterClass(){adminClient.close(); }@Test public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);adminClient.createTopics(newTopicList); }示例中的createTopics()方法就創建了一個分區數為4,副本因子為1的“topic-test2”的Topic。
代碼剖析
下面來詳細介紹一下KafkaAdminClient中現有的listTopics()方法(這個方法的實現相對干凈利落,代碼量少、易于講解)的實現方式,以便可以了解KafkaAdminClient中的大體脈絡。listTopics()方法的具體代碼如【代碼清單2】所示:
public ListTopicsResult listTopics(final ListTopicsOptions options) {final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();final long now = time.milliseconds();runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) {@OverrideAbstractRequest.Builder createRequest(int timeoutMs) {return MetadataRequest.Builder.allTopics();}@Overridevoid handleResponse(AbstractResponse abstractResponse) {MetadataResponse response = (MetadataResponse) abstractResponse;Cluster cluster = response.cluster();Map<String, TopicListing> topicListing = new HashMap<>();for (String topicName : cluster.topics()) {boolean internal = cluster.internalTopics().contains(topicName);if (!internal || options.shouldListInternal())topicListing.put(topicName, new TopicListing(topicName, internal));}topicListingFuture.complete(topicListing);}@Overridevoid handleFailure(Throwable throwable) {topicListingFuture.completeExceptionally(throwable);}}, now);return new ListTopicsResult(topicListingFuture); }listTopics()方法接收一個ListTopicsOptions類型的參數,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXOptions類型的參數,這個類型一般只包含timeoutMs這個成員變量,用來設定請求的超時時間,如果沒有指定則使用默認的request.timeout.ms參數值,即30000ms。就拿查詢Topic信息所對應的DescribeTopicsOptions來說,其就包含一個timeoutMs參數,具體如【代碼清單3】所示:
public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {} public abstract class AbstractOptions<T extends AbstractOptions> {private Integer timeoutMs = null;@SuppressWarnings("unchecked")public T timeoutMs(Integer timeoutMs) {this.timeoutMs = timeoutMs;return (T) this;}public Integer timeoutMs() {return timeoutMs;} }不過ListTopicsOptions擴展了一個成員變量listInternal,用來指明是否需要羅列內部Topic,比如在Kafka解析之topic創建(1)中提及的“__consumer_offsets”和“transaction_state”就是兩個內部Topic。ListTopicsOptions的代碼如【代碼清單4】所示:
public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {private boolean listInternal = false;public ListTopicsOptions listInternal(boolean listInternal) {this.listInternal = listInternal;return this;}public boolean shouldListInternal() {return listInternal;} }listInternal的值默認為false,如果同時要羅列出目前的內部Topic的話就需要將這個listInternal設置為true,示例代碼如【代碼清單5】所示:
@Test public void listTopicsIncludeInternal() throws ExecutionException, InterruptedException {ListTopicsOptions listTopicsOptions = new ListTopicsOptions();listTopicsOptions.listInternal(true);ListTopicsResult result = adminClient.listTopics(listTopicsOptions);Collection<TopicListing> list = result.listings().get();System.out.println(list); }接下去繼續講解listTopics()方法,其返回值為ListTopicResult類型。與ListTopicsOptions對應,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXResult類型的返回值,其內部一般包含一個KafkaFuture,用于異步發送請求之后等待操作結果。KafkaFuture實現了Java中的Future接口,用來支持鏈式調用以及其他異步編程模型,可以看成是Java8中CompletableFuture的一個小型版本,其中也有類似thenApply、complete、completeExceptionally的方法。
再來看代碼清單2中的 runnable.call(new Call(“listTopics”, calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) 這行代碼,runnable的類型是AdminClientRunnable,其是KafkaAdminClient負責處理與服務端交互請求的服務線程。AdminClientRunnable中的call方法用作入隊一個Call請求,進而對其處理。Call請求代表與服務端的一次請求交互,比如listTopics和createTopics都是一次Call請求,AdminClientRunnable線程負責處理這些Call請求。
Call類是一個抽象類,構造方法接收三個參數:本次請求的名稱callName、超時時間deadlineMs、以及節點提供器nodeProvider。nodeProvider是NodeProvider類型,用來提供本次請求所交互的Broker節點。Call類中還有3個抽象方法:createRequest()、handleResponse()、handleFailure(),分別用來創建請求、處理回執和處理失敗。在代碼清單2中,對于listTopics()方法而言,其內部原理就是發送MetadataRequest請求然后處理MetadataResponse,其處理邏輯峰封裝在createRequest()、handleResponse()、handleFailure()這三個方法之中了。
綜上,如果要自定義實現一個功能,只需要三個步驟:
KafkaAdminClient目前而言尚未形成一個完全體,里面還可以擴展很多功能,就拿上一篇文章《如何獲取Kafka的消費者詳情——從Scala到Java的切換》中介紹的而言,目前KafkaAdminClient尚未實現describeConsumerGroup和listGroupOffsets的功能,所以需要進一步的升級改造。篇幅限制,這部分內容將在下一篇文章進行介紹,如果想要先睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的集群管理工具KafkaAdminClient——原理与示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何获取Kafka的消费者详情——从Sc
- 下一篇: 集群管理工具KafkaAdminClie