集群管理工具KafkaAdminClient——改造
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-2-reuse/
前文概述
在上一篇文章《集群管理工具KafkaAdminClient——原理與示例》中講述了KafkaAdminClient的功能以及相應的原理,但是同時也提出了目前的KafkaAdminClient并沒有非常的完善,還有許多功能還需要去豐富,這些功能可以自定義實現,在《如何獲取Kafka的消費者詳情——從Scala到Java的切換》一文中介紹了如何獲取Kafka的消費詳情,其原理是通過Java調用Kafka的Scala代碼實現的,如果要使用純Java的方式實現就需要用到了KafkaAdminClient,另外Scala版的AdminClient也被標注為:“This client is deprecated, and will be replaced by KafkaAdminClient.”,說明官方也推薦使用KafkaAdminClient。不過現在的版本(目前最新1.1.0)并沒有提供類似describeConsumerGroup和listGroupOffsets的方法實現,這一點在前文《集群管理工具KafkaAdminClient——原理與示例》也有提及,所以如果要實現獲取類似消費者詳情的功能,那么就需要自己動手進行改造。
改造
參考Scala版的AdminClient,要實現獲取Kafka的消費者詳情的功能首先需要實現describeConsumerGroup和listGroupOffsets的方法,其中describeConsumerGroup方法內部還需要一個findCoordinator的方法用來提供消費者對應的coodinator節點,以便提供詳細的消費者詳情。describeConsumerGroup、listGroupOffsets和findCoordinator這三個方法都將在KafkaAdminClient類里提供自定義實現。KafkaAdminClient和XXXOptions、XXXResult的類都位于org.apache.kafka.clients.admin包下,筆者也將擴展的類也置于其同一包下,不過也進行了一些區分,如下圖所示,新加入的XXXOptions、XXXResult類放入extend下,新加入的JavaBean放入model下,然后與具體應用功能對應的放在app下:
首先建立對應的XXXOptions、XXXResult類,就那簡單的ListGroupOffsets來說,其ListGroupOffsetsOptions只是繼承了AbstractOptions的空實現,而ListGroupOffsetsResult也很簡單,提供了一個KafkaFuture的調用,代碼參考如下:
public class ListGroupOffsetsResult {private final KafkaFutureImpl<Map<TopicPartition, Long>> future;public ListGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Long>> future) {this.future = future;}public KafkaFutureImpl<Map<TopicPartition, Long>> values(){return this.future;} }model目錄下的ConsumerGroupSummary是所要實現的describeConsumerGroup方法中所要獲取的值類型,封裝在DescribeConsumerGroupResult 中;ConsumerSummary在describeConsumerGroup方法內部使用,用來封裝消費狀態,包括consumerId、clientId、host(消費者主機)以及TopicPartition列表,最終被封裝進ConsumerGroupSummary中。PartitionAssignmentState是服務于KafkaConsumerGroupService的,用來最后顯示消費者詳情列表。
KafkaAdminClient的父類是AdminClient(kafka-client中的抽象類),describeConsumerGroup、listGroupOffsets和findCoordinator這三個方法也需要在AdminClient類中做申明,詳細參考如下:
public abstract DescribeConsumerGroupResult describeConsumerGroup(final String group,final DescribeConsumerGroupOptions options); public DescribeConsumerGroupResult describeConsumerGroup(final String group) {return describeConsumerGroup(group, new DescribeConsumerGroupOptions()); } public abstract FindCoordinatorResult findCoordinator(final String group,final FindCoordinatorOptions options); public FindCoordinatorResult findCoordinator(final String group) {return findCoordinator(group, new FindCoordinatorOptions()); } public abstract ListGroupOffsetsResult listGroupOffsets(final String group,final ListGroupOffsetsOptions options); public ListGroupOffsetsResult listGroupOffsets(final String group){return listGroupOffsets(group, new ListGroupOffsetsOptions()); }在前面2篇文章《集群管理工具KafkaAdminClient——原理與示例》和《如何獲取Kafka的消費者詳情——從Scala到Java的切換》中都詳細解釋了describeConsumerGroup、listGroupOffsets方法,所以這里不在贅述,具體實現也很簡單,可以參考筆者的實現。
最后來講述一下org.apache.kafka.clients.admin.app包下的KafkaConsumerGroupService,具體代碼地址在這里,其內部通過上面改造的KafkaAdminClient和KafkaConsumer來實現,其內部邏輯和《如何獲取Kafka的消費者詳情——從Scala到Java的切換》一文中的KafkaConsumerGroupCustomService一樣,這里就不在贅述了。
本篇以及《Kafka的Lag計算誤區及正確實現》、《如何獲取Kafka的消費者詳情——從Scala到Java的切換》這三篇文章都是圍繞如何獲取消費者詳情來做具體的陳述,回到問題的初衷:kafka.admin.ConsumerGroupCommand.PartitionAssignmentState無法被外部訪問,那么真的需要這么復雜的轉變過程么,詳細請參考下一篇《Scala與Java語言的互操作》。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-2-reuse/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的集群管理工具KafkaAdminClient——改造的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 集群管理工具KafkaAdminClie
- 下一篇: Kafka解析之topic创建(3)——