Kafka学习

1,605 阅读8分钟

其他更多java基础文章:
java基础学习(目录)


学习资料:kafka数据可靠性深度解读

Kafka概述

  • Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
  • 无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
  • 在流式计算中,Kafka一般用来缓存数据,Spark通过消费Kafka的数据进行计算。

Kafka架构

  • Producer :消息生产者,就是向kafka broker发消息的客户端。
  • Consumer :消息消费者,向kafka broker取消息的客户端
  • Topic :可以理解为一个队列。
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

分区

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。

  • 分区的原因
    1. 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
    2. 可以提高并发,因为可以以Partition为单位读写了。
  • 分区的原则
    1. 指定了patition,则直接使用;
    2. 未指定patition但指定key,通过对key的value进行hash出一个patition
    3. patition和key都未指定,使用轮询选出一个patition。

副本(Replication)

同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。

写入流程

  1. producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
  2. producer将消息发送给该leader
  3. leader将消息写入本地log
  4. followers从leader pull消息,写入本地log后向leader发送ACK
  5. leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK

ACK,HW,ISR等可以阅读kafka数据可靠性深度解读学习
简单来说:

  • HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置
  • ISR (In-Sync Replicas),这个是指副本同步队列。所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
  • ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。

Kafka消费过程分析

kafka提供了两套consumer API:高级Consumer API和低级API。

高级API

  • 高级API优点
    • 高级API 写起来简单
    • 不需要去自行去管理offset,系统通过zookeeper自行管理
    • 不需要管理分区,副本等情况,系统自动管理
    • 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的的offset)
    • 可以使用group来区分对同一个topic的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
  • 高级API缺点
    • 不能自行控制offset(对于某些特殊需求来说)
    • 不能细化控制如分区、副本、zk等

低级API

  • 低级 API 优点
    • 能够开发者自己控制offset,想从哪里读取就从哪里读取。
    • 自行控制连接分区,对分区自定义进行负载均衡
    • 对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
  • 低级API缺点
    • 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。

消费者组

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。

在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。

JAVA中使用Kafka

生产者

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaProducerDemo {

    public static void main(String[] args){
        //test();
        test2();
    }

    public static void test(){
        Properties props= new Properties();
        props.put("bootstrap.servers", "172.26.40.181:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer(props);
        for(int i = 0; i < 10; i++){
            producer.send(new ProducerRecord("first",Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }

    /**
     * 带回调函数
     */
    public static void test2(){
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "172.26.40.181:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 消息发送最大尝试次数
        props.put("retries", 0);
        // 一批消息处理大小
        props.put("batch.size", 16384);
        // 增加服务端请求延时
        props.put("linger.ms", 1);
// 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //拦截器
        List<String> interceptor = new ArrayList<>();
        interceptor.add("com.hiway.practice.kafka.TimeInterceptor");
        interceptor.add("com.hiway.practice.kafka.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptor);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {

            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {

                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }

}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", "172.26.40.181:9092");
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 定义consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            // 读取数据,读取超时时间为100ms
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

拦截器

public class TimeInterceptor implements ProducerInterceptor<String,String> {


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 统计成功和失败的次数
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

错误总结

1. Uncaught error in kafka producer I/O thread错误
这个问题主要是服务器上的kafka版本和IDEA中的kafka版本不一致导致的。

2.producer发送数据到集群上无反应

将kafka/config/server.properties文件中advertised.listeners改为如下属性。172.26.40.181是我虚拟机的IP。改完后重启,OK了。Java端的代码终于能通信了 advertised.listeners=PLAINTEXT://172.26.40.181:9092 advertised.listeners上的注释是这样的:

#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用 java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。