1、如何获取 topic 主题的列表
2、生产者和消费者的命令行是什么?
3、consumer 是推还是拉?
4、讲讲 kafka 维护消费状态跟踪的方法
5、讲一下主从同步
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。flowers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否
着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
- 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
- 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数request.required.acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
- 等待ISR中的任何一个节点恢复并担任leader。
- 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.
这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
6、为什么需要消息系统,mysql 不能满足需求吗?
1.解耦:
2.冗余:
3.扩展性:
4.灵活性 & 峰值处理能力:
5.可恢复性:
6.顺序保证:
7.缓冲:
8.异步通信:
7、Zookeeper 对于 Kafka 的作用是什么?
8、数据传输的事务定义有哪三种?
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
9、Kafka 判断一个节点是否还活着有那两个条件?
10、Kafka 与传统 MQ 消息系统之间有三个关键区别
11、讲一讲 kafka 的 ack 的三种机制
12、消费者如何不自动提交偏移量,由应用提交?
ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
。。。
tyr{consumer.commitSync()
}
。。。
}
13、消费者故障,出现活锁问题如何解决?
消费者提供两个配置设置来控制 poll 循环:
14、如何控制消费的位置
15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?
16、kafka 的高可用机制是什么?
17、kafka 如何减少数据丢失
Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。笔者会在该列表之后对列表中的每一项进行讨论,有兴趣的同学可以看下后面的分析。
- block.on.buffer.full = true
- acks = all
- retries = MAX_VALUE
- max.in.flight.requests.per.connection = 1
- 使用KafkaProducer.send(record, callback)
- callback逻辑中显式关闭producer:close(0)
- unclean.leader.election.enable=false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit=false
- 消息处理完成之后再提交位移
给出列表之后,我们从两个方面来探讨一下数据为什么会丢失:
1. Producer端
目前比较新版本的Kafka正式替换了Scala版本的old producer,使用了由Java重写的producer。新版本的producer采用异步发送机制。KafkaProducer.send(ProducerRecord)方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将满足条件的消息封装到某个batch中然后发送出去。显然,这个过程中就有一个数据丢失的窗口:若IO线程发送之前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。
Producer的另一个问题是消息的乱序问题。假设客户端代码依次执行下面的语句将两条消息发到相同的分区
producer.send(record1);
producer.send(record2);
如果此时由于某些原因(比如瞬时的网络抖动)导致record1没有成功发送,同时Kafka又配置了重试机制和max.in.flight.requests.per.connection大于1(默认值是5,本来就是大于1的),那么重试record1成功后,record1在分区中就在record2之后,从而造成消息的乱序。很多某些要求强顺序保证的场景是不允许出现这种情况的。
鉴于producer的这两个问题,我们应该如何规避呢??对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送有可能丢失数据, 我改成同步发送总可以吧?比如这样:
producer.send(record).get();
这样当然是可以的,但是性能会很差,不建议这样使用。因此特意总结了一份配置列表。个人认为该配置清单应该能够比较好地规避producer端数据丢失情况的发生:(特此说明一下,软件配置的很多决策都是trade-off,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer 吞吐量会下降,这是正常的,因为你换取了更高的数据安全性)
- block.on.buffer.full = true 尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常
- acks=all 很好理解,所有follower都响应了才认为消息提交成功,即"committed"
- retries = MAX 无限重试,直到你意识到出现了问题:)
- max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序
- 使用KafkaProducer.send(record, callback)而不是send(record)方法 自定义回调逻辑处理消息发送失败
- callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序
- unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失
- replication.factor >= 3 这个完全是个人建议了,参考了Hadoop及业界通用的三备份原则
- min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用
- 保证replication.factor > min.insync.replicas 如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可
2. Consumer端
consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:
- enable.auto.commit=false 关闭自动提交位移
- 在消息被完整处理之后再手动提交位移