kafka数据如何被重复消费

666 阅读2分钟

近段时间学习极客时间李玥老师的后端存储实战课时,看到一个很多意思的东西:用kafka存储点击流的数据,并重复处理。在以往的使用中,kafka只是一个消息传输的载体,消息被消费后就不能再次消费。新知识与印象相冲突,于是就有了本篇文章:kafka数据如何被重复消费。

前期理论了解

首先我先去官网纠正了我对kafka的整体了解。

官网对kafka的描述是:一个分布式流平台。怪自己的学艺不精。

其次,我重新看了一下kafka消费者的消费过程:kafka首先通过push/poll(默认为poll)获取消息,接收消息处理完成后手动/自动提交消费成功,kafka服务器则根据提交情况决定是否移动当前偏移量。

方案确定

kafka消费者读取数据的位置是通过偏移量判断,那如果我能将偏移量手动设置为起始位置,就能实现重复消费?这个有搞头。

如何手动设置偏移量是关键。

show me the code

代码的关键主要在于偏移量设置 api 的调用,其余没什么特别。

要注意的是,代码中我分别调用了作用不同的设置偏移量,仅作为展示,可按需取用。

最后消费者消息消息时,我只使用默认的拉取条数设置消费一次,可按需进行修改。

/**
 * repeat kafka message
 * @param host kafka host
 * @param groupId kafka consumer group id
 * @param autoCommit whether auto commit consume
 * @param topic consume topic
 * @param consumeTimeOut consume time out
*/
    private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){
        //form a properties to new consumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //subscribe incoming topic
        consumer.subscribe(Collections.singletonList(topic));
        //get consumer consume partitions
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for(PartitionInfo partitionInfo : partitionInfos){
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }
        // poll data from kafka server to prevent lazy operation
        consumer.poll(Duration.ofSeconds(consumeTimeOut));
        //reset offset from beginning
        consumer.seekToBeginning(topicPartitions);
        //reset designated partition offset by designated spot
        int offset = 20;
        consumer.seek(topicPartitions.get(0), offset);
        //reset offset to end
        consumer.seekToEnd(topicPartitions);
        //consume message as usual
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
        while (iterator.hasNext()){
            ConsumerRecord<String, String> record = iterator.next();
            log.info("consume data: {}", record.value());
        }
    }
运行结果

需注意的点

在手动设置偏移量时,遇到了一个exception

java.lang.IllegalStateException: No current assignment for partition test-0

翻了一下stackoverflow以及官方文档后,才了解到设置偏移量是一个lazy operation,官网的解释如下。

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

于是我先进行一次 poll 操作后再设置偏移量。

    本文首发于 cartoon的博客
    转载请注明出处:cartoonyu.github.io/cartoon-blo…