Kafka解析之topic创建(3)——合法性验证
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
前文摘要
在《Kafka解析之Topic創(chuàng)建(1)》這篇文章中,我們講述了創(chuàng)建Topic的方式有兩種:
在學習了KafkaAdminClient之后我們發(fā)現(xiàn)它也可以用來創(chuàng)建Topic,即通過發(fā)送CreateTopicsRequest請求的方式來創(chuàng)建。KafkaAdminClient的詳細內容可以參考:《集群管理工具KafkaAdminClient——原理與示例》和《集群管理工具KafkaAdminClient——改造》。
一般情況下,Kafka生產環(huán)境中的 auto.create.topics.enable參數(shù)會被修改為false,即自動創(chuàng)建Topic這條路會被堵住。kafka-topics.sh腳本創(chuàng)建的方式一般由運維人員操作,普通用戶無權過問。那么KafkaAdminClient就為普通用戶提供了一個口子,或者將其集成到公司內部的資源申請、審核系統(tǒng)中更加的方便。普通用戶在創(chuàng)建Topic的時候,有可能由于誤操作或者其他原因而創(chuàng)建了不符合運維規(guī)范的Topic,比如命名不規(guī)范,副本因子數(shù)太低等,這些都會影響后期的系統(tǒng)運維。如果創(chuàng)建Topic的操作是封裝在資源申請、審核系統(tǒng)中的話,那么可以在前端就可以根據(jù)規(guī)則過濾掉不符合規(guī)范的申請操作。然而如果用戶就是用了KafkaAdminClient或者類似的工具來創(chuàng)建了一個錯誤的Topic,我們有什么辦法可以做相應的規(guī)范處理呢?
在Kafka服務端中提供了這樣一個參數(shù):create.topic.policy.class.name,其提供了一個入口用來驗證Topic創(chuàng)建的合法性。使用方式是自定義實現(xiàn)org.apache.kafka.server.policy.CreateTopicPolicy接口,比如下面的PolicyDemo,然后在kafka broker中的config/server.properties配置文件中配置參數(shù)create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyDemo,然后啟動Kafka服務即可。PolicyDemo的代碼參考如下,主要實現(xiàn)接口中的configure、close以及validate方法,configure方法會在Kafka服務啟動的時候執(zhí)行,validate方法用來鑒定Topic參數(shù)的合法性,其在創(chuàng)建Topic的時候執(zhí)行,close方法在關閉Kafka服務的時候執(zhí)行。
public class PolicyDemo implements CreateTopicPolicy{public void configure(Map<String, ?> configs) {}public void close() throws Exception {}public void validate(RequestMetadata requestMetadata)throws PolicyViolationException {if(requestMetadata.numPartitions()!=null || requestMetadata.replicationFactor()!=null){if(requestMetadata.numPartitions()< 5){throw new PolicyViolationException("Topic should have at least 5 partitions, received: "+ requestMetadata.numPartitions());}if(requestMetadata.replicationFactor()<= 1){throw new PolicyViolationException("Topic should have at least 2 replication factor, recevied: "+ requestMetadata.replicationFactor());}}}}采用文章《集群管理工具KafkaAdminClient——原理與示例》中的所提及的關于KafkaAdminClient來創(chuàng)建Topic,測試代碼如下,創(chuàng)建一個分區(qū)數(shù)為4,副本數(shù)為1的Topic:
@Test public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);CreateTopicsResult result = adminClient.createTopics(newTopicList);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} }測試結果如期報錯:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4相應的Kafka服務端的日志如下:
CreateTopicPolicy.RequestMetadata(topic=topic-test2, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={}) [2018-04-18 19:52:02,747] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic topic-test2 with arguments (numPartitions=4, replicationFactor=1, replicasAssignments={}, configs={}) (kafka.server.AdminManager) org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4客戶端向Kafka服務端發(fā)送了CreateTopicsRequest請求之后,會經過KafkaApis:
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)然后調用handleCreateTopicsRequest()方法,Topic最終在服務端的創(chuàng)建是在AdminManager中的createTopics方法中實現(xiàn)的。而CreateTopicPolicy的作用域也限定在這個createTopics方法之內,故只有通過CreateTopicsRequest請求的方式才能促使CreateTopicPolicy有效,而對于類似于kafka-topics.sh腳本的創(chuàng)建方式無效。不過在正文開頭就提及了在運維規(guī)范的情況下,一般是通過KafkaAdminClient進行操作,或者更加規(guī)范的話直接通過申請頁面來創(chuàng)建,這樣就可以在前端規(guī)避風險,這樣顯得更加的專業(yè)。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生
總結
以上是生活随笔為你收集整理的Kafka解析之topic创建(3)——合法性验证的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 集群管理工具KafkaAdminClie
- 下一篇: 再看Kafka Lag