博文推荐 | 基于 Pulsar 事务实现 Exactly-Once 语义

1,482 阅读12分钟

译者简介

原文由李鹏辉发布于 StreamNative 英文站点:streamnative.io/en/blog/rel…

译者:王嘉凌@中国移动云能力中心,移动云 Pulsar 产品负责人,Apache Pulsar Contributor,活跃于 Apache Pulsar 等开源项目和社区

Apache Pulsar 社区在刚刚发布的 Pulsar 2.8.0 版本中实现了一个里程碑式功能:Exactly-once(精确一次)语义。在这之前,我们只能通过在 Broker 端开启消息去重来保证单个 Topic 上的 Exactly-once 语义。随着 Pulsar 2.8.0 的发布,利用事务 API 可以在跨 Topic 的场景下保证消息生产和确认的原子性操作。接下来,我会解释一下这两种方式的含义和实现方法,以及在实时数据消息和流计算系统中如何使用 Pulsar 事务特性来实现 Exactly-once 语义。

在深入理解 Pulsar 事务特性之前,我们先来回顾一下消息语义的概念。

什么是 Exactly-once (精确一次)语义?

在分布式系统中,任何节点都有可能出现异常甚至宕机。在 Apache Pulsar 中也一样,当 Producer 在生产消息时,可能会发生 Broker 或 Bookie 宕机不可用,或者网络突然中断的异常情况。根据在发生异常时 Producer 处理消息的方式,系统可以具备以下三种消息语义。

At-least-once (至少一次)语义

Producer 通过接收 Broker 的 ACK (消息确认)通知来确保消息成功写入 Pulsar Topic。然而,当 Producer 接收 ACK 通知超时,或者收到 Broker 出错信息时,会尝试重新发送消息。如果 Broker 正好在成功把消息写入到 Topic,但还没有给 Producer 发送 ACK 时宕机,Producer 重新发送的消息会被再次写入到 Topic,最终导致消息被重复分发至 Consumer。

At-most-once (最多一次)语义

当 Producer 在接收 ACK 超时,或者收到 Broker 出错信息时不重发消息,那就有可能导致这条消息丢失,没有写入到 Topic 中,也不会被 Consumer 消费到。在某些场景下,为了避免发生重复消费,我们可以容许消息丢失的发生。

Exactly-once (精确一次)语义

Exactly-once 语义保证了即使 Producer 多次发送同一条消息到服务端,服务端也仅仅会记录一次。Exactly-once 语义是最可靠的,同时也是最难理解的。Exactly-once 语义需要消息队列服务端,消息生产端和消费端应用三者的协同才能实现。比如,当消费端应用成功消费并且 ACK 了一条消息之后,又把消费位点回滚到之前的一个消息 ID,那么从那个消息 ID 往后的所有消息都会被消费端应用重新消费到。

实现 Exactly-once 语义的难点

在分布式消息中间件系统中实现 Exactly-once 语义面临不少挑战。下面通过一个简单的例子来描述。

假设有一个 Producer 发送一条内容为 “Hello StreamNative” 的消息到 Pulsar 上 “Greetings” 这个 Topic 中,然后有一个 Consumer 会从这个 Topic 中接收消息并打印出来。在理想的情况下,没有异常出现,“Hello StreamNative” 这条消息只会往 “Greetings” 这个 Topic 中写入一次,然后 Consumer 会接收到这条消息并进行处理,然后通过 ACK 来通知 Pulsar 消息已处理完毕。之后哪怕 Consumer 出现宕机或者重启, 也不会再次接收到这条消息。

然而, 异常和错误往往无处不在。

Bookie 可能出现宕机

Pulsar 利用 BookKeeper 来存储消息。BookKeeper 是一个高可用的持久化日志存储系统,写入 Ledger (Pulsar 中 Topic 的一个分片)的数据会保存在 N 个 Bookie 节点上,也就是说,BookKeeper 可以容忍 N-1 个 Bookie 节点的宕机。只要至少有一个 Bookie 节点可用,这个 Ledger 上的数据就不会丢失。依托 Zab 协议和 Paxos 算法,BookKeeper 的副本协议可以保证一旦数据成功写入 Bookie 中,这些数据将自动复制到属于同一组的 Bookie 节点上永久保存。

Broker 可能出现宕机,或者与 Producer 之间的网络出现中断

Producer 通过接收 Broker 的 ACK 通知来确保消息发送成功。然而,没有接收到 ACK 通知并不总是意味着发送消息失败。Broker 可能会在成功把消息写入到 Topic 之后,还没有给 Producer 发送 ACK 的时候出现异常,也可能会在把消息写入到 Topic 之前就出现异常。由于无法知道 Broker 出现异常的原因,Producer 在接收 ACK 失败的情况下默认会认为消息没有发送成功并重新发送。这意味着在某些情况下,Pulsar 会写入重复的消息,从而导致 Consumer 重复消费。

Pulsar 客户端可能出现宕机

实现 Exactly-once 时必须考虑到 Pulsar 客户端不可用的情况。很难去准确的区分客户端是不可恢复的宕机了还是只是暂时的不可用,但对于 Broker 来说具备这种判断能力是很重要的。Pulsar Broker 需要屏蔽掉非正常状态下的客户端发过来的消息。一旦客户端重新启动,客户端可以知道之前发生失败时的状态,并从恰当的地方接着处理后续的消息。

Pulsar 社区通过两个阶段来实现 Exactly-once 语义。在 Pulsar 1.20.0-incubating 版本中我们通过幂等性 Producer 来保证单个 Topic 上的 Exactly-once 语义。在最新发布的 Pulsar 2.8.0 版本中我们通过引入事务 API 来保证跨 Topic 场景下消息的原子性操作。

幂等性 Producer:实现单个 Topic 的 Exactly-once 语义

我们从在 Pulsar 1.20.0-incubating 版本中通过幂等性 Producer 来保证单个 Topic 上的 Exactly-once 语义开始讲起。

什么是幂等性 Producer ?幂等性就是指对于同一操作发起的一次或者多次请求的结果是一致的,不会因为多次操作而产生不同的结果。如果在集群(Cluster) 或者 命名空间(Namespace) 级别开启消息去重,同时在消息生产端配置幂等性 Producer,那么当出现由于异常导致 Producer 重发消息时,重复的消息只会在 Broker 中写入一次。

通过这个功能可以实现在单个 Topic 下不会有消息丢失,不会有重复消息,所有的消息都是有序的。我们可以通过以下配置来开启这个功能:

  • 在 Cluster 级别(针对所有 Namespace 下的 Topic 有效),Namespace 级别(针对该 Namespace下的 Topic 有效)或者 Topic 级别 (针对单个 Topic 有效)开启消息去重
  • 为 Producer 设置任意的名称并且设置消息超时时间为 0

这个功能是如何实现的?简单来讲,和 TCP 协议的消息去重机制非常类似:每条发送给 Pulsar 的消息都会带有一个唯一的序列号,Pulsar Broker 利用这个序列号来判断和去除重复的消息。不同的是,TCP 协议只能保证实时连接中的消息去重,而 Pulsar 会把消息体中的序列号保存到 Topic 中,并且记录最新接收到的序列号。所以哪怕 Broker 节点出现异常宕机了,另一个重新接管处理该 Topic 的 Broker 节点也可以判断消息是否重复。这个原理非常简单,和非幂等性 Producer 相比增加的性能损耗几乎可以忽略不计。

Pulsar 1.20.0-incubating 以后的版本都支持这个功能,可以从这里找到这个功能的介绍。

然而,幂等性 Producer 只能在特定的场景保证 Exactly-once 语义,在其他的场景却无能为力。比如:当 Producer 需要确保一条消息同时发送到多个 Topic 时,负责处理其中某些 Topic 的 Broker 宕机了。如果 Producer 不重发消息,就会导致一部分 Topic 中的消息丢失。如果 Producer 重发消息,就会导致其他 Topic 中的消息重复写入。

在消费端,Consumer 向 Broker 发出的 ACK 请求属于 Best-effort(尽力服务),也就是说 ACK 请求可能会丢失,并且 Consumer 无法知道 Broker 是否正常收到了 ACK 请求,在发生 ACK 请求丢失时也不会重新发送。这也会导致 Consumer 接收到重复的消息。

事务 API: 实现跨 Topic 消息生产和确认的原子性操作

为了解决上述问题,我们通过引入事务 API 来保证在跨 Topic 场景下消息发送和确认的原子性操作。通过这个功能,Producer 可以确保一条消息同时发送到多个 Topic,要么这些消息都发送成功,在所有 Topic 上都可以被消费,要么所有消息都不能被消费。这个功能也允许在一个事务操作中对多个 Topic 上的消息进行 ACK 确认,从而实现端到端的 Exactly-once 语义。

以下实例代码演示如何使用事务 API:

PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).send();
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message.getMessageId(), txn);
txn.commit().get();

这段代码展示了如何使用事务 API 实现消息发送和确认的原子性操作,以及如何使用事务 API 在同一个事务操作中确认消息。

需要注意的是:

  • 同一个 Topic 中可以有部分消息属于某个事务,部分消息不属于任何事务。
  • Pulsar 客户端中允许存在多个并行的未提交事务。这是和其他支持事务的消息系统最根本的区别,可以极大提高事务消息的处理能力。
  • 当前的 Pulsar 事务 API 只支持 READ_COMMITTED (读已提交)隔离级别。Consumer 只会消费到不属于任何事务的消息和已提交的事务中的消息,不会消费到未提交和已回滚事务中的消息。

在 Pulsar 客户端使用事务 API 不需要额外的配置和依赖。

端到端 Exactly-once 流计算变得更简单: Pulsar+Flink 的一个例子

通过 Pulsar 事务 API 我们已可以在流计算场景中实现 Exactly-once 语义。

在流计算系统中,有个关键的问题经常被提及:“如果在流计算过程中有一些中间节点宕机了,如何去保证最终计算结果不会出问题?”解决这个问题的关键,在于出现了异常的节点恢复后,如何从发生异常之前的状态重新开始处理流数据。

在 Apache Pulsar 上进行流计算,本质上是对多个 Topic 上的消息进行 Read-Process-Write 操作。Source 节点从一个或多个输入 Topic 中消费消息,然后通过 Process 节点对消息进行一系列的计算和状态处理,最后通过 Sink 节点把处理结果发送到记录结果的 Topic 中。流计算场景下的 Exactly-once 指的是对于 Read-Process-Write 的一整套操作的执行符合 Exactly-once 语义,即不会丢失输入 Topic 上的任何消息,也不会往记录结果的 Topic 上重复写入消息。这就是用户在流计算系统上期望的 Exactly-once 效果。

我们来看一个 Pulsar 结合 Flink 进行流计算的示例。

在 Pulsar 2.8.0 之前,Pulsar 结合 Flink 进行流计算只支持 Exactly-once Source Connector 和 At-least-once Sink Connector。这意味着使用 Pulsar 和 Flink 构建的端到端流计算系统最多只能实现 At-least-once 语义。也就是说发送到记录结果的 Topic 中的消息可能会重复。

利用在 Pulsar 2.8.0 引入的事务 API,Pulsar-Fink Sink Connector 通过简单的改造就可以支持 Exactly-once 语义。Flink 使用二阶段提交协议(Two-Phase Commit)来保证端到端的 Exactly-once 语义,所以我们可以实现 TwoPhaseCommitSinkFunction 并嵌入 Pulsar 的事务 API。当 Pulsar-Fink Sink Connector 调用 beginTransaction 时,我们创建一个 Pulsar 事务并保存事务 ID。后续所有写入到 Sink Connector 的消息都设置这个事务 ID。当 Connector 调用 preCommit 时将这些消息写入到 Pulsar。当 Connector 调用 recoverAndCommit 或者 recoverAndAbort 时分别调用 Pulsar 事务 API 来提交或者回滚 Pulsar 事务。这个改造非常简单,只需要在 Connector 中保存 Pulsar 事务 ID 和 Flink Checkpoints 的关联关系,从而在 Flink 的 事务提交和回滚操作中获取到对应的 Pulsar 事务 ID 即可。

基于 Pulsar 事务提供的幂等性和原子性操作,以及 Apache Flink 提供的全局一致性 CheckPoint 检查机制,我们可以很容易利用 Pulsar 和 Flink 构建出一套符合端到端 Exactly-once 语义的流计算系统。

后续

如果你想了解 Exactly-once 实现的更多细节,推荐阅读下 Pulsar 社区改进提案 PIP-31。想了解更多设计细节,也推荐阅读下设计文档

本文主要是基于用户的角度来介绍 Apache Pulsar 2.8.0 中的新特性事务 API,以及如何使用这个特性去实现 Exactly-once 语义。在下一篇文章中更详细的介绍事务 API 的设计与实现。

近期举办的 Pulsar Summit 北美峰会,有相关演讲《Exactly-Once Made Easy: Transactional Messaging in Apache Pulsar》,可查看视频

致谢

在过去的一年里有多个 Pulsar Committer 和 Contributor 参与开发了这个里程碑式的功能,在此感谢他们:李鹏辉、 高冉、丛博、Addison Higham、翟佳、张勇、冉小龙、Matteo Merli、郭斯杰。

同时,再次致谢译者王嘉凌@中国移动云能力中心的优秀翻译,让我们快速看到了这篇博文的中文版本。

相关阅读

点击链接 ,获取 Apache Pulsar 硬核干货资料!