高吞吐的「小卡」kafka

960 阅读25分钟


概述

kafka名字由来:Franz Kafka 弗兰兹·卡夫卡,奥匈帝国作家,西方现代主义文学先驱和大师。

最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的流处理平台

它是一个一个高吞吐的分布式流处理平台kafka

它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写

Kafka的特性

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

可扩展性:kafka集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

高并发:支持数千个客户端同时读写

kafka用途:

存储:将数据流安全的存储在分布式可复制的容错集群中。

流处理:编写可伸缩的流处理应用程序,以实时响应事件

发布订阅:像消息系统一样读写数据

Kafka®用于构建实时数据管道和流应用程序。它具有水平可伸缩性,容错性,极快,已在数千家公司中投入生产。


kafka是数据流处理者

kafka最新版本2.5

ApacheKafka®是

一个分布式流平台
。这到底是什么意思呢?

流平台具有三个关键功能:

  • 发布和订阅数据流,类似于消息队列或企业消息传递系统。

  • 以容错持久化方式存储记录流。

  • 处理数据记录流。

Kafka通常用于两大类应用程序:

  • 建立实时流数据管道,以可靠地在系统或应用程序之间获取数据

  • 构建实时流应用程序以转换或响应数据流

要了解Kafka如何执行这些操作,让我们从头开始深入研究Kafka的功能。

首先几个概念:

  • Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行。

  • Kafka集群将

    记录
    流存储在称为
    topic的
    类别中。

  • 每个记录由一个键,一个值和一个时间戳组成。


Kafka具有五个核心API:

  • 生产者API允许应用程序发布一个记录流至一个或多个kafka的主题中。

  • 消费者API允许应用程序订阅一个或多个主题,并处理发送给他们的数据流。

  • 所述流API允许应用程序充当

    流处理器
    ,从一个或多个主题消费一个输入流,并产生一个输出流至一个或多个输出的主题,有效地传输输入流到输出流。

  • 连接器API允许构建和运行可重复使用的生产者或消费者连接kafka 主题,为现有的应用程序或数据系统。例如,关系数据库的连接器可能会捕获对表的所有更改。

  • 管理员API允许管理和检查的Topic,broker和其他kafka对象。

在Kafka中,客户端和服务器之间的通信是通过简单,高性能,与语言无关的TCP协议完成的。该协议已版本化,并与旧版本保持向后兼容性。我们为Kafka提供了Java客户端,但是客户端支持多种语言

客户端支持的语言:例如:Java、.NET、PHP、Ruby、Python


kafka生态

kafka 生态: cwiki.apache.org/confluence/…

kafka应用场景

消息传递

网站活动跟踪

指标监控

日志处理

流处理

事件溯源

Event
Sourcing

提交日志

kafka.apache.org/uses

快速入门

单机

kafka2.5 下载地址: mirror.bit.edu.cn/apache/kafk…

解压:


> tar -xzf kafka_2.12-2.5.0.tgz
> cd kafka_2.12-2.5.0

启动zookeeper

如果本地没有zookeeper。您可以使用kafka附带的脚本来获取快速且简陋的单节点ZooKeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

> bin/kafka-server-start.sh config/server.properties

创建主题

让我们用一个分区和一个副本创建一个名为“ test”的主题

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

现在,如果运行list topic命令,我们可以看到该主题:

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

发送消息

Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
This is a message
This is another message

消费消息

Kafka还有一个命令行使用者,它将消息转储到标准输出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果上面的每个命令都在不同的终端上运行,那么您现在应该能够在生产者终端中键入消息,并看到它们出现在消费者终端中。

所有的命令行工具都有其他选项。在不带参数的情况下运行该命令将显示用法信息,并对其进行详细记录。

多broker集群

上面是1个broker的集群,现在扩展到3个broker,3个节点:

> ``cp` `config``/server``.properties config``/server-1``.properties
> ``cp` `config``/server``.properties config``/server-2``.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的唯一且永久的名称。我们只需要覆盖端口和日志目录,这是因为我们都在同一台计算机上运行它们,并且希望所有代理都不要试图在同一端口上注册或覆盖彼此的数据。

我们已经有Zookeeper并启动了单个节点,因此我们只需要启动两个新节点:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在,创建一个复制因子是3的新主题:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好的,但是现在有了集群,我们如何知道哪个broker在做什么?这个需要运行“描述主题”命令:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

这是输出的说明。第一行给出了所有分区的摘要,每一行都给出了有关一个分区的信息。由于该主题只有一个分区,因此只有一行。

leader 是负责给分区的所有读取和写入的节点。每个节点将成为分区随机选择时的部分leader。

replicas 是为分区复制日志的节点列表,不管它们是leader还是当前处于活动状态。

isr 是 "in-sync"副本的集合。这是当前仍处于活动状态并追随leader的副本列表的子集。

请注意,在我的示例中,节点1是该主题的唯一分区的领导者。

我们可以在创建的上一个Topic上运行相同的命令,以查看其位置:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

发送消息

> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
...
my ``test` `message 1
my ``test` `message 2
^C

消费消息

> bin``/kafka-console-consumer``.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my ``test` `message 1
my ``test` `message 2
^C

测试容错

> ``ps` `aux | ``grep` `server-1.properties
7564 ttys002    0:15.91 ``/System/Library/Frameworks/JavaVM``.framework``/Versions/1``.8``/Home/bin/java``...
> ``kill` `-9 7564

或者windows上

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

领导权已切换到followers 之一,并且节点1不再位于同步副本集中:

> bin``/kafka-topics``.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    ``Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是,即使最初进行写操作的领导者处于失败状态,消息仍然可供使用:

> bin``/kafka-console-consumer``.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my ``test` `message 1
my ``test` `message 2
^C

使用Kafka Connect导入/导出数据

从控制台读取数据并将其写回到控制台是一个方便的起点,但是您可能要使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,可以使用Kafka Connect导入或导出数据,而无需编写自定义集成代码。

Kafka Connect是Kafka附带的工具,用于将数据导入和导出到Kafka。它是运行

连接器
的可扩展工具,该
连接器
实现用于与外部系统进行交互的自定义逻辑。在本快速入门中,我们将看到如何使用简单的连接器运行Kafka Connect,该连接器将数据从文件导入到Kafka主题,并将数据从Kafka主题导出到文件。

首先,我们将从创建一些种子数据开始进行测试:

`> ``echo` `-e ``"foo\nbar"` `> ``test``.txt`

或在Windows上:

`> ``echo` `foo> ``test``.txt``> ``echo` `bar>> ``test``.txt`

接下来,我们将启动两个以

独立
模式运行的连接器,这意味着它们将在单个本地专用进程中运行。我们提供了三个配置文件作为参数。第一个始终是Kafka Connect流程的配置,其中包含通用配置,例如要连接的Kafka代理和数据的序列化格式。其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类,以及连接器所需的任何其他配置。

`> bin``/connect-standalone``.sh config``/connect-standalone``.properties config``/connect-file-source``.properties config``/connect-file-si`


这些示例配置文件(随Kafka一起提供)使用您之前启动的默认本地集群配置,并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成到Kafka主题,第二个是水槽(sink)连接器。从Kafka主题读取消息,并在输出文件中将它们作为一行显示。

在启动过程中,您会看到许多日志消息,其中包括一些表明正在实例化连接器的消息。Kafka Connect进程开始后,源连接器应开始从test.txt主题中读取行并将其生成到主题中connect-test,而接收器连接器应开始从主题中读取消息connect-test 并将其写入文件中test.sink.txt。我们可以通过检查输出文件的内容来验证数据已通过整个管道传递:

> ``more` `test``.sink.txt
foo
bar

请注意,数据存储在Kafka主题中connect-test,因此我们也可以运行控制台使用者以查看该主题中的数据(或使用自定义使用者代码进行处理):

> bin``/kafka-console-consumer``.sh --bootstrap-server localhost:9092 --topic connect-``test` `--from-beginning
{``"schema"``:{``"type"``:``"string"``,``"optional"``:``false``},``"payload"``:``"foo"``}
{``"schema"``:{``"type"``:``"string"``,``"optional"``:``false``},``"payload"``:``"bar"``}
...

连接器继续处理数据,因此我们可以将数据添加到文件中,并查看它在管道中的移动情况:

> echo Another line>> test.txt

您应该看到该行出现在控制台使用者输出和接收器文件中。

使用Kafka Streams处理数据

Kafka Streams是用于构建关键任务实时应用程序和微服务的客户端库,其中输入和/或输出数据存储在Kafka集群中。Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简便性以及Kafka服务器端集群技术的优势,使这些应用程序具有高度可伸缩性,弹性,容错性,分布式等等。此快速入门示例将演示如何运行此库中编码的流应用程序。

这是WordCountDemo示例代码的要点

github.com/apache/kafk…

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
 
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
      "streams-plaintext-input",
      Consumed.with(stringSerde, stringSerde)
    );
 
KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 
    // Group the text words as message keys
    .groupBy((key, value) -> value)
 
    // Count the occurrences of each word (message key).
    .count();
 
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

它实现了WordCount算法,该算法从输入文本中计算单词出现的直方图。但是,与您之前可能看到的其他WordCount示例不同,WordCount演示应用程序的行为略有不同,因为它被设计用来操作无限的没边界数据流。与有界变体类似,它是一种有状态算法,可跟踪和更新单词计数。但是,由于它必须假定可能不受限制的输入数据,因此它将定期输出其当前状态和结果,同时继续处理更多数据,因为它不知道何时处理了“所有”输入数据。

第一步,我们将启动Kafka(除非您已经启动了它),然后我们将为Kafka主题准备输入数据,随后将由Kafka Streams应用程序对其进行处理。

创建输入topic:streams-plaintext-input

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


创建输出topic:streams-wordcount-output

> bin``/kafka-topics``.sh --create \
    ``--bootstrap-server localhost:9092 \
    ``--replication-factor 1 \
    ``--partitions 1 \
    ``--topic streams-wordcount-output \
    ``--config cleanup.policy=compact
Created topic ``"streams-wordcount-output"``.

查看topic:

> bin``/kafka-topics``.sh --bootstrap-server localhost:9092 --describe

Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact,segment.bytes=1073741824
    ``Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:segment.bytes=1073741824
    ``Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

该演示应用程序将从输入的主题streams-plaintext-input中读取内容,对每个读取的消息执行WordCount算法的计算,并将其当前结果连续写入输出主题streams-wordcount-output中。因此,除了日志条目外,没有任何STDOUT输出,因为结果被写回到Kafka中。

现在,我们可以在单独的终端中启动producer,以向该主题写入一些输入数据:

> bin``/kafka-console-producer``.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

并通过在单独的终端与consumer控制台一起读取WordCount演示应用程序的输出主题来检查其输出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

java 客户端API

生产者:

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}

消费者:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }
 
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}

测试:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
 
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
 
    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

关于Consumer的一个细节说明:

topicCountMap.put(topic, new Integer(a_numThreads));

这里,如果提供的thread数目(a_numThreads)大于这个topic的partition的数目,有些thread会永远读不到消息。

如果如果提供的thread数目(a_numThreads)小于这个topic的partition的数目,有些thread会从多个partition读到消息。

如果一个线程从多个partition读取消息,无法保证的消息的顺序,只能保证从同一个partition读取到的消息是顺序的。

增加更多的进程/线程消费消息,会导致Kafka re-balance,可能会改变Partition和消费Thread的对应关系。

kafka应用进阶

kafka课程:www.shiyanlou.com/courses/859

这个课程以 Java 代码实战的方式学习 Kafka。包括 Kafka 的安装配置、Producer API 的使用、Consumer API 的使用以及与第三方框架 Flume、Spark Streaming 的集成开发。对在大数据项目中经常用到的 Kafka 关键知识点,进行全方位、源代码级别的学习,使学员深入理解 Kafka 的内部机制。

  • MQ 消息系统的概念

  • Zookeeper 和 Kafka 的关系

  • 使用 Java 调用 Kafka topic 相关 API

  • Producer 入门开发

  • Consumer 入门开发

  • Kafka 整合 Flume

  • Kafka 发送接收非结构化数据实战

  • Kafka 消息系统的基本架构

  • 使用 shell 命令修改和删除 topic

  • 查看 Kafka 中消息的存储位置

  • Producer 进阶开发

  • Consumer 进阶开发

  • Kafka 发送接收结构化数据实战

  • Spark 2.0 读取 Kafka 数据实战

kafka架构


Kafka各组件说明

Broker

每个kafka server称为一个Broker,多个borker组成kafka cluster。

一个机器上可以部署一个或者多个Broker,这多个Broker连接到相同的ZooKeeper就组成了Kafka集群。

Topic

Topic 就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者,也就是消息的消费者consumer。

Producer将消息推送到topic,由订阅该topic的consumer从topic中拉取消息。

Topic 与broker

一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。

Partition log

Kafka会为每个topic维护了多个分区(partition),每个分区会映射到一个逻辑的日志(log)文件:


Partition distribution

日志分区是分布式的存在于一个kafka集群的多个broker上。每个partition会被复制多份存在于不同的broker上。这样做是为了容灾。具体会复制几份,会复制到哪些broker上,都是可以配置的。经过相关的复制策略后,每个topic在每个broker上会驻留一到多个partition。

对于同一个partition,它所在任何一个broker,都有能扮演两种角色:leader、follower。

每个partition的Leader的用于处理到该partition的读写请求的。

每个partition的followers是用于异步的从它的leader中复制数据的。

Kafka会动态维护一个与Leader保持一致的同步副本(in-sync replicas (ISR))集合,并且会将最新的同步副本(ISR )集合持久化到zookeeper。如果leader出现问题了,就会从该partition的followers中选举一个作为新的leader。

所以呢,在一个kafka集群中,每个broker通常会扮演两个角色:在一个partition中扮演leader,在其它的partition中扮演followers。Leader是最繁忙的,要处理读写请求。这样将leader均分到不同的broker上,目的自然是要确保负载均衡。

Producer

Producer作为消息的生产者,在生产完消息后需要将消息投送到指定的目的地(某个topic的某个partition)。Producer可以根据指定选择partition的算法或者是随机方式来选择发布消息到哪个partition。

Consumer

在Kafka中,同样有consumer group的概念,它是逻辑上将一些consumer分组。因为每个kafka consumer是一个进程。所以一个consumer group中的consumers将可能是由分布在不同机器上的不同的进程组成的。Topic中的每一条消息可以被多个consumer group消费,然而每个consumer group内只能有一个consumer来消费该消息。所以,如果想要一条消息被多个consumer消费,那么这些consumer就必须是在不同的consumer group中。所以也可以理解为consumer group才是topic在逻辑上的订阅者。

每个consumer可以订阅多个topic。

每个consumer会保留它读取到某个partition的offset。而consumer 是通过zookeeper来保留offset的。

组件说明来源:www.cnblogs.com/f1194361820…

副本(Replica)

提到副本,肯定就会想到正本。副本是正本的拷贝。在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。活跃的称之为leader,其他的是follower。

每个分区的数据都会有多份副本,以此来保证Kafka的高可用。

原文链接:blog.csdn.net/liyiming201…

Topic、partition、replica的关系如下图:


kafka原理

Kafka提供的顺序保障

1、如果producer往特定的partition发送消息时,会按照先后顺序存储,也就是说如果发送顺序是message1、message2、message3。那么这三个消息在partition log中的记录的offset就是 message1_offset < message2_offset < message3_offset。

2、consumer也是有序的浏览log中的记录。

3、如果一个topic指定了replication factor为N,那么就允许有N-1个Broker出错。

Kafka存储策略

1)kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。

2)每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

3)每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。

4)发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

Kafka数据保留策略

1)N天前的删除。

2)保留最近的多少Size数据。

Kafka broker

与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker完全不管(有offset manager broker管理)。

  • 从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。

  • 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。

复制(Replication)

1)一个partition的复制个数(replication factor)包括这个partition的leader本身。

2)所有对partition的读和写都通过leader。

3)Followers通过pull获取leader上log(message和offset)

4)如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。

5)当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。

6)如果针对某个partition的所有复制节点都挂了,Kafka默认选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。

Leader选举

Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。

在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,为了容忍f个副本的失败,“少数服从多数”的方式和ISR在commit前需要等待的副本的数量是一样的,但是ISR需要的总的副本的个数几乎是“少数服从多数”的方式的一半。

The Producer

发送确认

通过request.required.acks来设置,选择是否等待消息commit(是否等待所有的”in sync replicas“都成功复制了数据)

Producer可以通过acks参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。

推荐的做法是,将acks设置为all或者-1,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

负载均衡

1)producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。

2)自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。

异步批量发送

批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。

The Consumer

consumer控制消息的读取。

Push vs Pull

1) producer push data to broker,consumer pull data from broker

2) consumer pull的优点:consumer自己控制消息的读取速度和数量。

3) consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。

Consumer Position

1) 大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。

2) Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。

Consumer group

每一个consumer实例都属于一个consumer group。

每一条消息只会被同一个consumer group里的一个consumer实例消费。

不同consumer group可以同时消费同一条消息。

Consumer Rebalance

Kafka consumer high level API:

如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。

如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据。

如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。

Message Delivery Semantics

三种:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。

Consumer:

* 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应“At most once”。

* 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应“At least once”。

* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应“Exactly once”。

Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。

Distribution

Consumer Offset Tracking

1)High-level consumer记录每个partition所消费的maximum offset,并定期commit到offset manager(broker)。

2)Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer注册到zookeeper

2)属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。

3)当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。

Zookeeper协调控制

1)管理broker与consumer的动态加入与离开。

2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。

3)维护消费关系及每个partition的消费信息。

日志压缩(Log Compaction)

1)针对一个topic的partition,压缩使得Kafka至少知道每个key对应的最后一个值。

2)压缩不会重排序消息。

3)消息的offset是不会变的。

4)消息的offset是顺序的。

5)压缩发送和接收能降低网络负载。

6)以压缩后的形式持久化到磁盘。

kafka原理 : www.cnblogs.com/luxiaoxun/p…

分区的读和写


参考:

官网:kafka.apache.org/

Kafka史上最详细原理总结上 www.jianshu.com/p/734cf729d…

Kafka基本原理www.cnblogs.com/luxiaoxun/p…

kafka : recomm.cnblogs.com/blogpost/82…

Apache Kafka核心概念-多图-形象易懂 blog.csdn.net/liyiming201…

原创Kafka轻松学系列教程blog.csdn.net/liyiming201…