背景

系统采用Spring Cloud Stream框架集成Kafka来实现异步消息。

问题

客户端消费某个topic消息出错时,会连续输出这个消息内容3次,同时没有提交offset。当有大量消息出错时,topic出现消息积压。

分析

首先,我们知道实现异步消息的系统架构一般包含3个部分:生产者Producer、消息中间件Broker、消费者Consumer。其中,

  1. 消息中间件Broker,是指二进制安装包安装的单机Kafka或利用zookeeper组成的Kafka集群或者阿里云Kafka集群。

  2. 生产者Produer和消费者Consumer,是指基于kafka-clients包,实现我们业务逻辑的消息生产者及消费者。

而spring-cloud-stream,是一个高度可扩展的基于事件驱动的框架,在系统中属于更高层级。系统层级:业务代码 -> spring-cloud-stream -> spring-integration -> spring-messaging -> spring-kafka -> kafka-clients -> kafka broker。

其次,我们需要了解几个关键点:

  1. Kafka Broker支持Consumer提交offset,仅此而已。offset自动提交及手动提交,是由Kafka Consumer实现的。

  2. 不要误以为offset自动提交,是指Kafka Broker在Consumer拉取消息后自己自动提交offset。

  3. kafka-clients的自动提交offset与spring-cloud-stream的自动提交offset不是一回事儿。

    • kafka-clients通过设置KafkaConsumer的enable.auto.commit=true来实现自动提交offset。

    • spring-cloud-stream有spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset配置项,默认true;但spring-kafka有自己定义的一套AckMode(org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode定义),设置autoCommitOffset=true,spring-kafka会负责提交offset,不会利用kafka-clients的KafkaConsumer的自动提交机制(即KafkaConsumer属性enable.auto.commit=false)。

那么,解答上面的问题:1、为什么会连续消费3次?2、为什么之后不再消费,消息出现积压?

1、没有消费3次。客户端只从Broker拉取了1次,spring-kafka使用RetryingAcknowledgingMessageListenerAdapter处理消息,在出现异常时,重新触发我们业务逻辑再处理2次(spring-retry的RetryTemplate负责重试),一共3次。3次日志是我们处理逻辑输出的。

2、大量消息拉取后处理失败,没有向Broker提交offset,肯定会导致消息出现积压。KafkaConsumer有存储上次消费的offset,每次拉取都会把这个offset传递给Broker,从这个offset之后拉取消息,拉取不到,所以不再消费。重启KafkaConsumer可以重新拉取消费。

Logo

开源、云原生的融合云平台

更多推荐