javascript
第三集 Spring for Apache Kafka 接受消息
我們可以接受消息通過配置一個MessageListenerContainer 和提供一個消息監聽或者通過使用@KafkaListener 注解
3.1 Message Listeners
當我們使用一個消息監聽容器的時候,我們必須提供一個監聽來接受數據。
當前有八種支持消息監聽的接口,以下是這些接口列表:
使用自動提交或其中一個容器管理的提交方法時,使用此接口處理從Kafka使用者poll()操作接收的各個ConsumerRecord實例。
public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);}使用其中一種手動提交方法時,使用此接口處理從Kafka使用者poll()操作接收的各個ConsumerRecord實例。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);}使用自動提交或其中一個容器管理的提交方法時,使用此接口處理從Kafka使用者poll()操作接收的各個ConsumerRecord實例。 提供對Consumer對象的訪問。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}使用其中一種手動提交方法時,使用此接口處理從Kafka使用者poll()操作接收的各個ConsumerRecord實例。 提供對Consumer對象的訪問
public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data);}使用自動提交或其中一個容器管理的提交方法時,使用此接口處理從Kafka使用者poll()操作接收的所有ConsumerRecord實例。 使用此接口時不支持AckMode.RECORD,因為將為偵聽器提供完整的批處理。
public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}使用其中一種手動提交方法時,使用此接口處理從Kafka使用者poll()操作接收的所有ConsumerRecord實例。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);}使用自動提交或其中一個容器管理的提交方法時的操作。 使用此接口時不支持AckMode.RECORD,因為將為偵聽器提供完整的批處理。 提供對Consumer對象的訪問。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}使用其中一種手動提交方法時,使用此接口處理從Kafka使用者poll()操作接收的所有ConsumerRecord實例。 提供對Consumer對象的訪問。
Consumer對象不是線程安全的。 您只能在調用偵聽器的線程上調用其方法。
3.2 Message Listener Containers
提供了兩個消息監聽容器實現
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
這個KafkaMessageLisenterContainer 在一個線程中接受所有的消息從所有的主題中或者分區中。
這個ConcurrentMessageListenerContainer 代理接受一個或者多個KafkaMessageListenerContainer 實例通過提供多個線程。
3.2.1 使用KafkaMessageListenerContainer
構造方法如下所示:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties,TopicPartitionInitialOffset... topicPartitions)每個都采用ConsumerFactory和有關主題和分區的信息,以及ContainerProperties對象中的其他配置。 ConcurrentMessageListenerContainer(稍后描述)使用第二個構造函數跨消費者實例分發TopicPartitionInitialOffset。 ContainerProperties具有以下構造函數:
public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)public ContainerProperties(String... topics)public ContainerProperties(Pattern topicPattern)第一個構造函數接受一個TopicPartitionInitialOffset參數數組,以顯式指示容器使用哪些分區(使用consumer assign()方法)和可選的初始偏移量。 正值是默認的絕對偏移量。 默認情況下,負值相對于分區中的當前最后一個偏移量。 提供了一個TopicPartitionInitialOffset的構造函數,它接受一個額外的布爾參數。 如果這是真的,則初始偏移(正或負)相對于該消費者的當前位置。 啟動容器時應用偏移量。 第二個采用一系列主題,Kafka根據group.id屬性分配分區 - 在整個組中分配分區。 第三個使用正則表達式模式來選擇主題。
要將MessageListener分配給容器,可以在創建Container時使用ContainerProps.setMessageListener方法。 以下示例顯示了如何執行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() {... }); DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps); return container;有關可以設置的各種屬性的更多信息,請參閱Javadoc for ContainerProperties。
從版本2.1.1開始,可以使用名為logContainerConfig的新屬性。如果啟用了true和INFO日志記錄,則每個偵聽器容器都會寫入一條記錄其配置屬性的日志消息。
默認情況下,在DEBUG日志記錄級別執行主題偏移提交的日志記錄。從版本2.1.2開始,ContainerProperties中名為commitLogLevel的屬性允許您指定這些消息的日志級別。例如,要將日志級別更改為INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.
從2.2版開始,添加了一個名為missingTopicsFatal的新容器屬性(默認值:true)。如果代理上不存在任何已配置的主題,則會阻止容器啟動。如果容器配置為偵聽主題模式(正則表達式),則不適用。以前,容器線程在consumer.poll()方法中循環,等待在記錄許多消息時顯示主題。除了日志之外,沒有跡象表明存在問題。要還原以前的行為,可以將該屬性設置為false。
3.2.2 使用ConcurrentMessageListenerContainer
單個構造函數類似于第一個KafkaListenerContainer構造函數。 以下清單顯示了構造函數的簽名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)它還具有并發屬性。 例如,container.setConcurrency(3)創建三個KafkaMessageListenerContainer實例。
對于第一個構造函數,Kafka使用其組管理功能在消費者之間分配分區。
收聽多個主題時,默認分區分發可能與您的預期不同。 例如,如果您有三個主題,每個主題有五個分區,并且您希望使用concurrency =
15,則只能看到五個活動使用者,每個主用戶分配一個分區,其他10個使用者處于空閑狀態。 這是因為默認的Kafka
PartitionAssignor是RangeAssignor(請參閱其Javadoc)。
對于這種情況,您可能需要考慮使用RoundRobinAssignor,它將分區分配給所有使用者。 然后,為每個使用者分配一個主題或分區。
要更改PartitionAssignor,可以在提供給DefaultKafkaConsumerFactory的屬性中設置partition.assignment.strategy使用者屬性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
使用Spring Boot時,您可以按如下方式分配策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RoundRobinAssignor對于第二個構造函數,ConcurrentMessageListenerContainer在委托KafkaMessageListenerContainer實例中分發TopicPartition實例。
例如,如果提供了六個TopicPartition實例并且并發性為3; 每個容器有兩個分區。 對于五個TopicPartition實例,兩個容器獲得兩個分區,第三個獲得一個。 如果并發性大于TopicPartitions的數量,則調整并發性以使每個容器獲得一個分區。
client.id屬性(如果設置)附加-n,其中n是與并發相對應的使用者實例。 啟用JMX時,需要為MBean提供唯一的名稱。
從版本1.3開始,MessageListenerContainer提供對底層KafkaConsumer的度量的訪問。 對于ConcurrentMessageListenerContainer,metrics()方法返回所有目標KafkaMessageListenerContainer實例的度量標準。 指標分組到Map <MetricName,? 通過為底層KafkaConsumer提供的client-id擴展Metric>。
3.3 Committing Offsets 提交偏移量
提供了幾種用于提交偏移的選項。 如果enable.auto.commit使用者屬性為true,則Kafka會根據其配置自動提交偏移量。 如果為false,則容器支持多個AckMode設置(在下一個列表中描述)。
消費者poll()方法返回一個或多個ConsumerRecords。 為每條記錄調用MessageListener。 以下列表描述了容器為每個AckMode采取的操作:
-
RECORD:在處理記錄后偵聽器返回時提交偏移量。
-
BATCH:在處理poll()返回的所有記錄時提交偏移量。
-
TIME:處理poll()返回的所有記錄時的偏移量,只要超過自上次提交以來的ackTime。
-
COUNT:只要自上次提交后已收到ackCount記錄,就會在處理poll()返回的所有記錄時提交偏移量。
-
COUNT_TIME:類似于TIME和COUNT,但如果任一條件為真,則執行提交。
-
MANUAL:消息監聽器負責確認()確認。 之后,應用與BATCH相同的語義。
-
MANUAL_IMMEDIATE:在偵聽器調用Acknowledgment.acknowledge()方法時立即提交偏移量。
MANUAL和MANUAL_IMMEDIATE要求偵聽器是AcknowledgingMessageListener或BatchAcknowledgingMessageListener。
請參閱消息監聽器。
根據syncCommits容器屬性,使用使用者上的commitSync()或commitAsync()方法。
確認具有以下方法:
public interface Acknowledgment {void acknowledge();}此方法使偵聽器可以控制何時提交偏移。
Listener Container Auto Startup
偵聽器容器實現SmartLifecycle,默認情況下autoStartup為true。 容器在后期啟動(Integer.MAX-VALUE - 100)。 應該在早期階段啟動實現SmartLifecycle以處理來自偵聽器的數據的其他組件。 -100為以后的階段留出了空間,使組件能夠在容器之后自動啟動。
@KafkaListener 注解
@KafkaListener注釋用于將bean方法指定為偵聽器容器的偵聽器。 該bean包含在MessagingMessageListenerAdapter中,該MessagingMessageListenerAdapter配置有各種功能,例如轉換器以在必要時轉換數據以匹配方法參數。
您可以使用#{…}或屬性占位符($ {…})使用SpEL在注釋上配置大多數屬性。 有關更多信息,請參閱Javadoc。
Record Listeners
@KafkaListener注解為簡單的POJO偵聽器提供了一種機制。 以下示例顯示了如何使用它:
public class Listener {@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")public void listen(String data) {...}}此機制需要在其中一個@Configuration類和一個偵聽器容器工廠上使用@EnableKafka注釋,該工廠用于配置基礎ConcurrentMessageListenerContainer。 默認情況下,需要名為kafkaListenerContainerFactory的bean。 以下示例顯示如何使用ConcurrentMessageListenerContainer:
@Configuration @EnableKafka public class KafkaConfig {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;} }請注意,要設置容器屬性,必須在工廠中使用getContainerProperties()方法。 它用作注入容器的實際屬性的模板。
從版本2.1.1開始,您現在可以為注釋創建的使用者設置client.id屬性。 clientIdPrefix以-n為后綴,其中n是表示使用并發時的容器編號的整數。
從2.2版開始,您現在可以通過使用注釋本身的屬性來覆蓋容器工廠的并發和autoStartup屬性。 屬性可以是簡單值,屬性占位符或SpEL表達式。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "myListener", topics = "myTopic",autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) {... }您還可以使用顯式主題和分區(以及可選的初始偏移量)配置POJO偵聽器。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "thing2", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}) public void listen(ConsumerRecord<?, ?> record) {... }您可以在分區或partitionOffsets屬性中指定每個分區,但不能同時指定兩者。
使用手動AckMode時,您還可以向監聽器提供確認。 以下示例還說明了如何使用其他容器工廠。
@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) {...ack.acknowledge(); }最后,可以從郵件頭中獲取有關郵件的元數據。 您可以使用以下標頭名稱來檢索郵件的標頭:
- KafkaHeaders.RECEIVED_MESSAGE_KEY
- KafkaHeaders.RECEIVED_TOPIC
- KafkaHeaders.RECEIVED_PARTITION_ID
- KafkaHeaders.RECEIVED_TIMESTAMP
- KafkaHeaders.TIMESTAMP_TYPE
以下示例顯示了如何使用標頭:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {...Batch listeners
從1.1版開始,您可以配置@KafkaListener方法以接收從消費者調查中收到的整批消費者記錄。 要配置偵聽器容器工廠以創建批處理偵聽器,可以設置batchListener屬性。 以下示例顯示了如何執行此操作:
@Bean public KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<return factory; }以下示例顯示如何接收有效負載列表
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) {... }主題,分區,偏移等在與有效負載并行的標頭中可用。 以下示例顯示了如何使用標頭:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.OFFSET) List<Long> offsets) {... }或者,您可以在每條消息中接收帶有每個偏移量和其他詳細信息的消息<?>對象列表,但它必須是唯一的參數(除了可選的確認,使用手動提交時,和/或消費者<?,?> 參數)在方法上定義。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) {... }@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) {... }@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {... }在這種情況下,不對有效載荷執行轉換。
如果BatchMessagingMessageConverter配置了RecordMessageConverter,您還可以向Message參數添加泛型類型并轉換有效負載。 有關詳細信息,請參閱使用批量偵聽器的有效負載轉換
您還可以接收ConsumerRecord <?,?>對象的列表,但它必須是該方法上定義的唯一參數(除了可選的確認,當使用手動提交和Consumer <?,?>參數時)。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) {... }@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {... }從2.2版開始,偵聽器可以接收poll()方法返回的完整ConsumerRecords <?,?>對象,讓偵聽器訪問其他方法,例如partitions()(返回列表中的TopicPartition實例)和記錄 (TopicPartition)(獲取選擇性記錄)。 同樣,這必須是方法上唯一的參數(除了可選的確認,當使用手動提交或消費者<?,?>參數時)。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") public void pollResults(ConsumerRecords<?, ?> records) {... }如果容器工廠配置了RecordFilterStrategy,則會忽略ConsumerRecords
<?,?>偵聽器,并發出WARN日志消息。 如果使用<List <?>>形式的偵聽器,則只能使用批量偵聽器過濾記錄。
注解屬性
從版本2.0開始,id屬性(如果存在)用作Kafka使用者group.id屬性,覆蓋使用者工廠中已配置的屬性(如果存在)。 您還可以顯式設置groupId或將idIsGroup設置為false以恢復使用使用者工廠group.id的先前行為。
您可以在大多數注釋屬性中使用屬性占位符或SpEL表達式,如以下示例所示:
@KafkaListener(topics = "${some.property}")@KafkaListener(topics = "#{someBean.someProperty}",groupId = "#{someBean.someProperty}.group")從版本2.1.2開始,SpEL表達式支持一個特殊的令牌:__listener。 它是一個偽bean名稱,表示存在此批注的當前bean實例。
請考慮以下示例:
@Bean public Listener listener1() {return new Listener("topic1"); }@Bean public Listener listener2() {return new Listener("topic2"); }鑒于上一個示例中的bean,我們可以使用以下內容:
public class Listener {private final String topic;public Listener(String topic) {this.topic = topic;}@KafkaListener(topics = "#{__listener.topic}",groupId = "#{__listener.topic}.group")public void listen(...) {...}public String getTopic() {return this.topic;}}如果您有一個名為__listener的實際bean,則可以使用beanRef屬性更改表達式標記。 以下示例顯示了如何執行此操作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",groupId = "#{__x.topic}.group")從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠中配置的具有相同名稱的任何屬性。 您不能以這種方式指定group.id和client.id屬性; 他們會被忽視; 使用groupId和clientIdPrefix注釋屬性。
屬性被指定為具有普通Java屬性文件格式的單個字符串:foo:bar,foo = bar或foo bar。
@KafkaListener(topics = "myTopic", groupId="group", properties= {"max.poll.interval.ms:60000",ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100" })容器線程命名
監聽器容器當前使用兩個任務執行器,一個用于調用使用者,另一個用于在kafka使用者屬性enable.auto.commit為false時調用監聽器。您可以通過設置容器的ContainerProperties的consumerExecutor和listenerExecutor屬性來提供自定義執行程序。使用池化執行程序時,請確保有足夠的線程可用于處理使用它們的所有容器的并發性。使用ConcurrentMessageListenerContainer時,每個消費者使用一個線程(并發)。
如果您未提供使用者執行程序,則使用SimpleAsyncTaskExecutor。此執行程序創建名稱類似于 -C-1(使用者線程)的線程。對于ConcurrentMessageListenerContainer,線程名稱的部分變為 -m,其中m表示使用者實例。每次啟動容器時,n都會遞增。因此,使用容器的bean名稱,容器第一次啟動后,此容器中的線程將被命名為container-0-C-1,container-1-C-1等;容器-0-C-2,容器-1-C-2等,停止并隨后啟動。
@KafkaListener 作為一個元注解
從2.2版開始,您現在可以使用@KafkaListener作為元注釋。 以下示例顯示了如何執行此操作:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @KafkaListener public @interface MyThreeConsumersListener {@AliasFor(annotation = KafkaListener.class, attribute = "id")String id();@AliasFor(annotation = KafkaListener.class, attribute = "topics")String[] topics();@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")String concurrency() default "3";}除非已在使用者工廠配置中指定了group.id,否則必須至少為其中一個主題,topicPattern或topicPartitions(以及通常為id或groupId)添加別名。 以下示例顯示了如何執行此操作:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic") public void listen1(String in) {... }在一個類上使用@KafkaListener
在類級別使用@KafkaListener時,必須在方法級別指定@KafkaHandler。 傳遞消息時,轉換的消息有效負載類型用于確定要調用的方法。 以下示例顯示了如何執行此操作:
@KafkaListener(id = "multi", topics = "myTopic") static class MultiListenerBean {@KafkaHandlerpublic void listen(String foo) {...}@KafkaHandlerpublic void listen(Integer bar) {...}@KafkaHandler(isDefault = true`)public void listenDefault(Object object) {...}}從版本2.1.3開始,如果與其他方法不匹配,則可以將@KafkaHandler方法指定為調用的默認方法。 最多可以指定一種方法。 使用@KafkaHandler方法時,有效負載必須已經轉換為域對象(因此可以執行匹配)。 使用自定義反序列化器,JsonDeserializer或(String | Bytes)JsonMessageConverter,并將其TypePrecedence設置為TYPE_ID。 有關更多信息,請參閱序列化,反序列化和消息轉換。
@KafkaListener 生命周期管理
為@KafkaListener注釋創建的偵聽器容器不是應用程序上下文中的bean。相反,它們是使用KafkaListenerEndpointRegistry類型的基礎結構bean注冊的。這個bean由框架自動聲明并管理容器的生命周期;它將自動啟動autoStartup設置為true的任何容器。所有容器工廠創建的所有容器必須處于同一階段。有關更多信息,請參閱監聽器容器自動啟動。您可以使用注冊表以編程方式管理生命周期。啟動或停止注冊表將啟動或停止所有已注冊的容器?;蛘?#xff0c;您可以使用其id屬性獲取對單個容器的引用。您可以在注釋上設置autoStartup,該注釋將覆蓋配置到容器工廠中的默認設置。您可以從應用程序上下文中獲取對bean的引用,例如自動布線,以管理其已注冊的容器。以下示例顯示了如何執行此操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false") public void listen(...) { ... } @Autowired private KafkaListenerEndpointRegistry registry;...this.registry.getListenerContainer("myContainer").start();...@KafkaListener @Payload 校驗
從2.2版開始,現在可以更輕松地添加Validator來驗證@KafkaListener @Payload參數。 以前,您必須配置自定義DefaultMessageHandlerMethodFactory并將其添加到注冊器。 現在,您可以將驗證程序添加到注冊商本身。 以下代碼顯示了如何執行此操作:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer {...@Overridepublic void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {registrar.setValidator(new MyValidator());} }將Spring Boot與驗證啟動器一起使用時,會自動配置LocalValidatorFactoryBean,如以下示例所示:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer {@Autowiredprivate LocalValidatorFactoryBean validator;...@Overridepublic void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {registrar.setValidator(this.validator);} }以下示例顯示如何驗證:
public static class ValidatedClass {@Max(10)private int bar;public int getBar() {return this.bar;}public void setBar(int bar) {this.bar = bar;}} @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) {... }@Bean public KafkaListenerErrorHandler validationErrorHandler() {return (m, e) -> {...}; }Rebalancing Listeners
ContainerProperties有一個名為consumerRebalanceListener的屬性,它接受Kafka客戶端的ConsumerRebalanceListener接口的實現。 如果未提供此屬性,則容器將配置記錄偵聽器,以在INFO級別記錄重新平衡事件。 該框架還添加了一個子接口ConsumerAwareRebalanceListener。 以下清單顯示了ConsumerAwareRebalanceListener接口定義:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);}請注意,撤消分區時有兩個回調。 第一個是立即調用的。 在提交任何掛起的偏移量之后調用第二個。 如果您希望在某些外部存儲庫中維護偏移量,這非常有用,如以下示例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {@Overridepublic void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {// acknowledge any pending Acknowledgments (if using manual acks)}@Overridepublic void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {// ...store(consumer.position(partition));// ...}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// ...consumer.seek(partition, offsetTracker.getOffset() + 1);// ...} });Forwarding Listener Results using @SendTo
從2.0版開始,如果您還使用@SendTo批注注釋@KafkaListener并且方法調用返回結果,則結果將轉發到@SendTo指定的主題。
@SendTo值可以有多種形式:
-
@SendTo(“someTopic”)路由到文字主題
-
@SendTo(“#{someExpression}”)路由到在應用程序上下文初始化期間通過計算表達式確定的主題。
-
@SendTo(“!{someExpression}”)路由到通過在運行時計算表達式確定的主題。 評估的#root對象有三個屬性:
-
request:入站ConsumerRecord(或批處理偵聽器的ConsumerRecords對象))
-
source:從請求轉換的org.springframework.messaging.Message <?>。
-
result:方法返回結果。
-
-
@SendTo(無屬性):這被視為!{source.headers [‘kafka_replyTopic’]}(自版本2.1.3起)。
從版本2.1.11和2.2.1開始,屬性占位符在@SendTo值內解析。
表達式求值的結果必須是表示主題名稱的String。 以下示例顯示了使用@SendTo的各種方法:
@KafkaListener(topics = "annotated21") @SendTo("!{request.value()}") // runtime SpEL public String replyingListener(String in) {... }@KafkaListener(topics = "${some.property:annotated22}") @SendTo("#{myBean.replyTopic}") // config time SpEL public Collection<String> replyingBatchListener(List<String> in) {... }@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") @SendTo("annotated23reply") // static reply topic definition public String replyingListenerWithErrorHandler(String in) {... } ... @KafkaListener(topics = "annotated25") @SendTo("annotated25reply1") public class MultiListenerSendTo {@KafkaHandlerpublic String foo(String in) {...}@KafkaHandler@SendTo("!{'annotated25reply2'}")public String bar(@Payload(required = false) KafkaNull nul,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {...}}從2.2版開始,您可以將ReplyHeadersConfigurer添加到偵聽器容器工廠。 查閱此信息以確定要在回復消息中設置哪些標頭。 以下示例顯示如何添加ReplyHeadersConfigurer:
@Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf());factory.setReplyTemplate(template());factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));return factory; }如果您愿意,還可以添加更多標題。 以下示例顯示了如何執行此操作:
@Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf());factory.setReplyTemplate(template());factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {@Overridepublic boolean shouldCopy(String headerName, Object headerValue) {return false;}@Overridepublic Map<String, Object> additionalHeaders() {return Collections.singletonMap("qux", "fiz");}});return factory; }使用@SendTo時,必須在其replyTemplate屬性中使用KafkaTemplate配置ConcurrentKafkaListenerContainerFactory以執行發送。
除非您使用請求/回復語義,否則僅使用簡單的send(topic,value)方法,因此您可能希望創建子類來生成分區或鍵。
以下示例顯示了如何執行此操作:
如果偵聽器方法返回Message <?>或Collection <Message <?>>,則偵聽器方法負責設置回復的郵件頭。 例如,在處理來自ReplyingKafkaTemplate的請求時,您可能會執行以下操作:
@KafkaListener(id = "messageReturned", topics = "someTopic") public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, replyTo).setHeader(KafkaHeaders.MESSAGE_KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, correlation).setHeader("someOtherHeader", "someValue").build(); }使用請求/回復語義時,發件人可以請求目標分區。
即使沒有返回結果,也可以使用@SendTo注釋@KafkaListener方法。
這是為了允許配置errorHandler,它可以將有關失敗消息傳遞的信息轉發到某個主題。 以下示例顯示了如何執行此操作:
有關更多信息,請參閱處理異常
Filtering Messages
在某些情況下,例如重新平衡,可以重新傳遞已經處理的消息??蚣軣o法知道是否已處理此類消息。這是一個應用程序級功能。這被稱為Idempotent Receiver模式,Spring Integration提供了它的實現。
Spring for Apache Kafka項目還通過FilteringMessageListenerAdapter類提供一些幫助,該類可以包裝MessageListener。此類采用RecordFilterStrategy的實現,您可以在其中實現filter方法,以指示消息是重復的并且應該被丟棄。這有一個名為ackDiscarded的附加屬性,它指示適配器是否應該確認丟棄的記錄。默認情況下為假。
使用@KafkaListener時,在容器工廠上設置RecordFilterStrategy(以及可選的ackDiscarded),以便將偵聽器包裝在適當的過濾適配器中。
此外,還提供了FilteringBatchMessageListenerAdapter,供您在使用批處理消息偵聽器時使用。
如果@KafkaListener收到ConsumerRecords <?,?>而不是List <ConsumerRecord
<?,?>>,則忽略FilteringBatchMessageListenerAdapter,因為ConsumerRecords是不可變的。
Retrying Deliveries
如果偵聽器拋出異常,則默認行為是調用ErrorHandler(如果已配置)或以其他方式記錄。
提供了兩個錯誤處理程序接口(ErrorHandler和BatchErrorHandler)。 您必須配置適當的類型以匹配消息偵聽器。
為了重試傳遞,提供了一個方便的偵聽器適配器RetryingMessageListenerAdapter。
您可以使用RetryTemplate和RecoveryCallback 對其進行配置 - 有關這些組件的信息,請參閱spring-retry項目。如果未提供恢復回調,則在重試耗盡后將向容器拋出異常。在這種情況下,如果已配置,則調用ErrorHandler,否則將記錄。
使用@KafkaListener時,可以在容器工廠上設置RetryTemplate(以及可選的recoveryCallback)。執行此操作時,偵聽器將包裝在適當的重試適配器中。
傳遞給RecoveryCallback的RetryContext的內容取決于偵聽器的類型。上下文始終具有記錄屬性,該記錄屬性是發生故障的記錄。如果您的偵聽器正在確認或消費者知曉,則可以使用其他確認或使用者屬性。為方便起見,RetryingMessageListenerAdapter為這些鍵提供了靜態常量。有關更多信息,請參閱其Javadoc。
沒有為任何批處理消息偵聽器提供重試適配器,因為框架不知道批處理中發生故障的位置。如果在使用批量偵聽器時需要重試功能,我們建議您在偵聽器本身中使用RetryTemplate。
Stateful Retry
您應該了解上一節中討論的重試會暫停使用者線程(如果使用BackOffPolicy)。在重試期間沒有調用Consumer.poll()??ǚ蚩ㄓ袃蓚€屬性來確定消費者的健康狀況。 session.timeout.ms用于確定使用者是否處于活動狀態。從版本0.10.1.0開始,心跳在后臺線程上發送,因此慢速消費者不再影響它。 max.poll.interval.ms(默認值:五分鐘)用于確定消費者是否顯示為掛起(從上次輪詢處理記錄花費的時間太長)。如果poll()調用之間的時間超過此值,則代理將撤消分配的分區并執行重新平衡。對于冗長的重試序列,退避時,很容易發生這種情況。
從版本2.1.3開始,您可以通過將狀態重試與SeekToCurrentErrorHandler結合使用來避免此問題。在這種情況下,每次傳遞嘗試都會將異常拋回到容器中,錯誤處理程序會重新搜索未處理的偏移量,并且下一次poll()會重新傳遞相同的消息。這避免了超出max.poll.interval.ms屬性的問題(只要嘗試之間的單個延遲不超過它)。因此,在使用ExponentialBackOffPolicy時,必須確保maxInterval小于max.poll.interval.ms屬性。要啟用有狀態重試,可以使用帶有狀態布爾參數的RetryingMessageListenerAdapter構造函數(將其設置為true)。配置偵聽器容器工廠(對于@KafkaListener)時,將工廠的statefulRetry屬性設置為true。
Detecting Idle and Non-Responsive Consumers
雖然有效,但異步消費者的一個問題是檢測它們何時空閑。 如果在一段時間內沒有消息到達,您可能需要采取一些措施。
您可以將偵聽器容器配置為在經過一段時間而沒有消息傳遞時發布ListenerContainerIdleEvent。 當容器空閑時,每個idleEventInterval毫秒都會發布一個事件。
要配置此功能,請在容器上設置idleEventInterval。 以下示例顯示了如何執行此操作:
@Bean public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");...containerProps.setIdleEventInterval(60000L);...KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);return container; }以下示例顯示如何為@KafkaListener設置idleEventInterval:
@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();...factory.getContainerProperties().setIdleEventInterval(60000L);...return factory; }在每種情況下,當容器空閑時,每分鐘發布一次事件。
此外,如果代理無法訪問,則消費者poll()方法不會退出,因此不會收到任何消息,也無法生成空閑事件。 要解決此問題,如果輪詢未在pollInterval屬性的3x內返回,則容器會發布NonResponsiveConsumerEvent。 默認情況下,每個容器中每30秒執行一次此檢查。 您可以通過在配置偵聽器容器時在ContainerProperties中設置monitorInterval和noPollThreshold屬性來修改此行為。 接收此類事件可讓您停止容器,從而喚醒消費者以便終止。
Event Consumption
您可以通過實現ApplicationListener來捕獲這些事件 - 可以是一般偵聽器,也可以是縮小到僅接收此特定事件的偵聽器。 您還可以使用Spring Framework 4.2中引入的@EventListener。
下一個示例將@KafkaListener和@EventListener組合到一個類中。 您應該了解應用程序偵聽器獲取所有容器的事件,因此如果要根據哪個容器空閑采取特定操作,則可能需要檢查偵聽器ID。 您也可以使用@EventListener條件來實現此目的。
有關事件屬性的信息,請參閱事件
該事件通常在使用者線程上發布,因此與Consumer對象進行交互是安全的。
以下示例同時使用@KafkaListener和@EventListener:
public class Listener {@KafkaListener(id = "qux", topics = "annotated")public void listen4(@Payload String foo, Acknowledgment ack) {...}@EventListener(condition = "event.listenerId.startsWith('qux-')")public void eventHandler(ListenerContainerIdleEvent event) {...}}事件偵聽器查看所有容器的事件。 因此,在前面的示例中,我們根據偵聽器ID縮小接收的事件。
由于為@KafkaListener創建的容器支持并發,因此實際容器名為id-n,其中n是每個實例的唯一值,以支持并發。
這就是我們在條件中使用startsWith的原因。
如果您希望使用idle事件來停止Lister容器,則不應在調用偵聽器的線程上調用container.stop()。
這樣做會導致延遲和不必要的日志消息。 相反,您應該將事件移交給另一個可以阻止容器的線程。 此外,如果容器實例是子容器,則不應該停止()。
您應該停止并發容器。
Current Positions when Idle
請注意,通過在偵聽器中實現ConsumerSeekAware,可以在檢測到空閑時獲取當前位置。 請參閱`尋求特定偏移量中的onIdleContainer()。
Topic/Partition Initial Offset
幾種方法可以為分區設置初始偏移量。
手動分配分區時,可以在配置的TopicPartitionInitialOffset參數中設置初始偏移量(如果需要)(請參閱消息偵聽器容器)。 您也可以隨時尋找特定的偏移量。
當您使用代理分配分區的組管理時:
-
對于新的group.id,初始偏移量由auto.offset.reset使用者屬性(最早或最新)確定。
-
對于現有組ID,初始偏移量是該組ID的當前偏移量。 但是,您可以在初始化期間(或之后的任何時間)尋找特定的偏移量。
Seeking to a Specific Offset
為了尋求,您的監聽器必須實現ConsumerSeekAware,它具有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);啟動容器時調用第一個方法。 在初始化后的某個任意時間尋找時,您應該使用此回調。 您應該保存對回調的引用。 如果在多個容器(或ConcurrentMessageListenerContainer)中使用相同的偵聽器,則應將回調存儲在ThreadLocal或由偵聽器Thread鍵入的其他一些結構中。
使用組管理時,在分配更改時調用第二種方法。 例如,您可以通過調用回調來使用此方法來設置分區的初始偏移量。 您必須使用回調參數,而不是傳遞給registerSeekCallback的參數。 如果您自己顯式分配分區,則永遠不會調用此方法。 在這種情況下使用TopicPartitionInitialOffset。
回調有以下方法:
void seek(String topic, int partition, long offset);void seekToBeginning(String topic, int partition);void seekToEnd(String topic, int partition);當檢測到空閑容器時,您還可以從onIdleContainer()執行搜索操作。 有關如何啟用空閑容器檢測,請參閱檢測空閑和非響應消費者。
要在運行時任意搜索,請使用registerSeekCallback中的回調引用來獲取相應的線程。
Container factory
正如@KafkaListener Annotation中所討論的,ConcurrentKafkaListenerContainerFactory用于為帶注釋的方法創建容器。
從2.2版開始,您可以使用同一工廠來創建任何ConcurrentMessageListenerContainer。 如果要創建具有類似屬性的多個容器,或者希望使用某些外部配置的工廠(例如Spring Boot自動配置提供的工廠),這可能很有用。 創建容器后,可以進一步修改其屬性,其中許多屬性是使用container.getContainerProperties()設置的。 以下示例配置ConcurrentMessageListenerContainer:
@Bean public ConcurrentMessageListenerContainer<String, String>(ConcurrentKafkaListenerContainerFactory<String, String> factory) {ConcurrentMessageListenerContainer<String, String> container =factory.createContainer("topic1", "topic2");container.setMessageListener(m -> { ... } );return container; }以這種方式創建的容器不會添加到端點注冊表中。 它們應該創建為@Bean定義,以便它們在應用程序上下文中注冊。
Thread Safety
使用并發消息偵聽器容器時,將在所有使用者線程上調用單個偵聽器實例。因此,監聽器需要是線程安全的,并且最好使用無狀態監聽器。如果無法使偵聽器線程安全或添加同步會顯著降低添加并發性的好處,則可以使用以下幾種技術之一:
-
使用并發= 1的n個容器和原型作用域MessageListener bean,以便每個容器都有自己的實例(使用@KafkaListener時這是不可能的)。
-
將狀態保留在ThreadLocal <?>實例中。
-
讓單例偵聽器委托給在SimpleThreadScope(或類似范圍)中聲明的bean。
為了便于清理線程狀態(對于前面列表中的第二項和第三項),從2.2版開始,偵聽器容器在每個線程退出時發布ConsumerStoppedEvent。您可以使用ApplicationListener或@EventListener方法使用這些事件來從作用域中刪除ThreadLocal <?>實例或remove()線程范圍的bean。請注意,SimpleThreadScope不會銷毀具有銷毀接??口的bean(例如DisposableBean),因此您應該自己銷毀()實例。
默認情況下,應用程序上下文的事件multicaster在調用線程上調用事件偵聽器。 如果更改多播程序以使用異步執行程序,則線程清理無效。
推薦閱讀:
3. Spring Boot 2.x最佳實踐之Spring for Apache Kafka集成
本節完~
總結
以上是生活随笔為你收集整理的第三集 Spring for Apache Kafka 接受消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FFmpeg m3u8文件返回Inval
- 下一篇: 二、结合各种图形库实现各种demo(21