Flink基础教程:时间语义、Event Time和Watermark机制原理与实践

2,132 阅读14分钟

在流处理中,时间是一个非常核心的概念,是整个系统的基石。比如,我们经常会遇到这样的需求:给定一个时间窗口,比如一个小时,统计时间窗口的内数据指标。那如何界定哪些数据将进入这个窗口呢?在窗口的定义之前,首先需要确定一个应用使用什么样的时间语义。

本文将介绍Flink的Event Time、Processing Time和Ingestion Time三种时间语义,接着会详细介绍Event Time和Watermark的工作机制,以及如何对数据流设置Event Time并生成Watermark。

二维码

Flink的三种时间语义

Flink中三种时间语义

如上图所示,Flink支持三种时间语义:

Event Time

Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。

使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果:无论我们是将同一个程序部署在不同的计算环境还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,以确认不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。在流处理领域,比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。

一个基于Event Time的Flink程序中必须定义Event Time,以及如何生成Watermark。我们可以使用元素中自带的时间,也可以在元素到达Flink后人为给Event Time赋值。

使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

Processing Time

对于某个算子来说,Processing Time指算子使用当前机器的系统时钟来定义时间。在Processing Time的时间窗口场景下,无论事件什么时候发生,只要该事件在某个时间段达到了某个算子,就会被归结到该窗口下,不需要Watermark机制。对于一个程序在同一个计算环境来说,每个算子都有一定的耗时,同一个事件的Processing Time,第n个算子和第n+1个算子不同。如果一个程序在不同的集群和环境下执行时,限于软硬件因素,不同环境下前序算子处理速度不同,对于下游算子来说,事件的Processing Time也会不同,不同环境下时间窗口的计算结果会发生变化。因此,Processing Time在时间窗口下的计算会有不确定性。

Processing Time只依赖当前执行机器的系统时钟,不需要依赖Watermark,无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义。

Ingestion Time

Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。

Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,而且后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高。

设置时间语义

在Flink中,我们需要在执行环境层面设置使用哪种时间语义。下面的代码使用Event Time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

如果想用另外两种时间语义,需要替换为:TimeCharacteristic.ProcessingTimeTimeCharacteristic.IngestionTime

Event Time和Watermark

Flink的三种时间语义中,Processing Time和Ingestion Time都可以不用设置Watermark。如果我们要使用Event Time语义,以下两项配置缺一不可:第一,使用一个时间戳为数据流中每个事件的Event Time赋值;第二,生成Watermark。

实际上,Event Time是每个事件的元数据,Flink并不知道每个事件的发生时间是什么,我们必须要为每个事件的Event Time赋值一个时间戳。关于时间戳,包括Flink在内的绝大多数系统都支持Unix时间戳系统(Unix time或Unix epoch)。Unix时间戳系统以1970-01-01 00:00:00.000 为起始点,其他时间记为距离该起始时间的整数差值,一般是毫秒(millisecond)精度。

有了Event Time时间戳,我们还必须生成Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构,它包含一个时间戳,并假设后续不会有小于该时间戳的数据。下图展示了一个乱序数据流,其中方框是单个事件,方框中的数字是其对应的Event Time时间戳,圆圈为Watermark,圆圈中的数字为Watermark对应的时间戳。

一个包含时间戳和Watermark的乱序数据流

Watermark的生成有以下几点需要注意:

  • Watermark与事件的时间戳紧密相关。一个时间戳为T的Watermark假设后续到达的事件时间戳都大于T。
  • 假如Flink算子接收到一个违背上述规则的事件,该事件将被认定为迟到数据,如上图中时间戳为19的事件比Watermark(20)更晚到达。Flink提供了一些其他机制来处理迟到数据。
  • Watermark时间戳必须单调递增,以保证时间不会倒流。
  • Watermark机制允许用户来控制准确度和延迟。Watermark设置得与事件时间戳相距紧凑,会产生不少迟到数据,影响计算结果的准确度,整个应用的延迟很低;Watermark设置得非常宽松,准确度能够得到提升,但应用的延迟较高,因为Flink必须等待更长的时间才进行计算。

分布式环境下Watermark的传播

在实际计算过程中,Flink的算子一般分布在多个并行的分区(或者称为实例)上,Flink需要将Watermark在并行环境下向前传播。如下图所示,Flink的每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务Watermark处理进度,随着上游Watermark数据不断向下发送,算子子任务的Event Time时钟也要不断向前更新。由于上游各分区的处理速度不同,到达当前算子的Watermark也会有先后快慢之分,每个算子子任务会维护来自上游不同分区的Watermark信息,这是一个列表,列表内对应上游算子各分区的Watermark时间戳等信息。

当上游某分区有Watermark进入该算子子任务后,Flink先判断新流入的Watermark时间戳是否大于Partition Watermark列表内记录的该分区的历史Watermark时间戳,如果新流入的更大,则更新该分区的Watermark。例如,某个分区新流入的Watermark时间戳为4,算子子任务维护的该分区Watermark为1,那么Flink会更新Partition Watermark列表为最新的时间戳4。接着,Flink会遍历Partition Watermark列表中的所有时间戳,选择最小的一个作为该算子子任务的Event Time。同时,Flink会将更新的Event Time作为Watermark发送给下游所有算子子任务。算子子任务Event Time的更新意味着该子任务将时间推进到了这个时间,该时间之前的事件已经被处理并发送到下游。例如,图中第二步和第三步,Partition Watermark列表更新后,导致列表中最小时间戳发生了变化,算子子任务的Event Time时钟也相应进行了更新。整个过程完成了数据流中的Watermark推动算子子任务Watermark的时钟更新过程。Watermark像一个幕后推动者,不断将流处理系统的Event Time向前推进。我们可以将这种机制总结为:

  1. Flink某算子子任务根据各上游流入的Watermark来更新Partition Watermark列表。
  2. 选取Partition Watermark列表中最小的时间作为该算子的Event Time,并将这个时间发送给下游算子。

这样的设计机制满足了并行环境下Watermark在各算子中的传播问题,但是假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的时间停留在很早的某个时间,这会导致算子的Event Time时钟不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理,整个流处理处于空转状态。这种问题可能出现在使用数据流自带的Watermark,自带的Watermark在某些分区下没有及时更新。针对这种问题,一种解决办法是根据机器当前的时钟周期性地生成Watermark。

此外,在union等多数据流处理时,Flink也使用上述Watermark更新机制,那就意味着,多个数据流的时间必须对齐,如果一方的Watermark时间较老,那整个应用的Event Time时钟也会使用这个较老的时间,其他数据流的数据会被积压。一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle设置该数据流为空闲状态。

抽取时间戳及生成Watermark

至此,我们已经了解了Flink的Event Time时间戳和Watermark机制的大致工作原理,接下来我们将展示如何在代码层面设置时间戳并生成Watermark。显然,对时间和Watermark的设置只对Event Time时间语义起作用,如果一个作业基于Processing Time或Ingestion Time,那时间的设置没有什么意义。因为时间在后续处理中都会用到,时间的设置要在任何时间窗口操作之前,总之,时间越早设置越好。Flink提供了以下方法设置时间戳和Watermark:

Source

我们可以在Source阶段,通过自定义SourceFunctionRichSourceFunction,在SourceContext里重写void collectWithTimestamp(T element, long timestamp)void emitWatermark(Watermark mark)两个方法,其中,collectWithTimestamp给数据流中的每个元素T赋值一个timestamp作为Event Time,emitWatermark生成Watermark。下面的代码展示了使用Scala调用这两个方法抽取时间戳并生成Watermark。

case class MyType(data: Double, eventTime: Long, hasWatermark:Boolean, watermarkTime: Long)

class MySource extends RichSourceFunction[MyType] {
  
  override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
      val next: MyType = getNext()
      ctx.collectWithTimestamp(next, next.eventTimestamp)

      if (next.hasWatermark) {
        ctx.emitWatermark(new Watermark(next.watermarkTime))
      }
    }
	}
}

在Source之后通过TimestampAssigner设置

如果我们不想修改Source,也可以在Source之后,通过时间戳指定器(TimestampAssigner)来设置。TimestampAssigner是一个在DataStream[T]上调用的算子,它会给数据流生成时间戳和Watermark,但不改变数据流的类型T。比如,我们可以在Source之后,先过滤掉不需要的内容,然后设置时间戳和Watermark。下面的代码展示了使用TimestampAssigner的大致流程。

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用EventTime时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyType] = env.addSource(...)

// 先过滤不需要的内容,然后设置Timestamp和Watermark。
val withTimestampsAndWatermarks: DataStream[MyType] = stream
        .filter( item => "ERROR".equals(item.info) )
				// 我们要实现一个MyTimestampsAndWatermarks,MyTimestampsAndWatermarks继承并实现了TimestampAssigner,告知Flink如何抽取时间戳并生成Watermark。
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy(...)
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

MyTimestampsAndWatermarks需要继承并实现TimestampAssignerTimestampAssigner是一个函数式接口类,它的源码如下:

public interface TimestampAssigner<T> extends Function {
  
	long extractTimestamp(T element, long previousElementTimestamp);
  
}

extractTimestamp方法为数据流中的每个元素T的Event Time赋值。

TimestampAssigner主要有两种实现方式,一种是周期性地(Periodic)生成Watermark,一种是逐个式地(Punctuated)生成Watermark。如果同时也在Source阶段设置了时间戳,那使用这种方式设置的时间戳和Watermark会将Source阶段的设置覆盖。

AssignerWithPeriodicWatermarks

AssignerWithPeriodicWatermarks是一个继承了TimestampAssigner的接口类:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
  
	Watermark getCurrentWatermark();
  
}

它可以周期性地生成Watermark,其中,这个周期是可以设置的,默认情况下是每200毫秒生成一个Watermark,或者说Flink每200毫秒调用一次getCurrentWatermark方法。我们可以在执行环境中设置这个周期:

// 每5000毫秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)

下面的代码具体实现了AssignerWithPeriodicWatermarks,它抽取元素中的第二个字段为Event Time,每次抽取完时间戳后,更新时间戳最大值,然后以时间戳最大值慢1分钟的时间作为Watermark发送出去。

input.assignTimestampsAndWatermarks(new MyPeriodicAssigner)

// 假设数据流的元素有两个字段(String, Long),其中第二个字段是该元素的时间戳
class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[(String, Long)] {
  val bound: Long = 60 * 1000     // 1分钟
  var maxTs: Long = Long.MinValue // 已抽取的timestamp最大值

  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    // 更新maxTs为当前遇到的最大值
    maxTs = maxTs.max(element._2)
    // 使用第二个字段作为这个元素的Event Time
    element._2
  }

  override def getCurrentWatermark: Watermark = {
    // Watermark比Timestamp最大值慢1分钟
    val watermark = new Watermark(maxTs - bound)
    watermark
  }
}

上面的代码假设了Watermark比已流入数据中时间戳最大者慢1分钟,超过1分钟的将被视为迟到数据。考虑到这种场景比较普遍,Flink已经帮我们封装好了这样的代码,名为BoundedOutOfOrdernessTimestampExtractor,其内部实现与上面的代码几乎一致,我们只需要将最大的延迟时间作为参数传入。

val boundedOutOfOrder = input.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.minutes(1)) {
    override def extractTimestamp(element: (String, Long)): Long = {
      element._2
    }
})

AssignerWithPunctuatedWatermarks

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

	Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
  
}

这种方式对数据流中的每个元素逐个进行检查,如果数据流的元素中有一些特殊标记,我们要在checkAndGetNextWatermark方法中加以判断,并生成Watermark。checkAndGetNextWatermark方法会在extractTimestamp方法之后调用。

// 数据流有三个字段 第二个字段是时间戳,第三个字段判断是否为Watermark的标记
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String, Long, Boolean)] {

  override def extractTimestamp(element: (String, Long, Boolean), previousElementTimestamp: Long): Long = {
    element._2
  }

  override def checkAndGetNextWatermark(element: (String, Long, Boolean), extractedTimestamp: Long): Watermark = {
    if (element._3) 
      new Watermark(extractedTimestamp) 
    else 
      null
  }
}

上面的代码中,假设数据流有三个字段,第二个字段是Event Time时间戳,第三个字段标记是否是Watermark。checkAndGetNextWatermark对每个元素进行检查,判断是否需要生成新的Watermark。

平衡延迟和准确性

至此,我们已经了解了Flink的Event Time和Watermark生成方法,那么具体如何操作呢?实际上,这个问题可能并没有一个标准答案。批处理中,数据都已经准备好了,不需要考虑未来新流入的数据,而流处理中,我们无法完全预知有多少迟到数据,数据的流入依赖业务的场景、数据的输入、网络的传输、集群的性能等等。Watermark是一种在延迟和准确性之间平衡的策略:Watermark与事件的时间戳贴合较紧,一些重要数据将被当成迟到数据,影响计算结果的准确性;Watermark设置得较松,整个应用的延迟增加,更多的数据会先缓存起来以等待计算,会增加内存的压力。对待具体的业务场景,我们可能需要反复尝试,通过一些监控手段来不断迭代和调整时间策略。