kafka概述

564 阅读11分钟

初识kafka

kafka是由LinkedIn公司采用Scala语言开发的一个多分区,多副本且基于Zookeeper协调的分布式消息系统,现在被捐献给Apache基金。目前kafka的一个定位是分布式流式处理平台,具有⾼吞吐、可持久化、可⽔平扩展、⽀持流数据处理等多种特性 kafka之所以受到越来越多的青睐,与它扮演的三个角色有关:

  • 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了⼤多数消息系统难以实 现的消息顺序性保障及回溯消费的功能。
  • 存储系统: Kafka 把消息持久化到磁盘,相⽐于其他基于内存存储的系统⽽⾔,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的 数据存储系统来使⽤,只需要把对应的数据保留策略设置为“永久”或启⽤主题的⽇志压缩功能即可。
  • 流式处理平台: Kafka 不仅为每个流⾏的流式处理框架提供了可靠的数据来源,还提供了⼀个完整的流式处理类库,⽐如窗口、连接、变换和聚合等各类操作。

基本概念

一个典型的kafka体系架构包括若干Producer,若干Consumer,以及一个Zookeeper集群,如下图所示。其中Zookeeper是kafka用来负责对元数据的管理、控制器的选举等操作的。Producer将消息发送到broker,Broker负责将接收到的消息存储到磁盘中,而Consumer负责Broker订阅并消费信息。

  • 消息:kafka中每条数据或者说记录称为消息,消息由字节数组组成。
  • 批次:批次就是一组消息,这些消息属于同一个主题和分区。
  • producer:生产者,也被称为发布者或者写入者,一般情况下,一条消息会被发布到一个特定的主题上。生产者默认会将消息均衡的发布到同一主题不同分区上,也可以指定分区。
  • topic:主题,kafka的消息是通过主题划分的,主题就好比是数据库中的表的概念。主题可以划分成多个分区,因此无法保证整个主题消费的顺序性。
  • 分区(partition):一个主题可以有多个分区,一个分区就是一个提交日志,消息以追加的方式写入,然后以先入先出的顺序进行消费,每个分区只能被同一消费组中的某一个消费者进行消费(也就是所有权关系),但是能够同时被多个消费组同时消费。
  • Consumer:消费者,也被称为订阅者或读者,能够订阅一个或者多个主题,并按照消息生成的顺序进行消费,消费者通过偏移量来确定是否消费过数据。
  • 消费者群组:消费者只是消费群组的一部分,也就是说一个群组中,可以有一个或者多个消费者共同消费同一个主题,而消费者群组能保证在每个分区只能被一个消费者使用。
  • broker:一个独立的kafka服务器称为broker,broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。同时,broker为消费者服务,对读取分区的请求作出相应,返回生产者生产的数据。
  • 集群:由多个broker组成的组。会从broker中选出一个集群控制器的角色。

console客户端使用

在安装目录下,可以使用命令行:

./kafka-console-producer.sh --broker-list 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093 --topic test

来创建一个topic,并进入生产消息输入客户端 再重新新开一个命令终端,执行:

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test --from-beginning

来创建一个消费者,在生产者输入消息,就可以被该消费者消费

java客户端使用

生产者创建

//这里只创建一个最简单的生产者
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "host1:9092,host2:9092,host3:9093");
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProperties);

消息发送 发送消息一共分为三种模式

  • 发送即忘记
  • 同步发送
  • 异步发送

同步即忘记

/**
 * 发送消息并忘记,虽然会进行重试,但是还是会丢失数据而客户端并不知道
 */
@Test
public void sendMessageAndForget() {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");
    kafkaProducer.send(producerRecord);
}

同步发送

/**
 * 同步发送消息,通过send方法获取到future,当发送失败后,get()方法会返回异常
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void sendMessageWithSync() throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");
    Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
    RecordMetadata recordMetadata = future.get();
    System.out.println("record offset:" + recordMetadata.offset());
}

异步发送

/**
 * 异步发送消息,通过在send方法中,增加一个回调函数,能够知道发送结果
 * @throws InterruptedException
 */
@Test
public void sendMessageWithAsync() throws InterruptedException {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");
    kafkaProducer.send(producerRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println(metadata.offset());
        }
    });
    TimeUnit.SECONDS.sleep(3);
}

消息组成

  • key:用于选择分区,当没有设置的时候,由kafka随机分配一个分区,随机策略是每隔topic.metadata.refresh.interval.ms设置的时间随机获取一个分区,这段时间内的消息都是发往这个分区。
  • value:消费端能接收到的内容

生产者配置

必选参数

  • bootstrap.servers:指定brokers地址,可以指定多个,当其中broker一个宕机,能够连接到这个集群中的其他节点。
  • key.serializer:键序列化类,该类必须实现org.apache.kafka.common.serialization.Serializer接口,kafka已经提供基本类型的序列化类
  • value.serializer:值序列化类,见key.serializer

可选参数

  • acks:指定需要多少个分区接收到消息,才认为消息发送成功,提供以下三个值:
    • 0:不用等待服务器响应,就默认为成功,会导致丢失消息,速度最快
    • 1:只需要集群领导者接收成功并响应就算是成功。此时吞吐量取决于是异步发送还是同步发送。
  • all:需要所有的节点接收成功并响应才算是成功,安全级别最高,吞吐量最低。
  • buffer.memory:生产者内存缓冲区的大小,生产者用该内存缓冲发送到服务器的消息。
  • compression.type:压缩类型,提供以下三个值:
    • snappy:Google发明的压缩算法,能够占用较少的CPU来达到不错的压缩比。
    • gzip:占用更多的CPU,更高的压缩比
    • lz4:侧重于效率,压缩速度很快。
  • retries:重试次数,生产者从服务器接收的错误如果是临时性的,该值将会决定重试的次数,每次重试之间会间隔100ms,该值也可以通过retry.backoff.ms来决定
  • batch.siz:一个批次可以使用的内存大小。当多个消息需要被发送到一个分区时,生产者会将消息放到批次中,等满了再一次性发送,减少网络开销。但并不是一定要等该缓存满了才会发送,只有部分满的批次也会发送。
  • linger.ms:指定生产者发送批次之前,会等待多长时间将发送的消息写入批次。
  • client.id:可以是任何字符,服务器用它来识别消息的来源
  • max.in.flight.requests.per.connection:指定生产者接收到服务器响应之前可以发送多少个消息(批次),它的值越高,占用越多的内存,响应的会提升吞吐量,将它设置成1可以保证发送的顺序性,即使发生了重试。
  • timeout.ms:指定broker等待同步副本返回消息的时间
  • request.timeout.ms:指定生产者发送数据等待服务器响应时间
  • metadata.max.age.ms:指定生产者获取元数据等待服务器的时间
  • max.block.ms:指定调用send()和partitionFor()阻塞方法时的等待时间。
  • max.request.size:指定生产者发送的请求的大小
  • receive.buffer.bytes:指定TCP socket接收数据包缓冲区的大小,如果设置成-1,则默认使用系统的默认值
  • send.buffer.bytes:指定TCP socket发送数据包缓冲区的大小,如果设置成-1,则默认使用系统的默认值

消费者创建

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092");
kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("group.id", "group_test");
kafkaConsumer = new KafkaConsumer<>(kafkaProperties);

订阅主题

数组方式

kafkaConsumer.subscribe(Collections.singletonList("test"));

通配符方式

kafkaConsumer.subscribe("test.*");

消费者配置

必填参数

  • bootstrap.servers:同生产者
  • key.deserializer:键反序列化类,该类必须实现org.apache.kafka.common.serialization.Deserializer接口,kafka已经提供基本类型的序列化类
  • value.deserializer:值反序列化类,见key.deserializer
  • group.id:消费组id

可选参数

  • fetch.min.bytes:指定服务器获取记录的最小字节数,当没有达到这个字节数时,则等待达到最小数再消费返回
  • fetch.max.wait.ms:指定等待服务器没有达到最小数时,等到多久返回给消费者。
  • max.partition.fetch.bytes:指定从每个分区返回给消费者最大字节数
  • session.timeout.ms:该属性指定消费者在被认定死亡之前可以和服务器断开的时间,如果超过该时间没有发送心跳,则认定消费者死亡,会触发再平衡过程,将分配给该消费者的分区分配给同一消费组中其他的消费者。
  • heartbeat.interval.ms:指定心跳间隔
  • auto.offset.reset:指定消费者读取一个没有偏移量或者偏移量失效(消费者长时间失效,包含的偏移量已经失效),可以指定以下两个值:
    • latest:默认值,当偏移量无效的情况下,从最新的记录开始消费,即消费者启动之后生产的数据
    • earliest:当偏移量无效的情况下,从最起始的位置开始读取分区的记录
  • enable.auto.commit:指定消费者提交偏移量的方式,可以设置为以下两个值
    • true:默认值,消费者自动提交偏移量
    • false:消费者显式提交偏移量
  • partition.assignment.strategy:指定分区分配给消费者的策略,可以设置为以下两个值:
    • Range:class org.apache.kafka.clients.consumer.RangeAssignor
    • RoundRobin:org.apache.kafka.clients.consumer.RoundRobinAssignor
    • Sticky:org.apache.kafka.clients.consumer.StickyAssignor
  • client.id:客户端ID
  • max.poll.records:指定单次调用call()方法能够最多返回记录的数量
  • receive.buffer.bytes:指定TCP socket接收数据包缓冲区的大小,如果设置成-1,则默认使用系统的默认值
  • send.buffer.bytes:指定TCP socket发送数据包缓冲区的大小,如果设置成-1,则默认使用系统的默认值

偏移量

kafka和其他消息中间件的一个独特之处就是kafka消费的消息不需要确认,而是通过偏移量来维护消费消息的开始位置。 更新分区的偏移量我们称为提交

提交

kafka会有一个特殊的主题:_consumer_offset,消费者会往这个主题中发送包含每个分区的偏移量的消息。如果一个消费者一直处于可用状态,这个偏移量就没有什么用处,但是当消费者发生重启或者新的消费者进入触发再均衡,则会从该主题中读取每个分区的最后一次提交的偏移量。

提交方式

自动提交
  • 当enable.auto.commit设置为true时,则每隔auto.commit.interval.ms时间提交偏移量,带来的问题是当还有提交就消费者宕机引起再均衡,这一段时间内的消息可能会被重复消费
  • 同步提交 当enable.auto.commit设置为false时,通过commitSync()方法进行提交,该方法会一直阻塞,直到提交成功或者抛出异常,影响效率
  • 异步提交 当enable.auto.commit设置为true时,通过commitAsync()方法进行提交,该方法会异步提交且不会重试,效率较高
更好的提交方式

同步和异步进行组合

@Test
public void consumerMessage() {
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(10));
            consumerRecords.forEach(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
            //异步提交偏移量
            kafkaConsumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //同步提交偏移量
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
    }
}
提交特定的偏移量

commitSync()和commitAsync()提交的都是当前消费批次的最后一个偏移量,下面我们来示范如何提交特定的偏移量:

/**
 * 消费消息并提交特定的偏移量
 */
@Test
public void consumerAndCommitPositionOffset() {
    long counter = 0L;
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                counter++;
                System.out.println();
                //每1000条数据提交一次偏移量,并指定偏移量
                if (counter % 1000 == 0) {
                    Map<TopicPartition, OffsetAndMetadata> concurrentOffsetMap = new HashMap<>();
                    concurrentOffsetMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                            new OffsetAndMetadata(consumerRecord.offset() + 1));
                    kafkaConsumer.commitSync(concurrentOffsetMap);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //同步提交偏移量
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
    }
}
再均衡监听

程序可以在订阅主题的手设置再平衡监听,该监听会在Consumer发生再均衡,分区重新分配时调用

kafkaConsumer.subscribe(Collections.singletonList(TOPIC_TEST), new ConsumerRebalanceListener() {
    /**
     * 该方法会在再均衡之前,消费者停止读取消息之后被调用
     * @param partitions
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("will rebalance ...");
    }

    /**
     * 该方法会在再均衡完成之后,消费者重新开始读取消息之前被调用
     * @param partitions
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("rebalance has done ...");
    }
});