Spring Kafka 2.x: Basic Usage for Producing and Consuming Messages


1. Introduction

Like Spring Data Redis, Spring Data MongoDB, Spring Data JPA and similar projects, Spring Kafka lets a Spring application access a Kafka cluster with only minimal configuration.

In this article I will show how a message producer in a Spring application sends messages to a Kafka cluster, how a consumer consumes them, how to consume messages in batches, and how multiple consumer groups can consume the same message.

Note that, in order to use the latest Spring Kafka features, the test code below was built with Spring Boot 2.0.0. The full working source code is available at: gitee.com/zifangsky/K…

2. Basic Usage of Spring Kafka

(1) Add the dependency to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Note: since I built the project with Spring Boot, the dependency does not pin a specific Spring Kafka version (the actual jar version is 2.1.x). The full pom.xml is available at: gitee.com/zifangsky/K…

(2) Basic configuration:

i) If the project is built with Spring Boot, you can simply add the following to the properties file:

#Kafka; more options: org.springframework.boot.autoconfigure.kafka.KafkaProperties
#Kafka broker addresses (one or more)
spring.kafka.bootstrap-servers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
#Default topic for KafkaTemplate
spring.kafka.template.default-topic=topic-test
#Number of threads in the listener container, to increase concurrency
spring.kafka.listener.concurrency=3
#Producer batch size in bytes (not a message count)
spring.kafka.producer.batch-size=1000
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#Default consumer group id
spring.kafka.consumer.group-id=myGroup1
#earliest reads each partition from the beginning; latest only reads newly arrived messages
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
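
With just these properties, Spring Boot's auto-configuration already provides a ready-to-use KafkaTemplate bean. A minimal sketch of using it directly, with no JavaConfig at all (the class name QuickSender is my own, for illustration):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class QuickSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String data) {
        // sendDefault() uses spring.kafka.template.default-topic (topic-test) as the destination
        kafkaTemplate.sendDefault(data);
    }
}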

ii) If the project is built with plain Maven, or you want to customize more settings, you can use the following JavaConfig instead:

package cn.zifangsky.kafkademo.config;
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 
/**
 * Kafka configuration
 * @author zifangsky
 */
@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Value("${kafka.producer.bootstrapServers}")
    private String producerBootstrapServers; // broker addresses used by the producer
    
    @Value("${kafka.producer.retries}")
    private String producerRetries; // producer retry count
    
    @Value("${kafka.producer.batchSize}")
    private String producerBatchSize;
    
    @Value("${kafka.producer.lingerMs}")
    private String producerLingerMs;
    
    @Value("${kafka.producer.bufferMemory}")
    private String producerBufferMemory;
    
    
    @Value("${kafka.consumer.bootstrapServers}")
    private String consumerBootstrapServers;
    
    @Value("${kafka.consumer.groupId}")
    private String consumerGroupId;
    
    @Value("${kafka.consumer.enableAutoCommit}")
    private String consumerEnableAutoCommit;
    
    @Value("${kafka.consumer.autoCommitIntervalMs}")
    private String consumerAutoCommitIntervalMs;
    
    @Value("${kafka.consumer.sessionTimeoutMs}")
    private String consumerSessionTimeoutMs;
    
    @Value("${kafka.consumer.maxPollRecords}")
    private String consumerMaxPollRecords;
    
    @Value("${kafka.consumer.autoOffsetReset}")
    private String consumerAutoOffsetReset;
    
    /**
     * ProducerFactory
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); // producer properties
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        return new DefaultKafkaProducerFactory<Object, Object>(configs);
    }
    
    /**
     * KafkaTemplate
     */
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(){
        // the second argument enables autoFlush: every send() is flushed immediately
        return new KafkaTemplate<Object, Object>(producerFactory(), true);
    }
 
    /**
     * ConsumerFactory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); // consumer properties
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // upper bound on records per poll, i.e. per batch
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }
    
    /**
     * A KafkaListenerContainerFactory for batch consumption
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true); // deliver records to the listener in batches
        // MANUAL_IMMEDIATE: the listener must acknowledge, and the offset is committed right away
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        
        return containerFactory;
    }
    
}

Note: the properties referenced above are:

kafka.producer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.producer.retries=3
#16 KB
kafka.producer.batchSize=16384
kafka.producer.lingerMs=1
#32 MB
kafka.producer.bufferMemory=33554432
 
kafka.consumer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.consumer.groupId=0
kafka.consumer.enableAutoCommit=false
kafka.consumer.autoCommitIntervalMs=1000
kafka.consumer.sessionTimeoutMs=30000
kafka.consumer.maxPollRecords=100
#allowed values: earliest, latest
kafka.consumer.autoOffsetReset=earliest

Note: for the exact meaning of these properties, see the official documentation: kafka.apache.org/documentati…

(3) A first producer/consumer example:

i) The message producer:

package cn.zifangsky.kafkademo.producer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
/**
 * First message producer example
 * @author zifangsky
 */
@Component("simpleProducer")
public class SimpleProducer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    
    /**
     * Push data to Kafka via KafkaTemplate
     * @param topicName target topic
     * @param data message payload
     */
    public void sendMessage(String topicName, String data){
        logger.info(MessageFormat.format("Pushing data to Kafka: {0}", data));
        
        try {
            kafkaTemplate.send(topicName, data);
            logger.info("Data pushed successfully!");
        } catch (Exception e) {
            // log the exception itself as well, not just the message
            logger.error(MessageFormat.format("Failed to push data, topic: {0}, data: {1}",
                    topicName, data), e);
        }
    }
 
}
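
One caveat: KafkaTemplate#send() is asynchronous, so the try/catch above only catches failures that happen before the record is handed to the producer (for example serialization errors). To observe the actual broker-side result, a callback can be registered on the ListenableFuture that send() returns in Spring Kafka 2.x. A minimal sketch (the method name sendMessageWithCallback is mine, not part of the project; it needs two extra imports):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

    public void sendMessageWithCallback(String topicName, String data) {
        kafkaTemplate.send(topicName, data)
                .addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                    @Override
                    public void onSuccess(SendResult<Object, Object> result) {
                        // RecordMetadata tells us where the record actually landed
                        logger.info("Sent to partition {} at offset {}",
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
 
                    @Override
                    public void onFailure(Throwable ex) {
                        logger.error("Failed to send message to topic " + topicName, ex);
                    }
                });
    }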
 

ii) Invoking it from a controller:

package cn.zifangsky.kafkademo.controller;
 
import javax.annotation.Resource;
 
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import cn.zifangsky.kafkademo.producer.SimpleProducer;
 
@RestController
@RequestMapping("/kafka")
public class TestKafkaController {
    @Resource(name="simpleProducer")
    private SimpleProducer producer;
    
    private final String TOPIC = "topic-test"; // topic used for testing
    
    @RequestMapping("/send")
    public String send(String data){
        producer.sendMessage(TOPIC, data);
        
        return "Data [" + data + "] sent successfully!";
    }
 
}
 

iii) The message consumer:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
/**
 * First message consumer example
 * @author zifangsky
 */
@Component("simpleConsumer")
public class SimpleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    
    @KafkaListener(id="test", topics={"topic-test"})
    public void listen(String data){
        System.out.println("SimpleConsumer received message: " + data);
        logger.info(MessageFormat.format("SimpleConsumer received message: {0}", data));
    }
    
}
 

iv) Testing:

After starting the project, open the following URL in a browser: http://127.0.0.1:9090/kafka/send?data=哈哈哈1111

The console output shows the producer's log line followed by the message printed by SimpleConsumer.

As you can see, this simplest example already runs correctly.

(4) Sending/receiving custom message types:

In the example above, the messages we sent and received were plain strings, encoded and decoded under the hood by StringSerializer and StringDeserializer. In real projects, however, we sometimes need to send more complex messages (for example, a message describing the weather of a region, carrying temperature, humidity, pollution index and other fields at once). In that case there are usually two approaches:

  • Approach one: convert the Java object to a JSON string first, then send that string to the Kafka cluster (see the sketch after this list)
  • Approach two: define a custom message serializer and deserializer and send the Java object directly
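
Approach one is not implemented in the original project; a minimal sketch added to SimpleProducer, assuming Jackson's ObjectMapper is on the classpath (it ships with Spring Boot's web starter), might look like this. The method name sendAsJson is my own:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void sendAsJson(String topicName, Object data) throws JsonProcessingException {
        // serialize the object to a JSON string, then reuse the String-based template;
        // e.g. new DemoObj(8L, "test") becomes {"id":8,"data":"test"}
        String json = objectMapper.writeValueAsString(data);
        kafkaTemplate.send(topicName, json);
    }

Spring Kafka also ships org.springframework.kafka.support.serializer.JsonSerializer and JsonDeserializer, which achieve the same effect at the serializer level. The rest of this article follows approach two.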

i) The custom message serializer and deserializer:

package cn.zifangsky.kafkademo.common;
 
import java.util.Map;
 
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;
 
/**
 * Value serializer based on standard JDK serialization;
 * the payload class must implement java.io.Serializable
 */
public class ObjectSerializer implements Serializer<Object> {
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }
 
    /**
     * Serialize the object to a byte array
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize(data);
    }
 
    @Override
    public void close() {
        // nothing to clean up
    }
 
}
 

package cn.zifangsky.kafkademo.common;
 
import java.util.Map;
 
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.util.SerializationUtils;
 
/**
 * Value deserializer matching ObjectSerializer (JDK serialization)
 */
public class ObjectDeserializer implements Deserializer<Object> {
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }
 
    /**
     * Deserialize the byte array back into an object
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }
 
    @Override
    public void close() {
        // nothing to clean up
    }
 
}
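
As a quick sanity check (my own snippet, not part of the project), the two classes should round-trip any Serializable object, for example the DemoObj entity introduced below:

ObjectSerializer serializer = new ObjectSerializer();
ObjectDeserializer deserializer = new ObjectDeserializer();

byte[] bytes = serializer.serialize("topic-test2", new DemoObj(1L, "hello"));
DemoObj restored = (DemoObj) deserializer.deserialize("topic-test2", bytes);
System.out.println(restored); // prints: DemoObj [id=1, data=hello]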
 

ii) Adjust the corresponding settings in KafkaConfig:

    /**
     * ProducerFactory
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); // producer properties
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
        
        return new DefaultKafkaProducerFactory<Object, Object>(configs);
    }
    
        ...
 
    /**
     * ConsumerFactory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); // consumer properties
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // upper bound on records per poll, i.e. per batch
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class); // old messages must be cleared first, otherwise deserializing them will fail
        
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }

Note that after changing the serializer/deserializer you must clear the existing messages in the topic, or switch to a new topic; otherwise the old string messages will throw exceptions during deserialization. Keep this in mind.
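
One way to clear the old data is simply to delete and recreate the topic. A rough sketch using the Kafka AdminClient (available in kafka-clients 1.0+; this assumes delete.topic.enable=true on the brokers and is not part of the original project):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

    public void deleteTopic(String topicName) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.159:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // blocks until the brokers acknowledge the deletion request
            admin.deleteTopics(Collections.singletonList(topicName)).all().get();
        }
    }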

iii) Add a new method to SimpleProducer.java:

    /**
     * Push an object to Kafka via KafkaTemplate
     * @param topicName target topic
     * @param data message payload
     */
    public void sendObjectMessage(String topicName, Object data){
        logger.info(MessageFormat.format("Pushing data to Kafka: {0}", data));
        
        try {
            kafkaTemplate.send(topicName, data);
            logger.info("Data pushed successfully!");
        } catch (Exception e) {
            logger.error(MessageFormat.format("Failed to push data, topic: {0}, data: {1}",
                    topicName, data), e);
        }
    }

iv) Testing:

First add a test entity class, DemoObj.java (note that it must implement Serializable, since ObjectSerializer relies on JDK serialization):

package cn.zifangsky.kafkademo.model;
 
import java.io.Serializable;
 
public class DemoObj implements Serializable{
    private static final long serialVersionUID = -8094247978023094250L;
    private Long id;
    private String data;
 
    public DemoObj() {
 
    }
 
    public DemoObj(Long id, String data) {
        this.id = id;
        this.data = data;
    }
 
    public Long getId() {
        return id;
    }
 
    public void setId(Long id) {
        this.id = id;
    }
 
    public String getData() {
        return data;
    }
 
    public void setData(String data) {
        this.data = data;
    }
 
    @Override
    public String toString() {
        return "DemoObj [id=" + id + ", data=" + data + "]";
    }
 
}

Then add a new method to TestKafkaController.java:

    @RequestMapping("/send2")
    public String send2(DemoObj demoObj){
        producer.sendObjectMessage(TOPIC2, demoObj);
        
        return "发送数据【" + demoObj + "】成功!";
    }

Finally, add the corresponding message consumer:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import cn.zifangsky.kafkademo.model.DemoObj;
 
/**
 * Message consumer (group1)
 * @author zifangsky
 */
@Component("groupListener1")
public class GroupListener1 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener1.class);
    
    @KafkaListener(topics={"topic-test2"}, groupId="group1")
    public void listenTopic2(DemoObj data){
        System.out.println("Group1 received message: " + data);
        logger.info(MessageFormat.format("Group1 received message: {0}", data));
    }
    
}

Open the following URL in a browser: http://127.0.0.1:9090/kafka/send2?id=8&data=测试9

The console output shows Group1 printing the received DemoObj.

(5) Multiple consumer groups consuming the same message:

Since Spring Kafka 2.x, the @KafkaListener annotation has a groupId attribute for specifying which consumer group the listener belongs to. By Kafka's design, two consumers in two different consumer groups can each consume the same message (i.e. a message the producer sent to some partition of some topic).

So, on top of the code above, add a consumer in a new group:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import cn.zifangsky.kafkademo.model.DemoObj;
 
/**
 * Message consumer (group2)
 * @author zifangsky
 */
@Component("groupListener2")
public class GroupListener2 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener2.class);
    
    @KafkaListener(topics={"topic-test2"}, groupId="group2")
    public void listenTopic2(DemoObj data){
        System.out.println("Group2 received message: " + data);
        logger.info(MessageFormat.format("Group2 received message: {0}", data));
    }
    
//    @KafkaListener(topics={"topic-test2"}, groupId="group2")
//    public void listenTopic2_2(DemoObj data){
//        System.out.println("Group2_2 received message: " + data);
//        logger.info(MessageFormat.format("Group2_2 received message: {0}", data));
//    }
    
}

Running the test again, the console shows both Group1 and Group2 receiving the same message.

The output clearly matches our expectation.

(6) Batch consumption with @KafkaListener:

According to the Spring Kafka reference documentation, @KafkaListener has supported batch consumption since version 1.1. The official example is roughly of the following form:
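
(Sketch reconstructed from the Spring Kafka reference manual; the listener body is filled in for illustration, and batchFactory corresponds to our batchContainerFactory below.)

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    // the listener receives everything returned by a single poll() in one call
    System.out.println("batch size: " + list.size());
}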

In fact, the required configuration is already present in the KafkaConfig shown earlier, namely:

    /**
     * ConsumerFactory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); // consumer properties
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // upper bound on records per poll, i.e. per batch
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class); // old messages must be cleared first, otherwise deserializing them will fail
        
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }
    
    /**
     * A KafkaListenerContainerFactory for batch consumption
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true); // deliver records to the listener in batches
        // MANUAL_IMMEDIATE: the listener must acknowledge, and the offset is committed right away
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        
        return containerFactory;
    }

Then add a new method to the GroupListener1.java shown above for testing:

    // requires: import java.util.List; import org.springframework.kafka.support.Acknowledgment;
    @KafkaListener(topics={"topic-test"}, groupId="group1", containerFactory="batchContainerFactory")
    public void listenTopic1(List<String> data, Acknowledgment ack){
        System.out.println("Group1 received a batch of messages: " + data);
        logger.info(MessageFormat.format("Group1 received a batch of messages: {0}", data));
        // batchContainerFactory is configured with AckMode.MANUAL_IMMEDIATE,
        // so the offset must be committed explicitly here
        ack.acknowledge();
    }

A final test shows the listener receiving the messages as a whole batch (a List) rather than one at a time.
