Flink的checkPoint机制

5,116 阅读14分钟

前言

距离上一篇Flink已经有一些时日了,当时算是把一些算子都过了一遍,所以在进来这篇之前,我们先热一下身,回忆一下代码

现在我们想实现这么一个功能,也是单词计数,不过这个单词计数要实现,自定义一个阈值然后每次到达阈值时就进行print的功能。要是已经对Flink有一定了解的小伙伴就肯定知道,我们只需要自定义一个下游就好了

public class TestOperatorState {
    public static void main(String[] args) throws Exception{
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStreamSource<Tuple2<String,Integer>> dataStreamSource = env.fromElements(
                Tuple2.of("spark",3),
                Tuple2.of("kafka",3),
                Tuple2.of("flink",3),
                Tuple2.of("hive",3),
                Tuple2.of("hbase",3),
                Tuple2.of("es",3)
        );
        dataStreamSource.addSink(new MySink(2));
        env.execute("TestOperatorState");
    }
}

然后通过一个MySink去实现这个功能

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class MySink implements SinkFunction<Tuple2<String,Integer>> {

    private List<Tuple2<String,Integer>> bufferElements;

    // 定义一个阈值
    private int threshold;
    public MySink(int threshold){
        this.threshold = threshold;
        bufferElements = new ArrayList<Tuple2<String, Integer>>();
    }

    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferElements.add(value);
        if (bufferElements.size() == threshold){
            System.out.println("数据:"+bufferElements);
            bufferElements.clear();
        }
    }
}

运行时老是报这个 Failed to load class "org.slf4j.impl.StaticLoggerBinder",如果大家看的非常不顺眼的话,那也可以添加这个pom

<dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
</dependency>

但是这会把本来输出3句的东西一下子变得满屏都是一些莫名其妙的日志,所以自己斟酌

可是这个程序明显存在着一个问题,因为数据是存储在内存里面的,程序重启的时候就会丢失,所以为了保证 state 的容错性,Flink需要对state进行checkpoint。

一、Content

1.1 先回到刚刚的程序

如果我们要对刚写完的这个程序进行checkPoint改造,我们应该在MySink给它一个

private ListState<Tuple2<String,Integer>> checkPointState;

然后实现一个CheckpointedFunction接口,这个接口有两个需要实现的方法,一个是snapshotState,一个是initializeState,它们俩英文直译过来就已经非常的通俗易懂了。一个是对state进行快照,一个是初始化state,如下图

然后如果是看过了我们 过一下Flink的各种State 的伙计们,就知道下面该怎么操作啦,先注册状态,后取出来用

ok,此时我们也是依样画葫芦,initializeState是我们重启的时候进行数据恢复的方法。需要注意的是这里所使用的数据类型需要特别去记一下,这个checkPoint其实就是自己去维护一个比较特殊的state去记录它的状态而已,和以前看到的state是一个道理。

public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
            "buffer", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
            @Override
            public TypeInformation<Tuple2<String, Integer>> getTypeInfo() {
                return super.getTypeInfo();
            }
        })
    );
    checkPointState = context.getOperatorStateStore().getListState(descriptor);
    // 如果任务重启
    if (context.isRestored()){
        for (Tuple2<String, Integer> lostData : checkPointState.get()) {
            bufferElements.add(lostData);
        }
    }
}

snapshotState的作用是每隔一段时间将我们内存中的数据写入bufferElements

public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkPointState.clear();
    for (Tuple2<String, Integer> data : bufferElements) {
        bufferElements.add(data);
    }
}

这个例子是官网中原有的例子,完全一模一样的代码,但是其实我们平时是不会这么去写代码的,因为这样的话,就等同于我要自己去维护一个state去记录这个状态的信息,我们按套路来应该是交由Flink去给我们进行管理。

那么Flink中状态到底是被存放在哪里呢?

1.2 state存放的位置

Flink支持的StateBackend,state就默认存放在下面3个地方:

1.2.1 MemoryStateBackend

state存放在内存里,这也是一个主从式的架构,Flink会启动一个JobManager的服务,然后从为TaskManager。状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到 JobManager 的堆内存中

而我们在程序中也可以手动指定,比如这样

所以此时我们的程序如果挂了,那内存里面的数据自然就没了,所以为了处理这个问题,我们会对这个参数进行修改

1.2.2 FsStateBackend

FsStateBackend对于上一个MemoryStateBackend来说进行了一些优化,它的TaskManager会定期地把state存到HDFS上。也就是checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)

env.setStateBackend(new FsStateBackend("hdfs path"));

缺点:状态大小受TaskManager内存限制(默认支持5M,可以配置),如果在存入HDFS之前,内存中的数据就已经超过这个值的大小,那数据也还是会丢失的。优点就是对内存操作,状态访问速度很快

1.2.3 RocksDBStateBackend

env.setStateBackend(new RocksDBStateBackend("path"));

这是生产环境去使用的配置,状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)、

缺点就是状态访问速度相比FsStateBackend有所下降。优点:可以存储超大量的状态信息,因为这个也是分布式的

1.3 checkpoint简介

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

一个Flink任务中有许许多多的Task,它们都会产生许许多多的state,这些state就会被定时的存储到某个地方,在图中搞得表现为checkPointState,

恢复的时候就直接拿出来即可

1.4 设置参数

因为默认情况下是不开启checkPoint的,所以我们需要通过设置开启

env.enableCheckpointing(10000);

这里设置为10秒进行一次checkPoint。但是比较推荐的值为20~120秒之间。

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

这里是设置了checkPoint的模式为一次语义,默认也是这个

At Least One的性能肯定会好一点,可是数据就可能出现重复,所以要参考场景使用,这一块和Spark Streaming还是不一样的,Spark Streaming得益于RDD的容错机制,所以可以做到EXACTLY_ONCE

checkpointConfig.setMinPauseBetweenCheckpoints(500);

这个是两次checkPoint之间最小的时间间隔,此时小伙伴们可能会有点疑惑,我在开启checkPoint的时候不是已经给了一个10秒作为checkPoint的执行间隔了吗,这个参数有啥意义?这里解释一下,我们执行checkPoint肯定是需要一定的时间的,比如说我这次执行checkPoint就花了10秒或者10秒钟我还是没执行完,setMinPauseBetweenCheckpoints(500)就是说,我会让这俩checkPoint的操作至少会有一定的时间间隔,稍微的等一下上一个checkPoint的意思

checkpointConfig.setCheckpointTimeout(60000);

设置了checkPoint的超时时间,如果当前checkPoint一分钟都没有结束,那么就放弃并执行下一次的checkPoint

checkpointConfig.setMaxConcurrentCheckpoints(1);

保留checkPoint的个数,默认为1,就是只保留最新的一个checkPoint结果

env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint,你也可以设置,让程序停止的时候,自动删除checkPoint

1.5 使用checkPoint来恢复数据

恢复数据其实就是重启任务很简单:

Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。

常用的重启策略
    (1)固定间隔策略 (Fixed delay)
    (2)失败率策略 (Failure rate)
    (3)无重启 (No restart)

如果没有启用 checkpointing,则使用无重启 (no restart) 策略。

如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略,
尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置。

1.5.1 固定间隔重启策略

第一种:全局配置 flink-conf.yaml
restart-strategyfixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

我们还可以使用代码来进行配置

第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3// 尝试重启的次数
  Time.of(10, TimeUnit.SECONDS) // 每次重试的间隔
));

1.5.2 失败率策略(场景少)

第一种:

全局配置 flink-conf.yaml
restart-strategyfailure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

也可以通过代码设置

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3// 一个时间段内的最大失败次数
  Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的时间段
  Time.of(10, TimeUnit.SECONDS) // 间隔
));

这里的意思是,5分钟之内允许重启3次

1.5.3 无重启

任务挂了就挂了,随它去把

第一种:全局配置 flink-conf.yaml
restart-strategynone

代码设置

env.setRestartStrategy(RestartStrategies.noRestart());

当然我们如果是日常开发,是不太可能设置全局配置的,都是按照不同的需求去修改这一块的设置

1.6 多checkpoint的设置

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前Flink可以支持保留多个Checkpoint。

需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数

state.checkpoints.num-retained: 20

当然我们会选择在代码中配置,就是刚刚的

checkpointConfig.setMaxConcurrentCheckpoints(20)

此时任务自动重启之后,就使用最新的一个checkPoint来恢复数据

这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现

1.7 手动通过checkPoint恢复数据

我们的每一个Flink的任务都会有一个专属的JobID,然后我们checkPoint保存在HDFS中的数据也会是按照这个JobId去命名的,这个需要注意。如果我们需要手动去通过checkPoint去恢复数据,那我们需要去到HDFS目录,然后找到我们的checkPoint文件夹 --- 默认命名为chk-xx,后面接的是一个数字,表示当前的checkPoint已经是第几个了。命令如下

bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar

需要注意的是,找到了chk-xx还不行,我们还需要指定到它里面的_metadata,你也可以就理解为数据的元数据信息了

当然这个也会出现一个问题,因为Flink刚刚的

checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

并不是端到端的一次语义,我们在业务发生变化,需要停止任务修改代码的时候,再次上线数据有可能出现重复。

所以此时我们就要使用savePoint,Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断。
可以保存数据源offset,operator操作状态等信息,可以从应用在过去任意做了savepoint的时刻开始继续消费

1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

2:触发一个savepoint【直接触发或者在cancel的时候触发】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

// 使用这个命令停止的Flink,会在退出前帮你再保存一个checkPoint
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数,也就是application_xxx】

3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]

用户手动执行,是指向Checkpoint的指针,不会过期,在升级的情况下使用。

注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,


这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。

Finally

watermark