过一下Flink的各种State

6,438 阅读35分钟

前言

我们终于是迎来了我们的重点篇了,下面的代码都是官网中解释算子所运用到的代码,所以如果觉得官网看起来比较吃力,过来这边我们慢慢看也是未尝不可

一、先扯一下

1.1 Flink的 wordCount 完整代码

首先我们回到上一次的单词计数那块

/**
 * 单词计数
 */

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("localhost"8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy("0")
                .sum(1);

        result.print();

        env.execute("WordCount");
    }
}

### 1.2 代码流程的分析

单词计数的那部分逻辑就不说明了,都一个样。从第一句开始。首先我们定义一个配置

Configuration conf = new Configuration();

然后就是程序的入口

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

这个单词计数我们要注意一下,对于代码

env.execute("test word count");

我这里的做法是直接抛出了异常,为啥呢?因为如果是生产环境中这里出现了异常,你的任务启动都出现了问题,那你捕获它有什么意义呢是吧?所以我们这里直接抛出去即可。

之后从socket中获取数据即可,对应代码为

DataStreamSource<String> dataStream = env.socketTextStream("localhost"8888).setParallelism(1);

之后就是写单词计数了,不说明了。

在上一篇中我们知道,此时如果打开netcat输入几个单词,就会发现,Flink帮我们存储了中间状态,如果是通过Spark Streaming
来实现这种功能的话,就必须借助checkPoint啊,updateStateByKey或者mapWithState这种高级算子,再或者我们自己把中间状态存储在一些存储介质中,比如Redis,Hbase···等。那这次我们就来具体说说,Flink是如何通过状态去实现这种累加的功能的

1.3 Flink的state

state:一般指一个具体的task/operator的状态。State可以被记录,在失败的情况下数据还可以恢复,Flink中有两种基本类型的State:Keyed State,Operator State,它们两种都可以以两种形式存在:原始状态(raw state)和托管状态(managed state)

比如说我们刚刚的单词计数,经过了代码“keyBy()”之后,后面的算子的状态就是key state,但是如果我把这句代码删除,那剩下的算子就是operate state,很简单吧,区分的条件就是keyBy

托管状态:由Flink框架管理的状态,我们通常使用的就是这种。

原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。可是我们使用Flink的时候,基本是不会自定义状态的。

1.3.1 operator state

里面没有shuffle操作的state,换句话说,就是没有keyBy这个操作

  1. operator state是task级别的state,说白了就是每个task对应一个state
  2. Kafka Connector source中的每个分区(task)都需要记录消费的topic的partition和offset等信息。
  3. operator state 只有一种托管状态:ValueState

1.3.2 Keyed State

  1. keyed state 记录的是每个key的状态

  2. Keyed state托管状态有六种类型:

  3. ValueState

  4. ListState

  5. MapState

  6. ReducingState

  7. AggregatingState

  8. FoldingState(这个不算太重要哈)

二、 各种state的演示

2.1 ValueState

现在我们说一下需求:当接收到的相同 key 的元素个数等于 3 个或者超过 3 个的时候,就计算这些元素的 value 的平均值。计算 keyed stream 中每 3 个元素的 value 的平均值

这个需求不难理解,在下面的代码中,main方法那模拟了一段数据

DataStreamSource<Tuple2<LongLong>> dataStreamSource = env.fromElements(
Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), 
Tuple2.of(1L, 7L),Tuple2.of(2L, 4L), 
Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

这里我key为1的数据出现了3次,key为2的数据也出现了3次,那我就把它们的平均值给计算出来,就那么简单。那我们的结果就肯定是(1,5),(2,3.6666···)了。

2.1.1 定义一个TestKeyedStateMain类

public class TestKeyedStateMain {}

2.3 代码的前置部分

很简单,就是先获取程序的入口,然后再把我刚刚提到的模拟数据给丢进来

此时我们模拟的本身就是key-value这样的数据,所以我们连flatMap这种操作都省了,直接就keyBy,按0(也就是第一个Tuple.of()中的第一个位置)进行keyBy,而且因为我们是要取平均值,而Flink自身提供的算子是有限的,所以我们要进行一些附加的操作。

此时我们选用了flatMap,不过我们要实现自己去对状态进行管理,也就有点那种自定义算子的味道

2.1.2 状态自行管理的代码

public class CountWindowAverageWithValueState
    extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>>{

}

这里我们定义了一个输入类型Tuple2和一个输出类型Tuple2,这俩货对应的是模拟数据的key-value对,和输出的(1,5),(2,3.6666···)

因为我们需要通过ValueState去存我们的状态,所以我们初始化一个ValueState

//第一个Long用来保存key出现的次数
//第二个Long代表和key对应的value的总值
ValueState<Tuple2<LongLong>> countAndSum;

而且我们需要注意的是,我们的数据源中每一个key都会有自己的一个对应的valueState

继承了RichFlatMapFunction后我们可以覆写两个方法,一个是open(),一个是flatMap(),open()方法仅会执行一次,在open里面我们会进行状态的注册,而且把这个状态交由Flink去管理。

注册状态的套路是固定的,ValueStateDescriptor

@Override
public void open(Configuration parameters) throws Exception {

   //注册状态,其实就是初始化一个描述,这个描述有两个参数
   //一个参数是一个名字,另一个也是固定套路,对应你Tuple的参数类型

   //比如你Tuple<Long, Long>对应就是Types.LONG, Types.LONG
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
            "average"//状态的名字
            Types.TUPLE(Types.LONG, Types.LONG));//状态存储的数据类型

    通过描述从Flink去获取状态
    countAndSum = getRuntimeContext().getState(descriptor);
}

上面是固定的操作,可以先记住,简单理解为把状态注册好,再取出来用即可

@Override
public void flatMap(Tuple2<LongLong> element,
                    Collector<Tuple2<LongDouble>> out) throws Exception {

    // 当前key出现的次数,及对应value的和
    Tuple2<LongLong> currentState = countAndSum.value();
    //如果第一次进来,currentState为空,进行初始化,简单设置为0
    if(currentState == null){
        currentState = Tuple2.of(0L,0L);
    }
    //更新状态中的元素
    currentState.f0+=1;
    //更新状态中的总值
    currentState.f1+= element.f1;
    //更新状态
    countAndSum.update(currentState);

    //判断
    if(currentState.f0 >= 3){
        double avg=(double)currentState.f1 / currentState.f0;
        //对出对应的key及其对应的平均值
        out.collect(Tuple2.of(element.f0,avg));

        // 算了一次累计3,清除状态
        countAndSum.clear();
    }
}

注意,这里的currentState.f0,f1对应的是Tuple2的两个long参数,这里再把main方法的flatMap给补上状态相关的CountWindowAverageWithValueState()

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L3L), Tuple2.of(1L5L), Tuple2.of(1L7L),
                        Tuple2.of(2L4L), Tuple2.of(2L2L), Tuple2.of(2L5L));

        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.1.3 完整代码一览

/**
 *  ValueState<T> :这个状态为每一个 key 保存一个值
 *  value() 获取状态值
 *  update() 更新状态值
 *  clear() 清除状态
 */

public class CountWindowAverageWithValueState
        extends RichFlatMapFunction<Tuple2<LongLong>, Tuple2<LongDouble>> {
    // 用以保存每个 key 出现的次数,以及这个 key 对应的 value 的总值
    // managed keyed state
    //1. ValueState 保存的是对应的一个 key 的一个状态值
    private ValueState<Tuple2<LongLong>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ValueStateDescriptor<Tuple2<LongLong>> descriptor =
                new ValueStateDescriptor<Tuple2<LongLong>>(
                        "average",  // 状态的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
        countAndSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<LongLong> element,
                        Collector<Tuple2<LongDouble>> out) throws Exception {
        // 拿到当前的 key 的状态值
        Tuple2<LongLong> currentState = countAndSum.value();

        // 如果状态值还没有初始化,则初始化
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }

        // 更新状态值中的元素的个数
        currentState.f0 += 1;

        // 更新状态值中的总值
        currentState.f1 += element.f1;

        // 更新状态
        countAndSum.update(currentState);

        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        if (currentState.f0 >= 3) {
            double avg = (double)currentState.f1 / currentState.f0;
            // 输出 key 及其对应的平均值
            out.collect(Tuple2.of(element.f0, avg));
            //  清空状态值
            countAndSum.clear();
        }
    }
}


public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<LongLong>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.2 ListState

照样是实现和上面一样的功能,因为套路基本完全一样所以这个不展开说明了

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;

public class CountWindowAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>>{
    //List里面保存这所有的key出现的次数
    ListState<Tuple2<LongLong>> elementByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Tuple2<LongLong>> descriptor = new ListStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG));
       elementByKey = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<LongLong> element,
                        Collector<Tuple2<LongDouble>> out) throws  Exception {
        Iterable<Tuple2<LongLong>> currentState = elementByKey.get();
        //初始化
        if(currentState == null){
            elementByKey.addAll(Collections.emptyList());
        }
        //更新状态
        elementByKey.add(element);
        ArrayList<Tuple2<LongLong>> allElement = Lists.newArrayList(elementByKey.get());
        if(allElement.size() >= 3){
            long count=0;
            long sum=0;
            for (Tuple2<Long,Long> ele:allElement){
                count++;
                sum += ele.f1;
            }
            double avg=(double) sum/count;
            out.collect(new Tuple2<>(element.f0,avg));
            //清除数据
            elementByKey.clear();
        }

    }
}

2.4 MapState

也是实现同样的需求,不过这个其实会存在一个问题,因为mapState不同于上面的两个state,mapState的特点是相同的key它会做一个覆盖操作,也就是同样

Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), 
Tuple2.of(1L, 7L),Tuple2.of(2L, 4L), 
Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

这份数据,在Tuple2.of(1L, 3L)来之后,Tuple2.of(1L, 5L)再过来,它就会把前面的3L替换成5L,而不是统计起来。其实这个就和Java的map一毛一样

所以一句话解释就是,mapState中key相同的数据会处于同一个state,所以我们这次要采用字符串类型的key,设计成1_1,1_2,1_3这种形式

2.4.1 前置条件

这次我们用String,还有就是open方法的老套路,注册后使用,大家记住这个套路就好了

public class CountWindowAverageWithMapState
    extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {
        private MapState<String,Long> mapState;
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
                    "average",
                    String.classLong.class);
            mapState = getRuntimeContext().getMapState(descriptor);
        }
    }

2.4.2 flatMap的编写

此时我们来一条元素就放入这个map结构即可

@Override
public void flatMap(Tuple2<LongLong> element,
                    Collector<Tuple2<LongDouble>> out) throws Exception {
    mapState.put(UUID.randomUUID().toString(),element.f1);
    ArrayList<Long> arrayList = Lists.newArrayList(mapState.values());
    if(arrayList.size() >= 3){
        long count=0;
        long sum=0;
        for (Long ele:arrayList){
            count++;
            sum += ele;
        }
        double avg = (double) sum/count;
        out.collect(new Tuple2<Long,Double>(element.f0,avg));
        mapState.clear();
    }
}

我们的套路是这样的,在key那里我们使用UUID的方式,因为UUID是不会重复的,这样的操作虽然导致了我们根本不知道这个key长的到底是啥样,可是我们根本对key不关心。所以基本就是没影响。

然后我们通过判断Arraylist的长度是否大于3,大于的时候就执行我们的算平均值的算法即可。此时我们得出运算结果,是正常的

此时有小伙伴就要说了,哎你这结果太假了,怎么UUID后的这个key还是这么正常的一个数字呢

此时我们注意,我拿的根本不是我进行了UUID的那个key,而是本身数据过来时候带过来的key,所以它是正常的

2.5 使用ReducingState来实现sum算子的功能

ReducingState具有聚合效果,所以它能模拟出sum的累加并最后得出结果的效果

2.5.1 前置代码

此时我们就不是那种固定的套路了,首先我们会有一个ReducingStateDescriptor descriptor来接收描述,而且除了名字之外,我们是实现了一个ReduceFunction接口,之后还有一个数据类型Long.class

其实相比之前就是多了一个实现接口的操作而已

public class SumFunction
    extends RichFlatMapFunction<Tuple2<LongLong>, Tuple2<LongLong>> {

        // 这个东西是拿来保存同一个key累加的值
        ReducingState<Long> reducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                    "sum",//状态的名字
                    new ReduceFunction<Long>() {//聚合函数
                        @Override
                        public Long reduce(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
                    }, Long.class);
            reducingState = getRuntimeContext().getReducingState(descriptor);

        }

然后之后就很简单了

@Override
public void flatMap(Tuple2<LongLong> element,
                    Collector<Tuple2<LongLong>> out) throws Exception {
    //将数据放到状态中
    reducingState.add(element.f1);
    out.collect(new Tuple2(element.f0,reducingState.get()));

}

2.6 AggregatingState

此时我们想实现这么一个功能,比如我们的数据还是那份,我们要实现的效果是

(1,Contains:3 and 5 and 7)
(2,Contains:4 and 2 and 5)

就是把相同的key出现过的数据都一并记录下来的功能。这个算子基本就是最复杂的了😂,如果前面的觉得吃力,那可以跳过这个直接看下一个模拟需求的那个例子,那个例子会很详细的说明,这个state后面我们还会用,所以不急,到时候细说

2.6.1 前置代码

首先key我们是long类型的,但是输出变成String了,所以我们使用Tuple作为输出

此时我们的描述类和平时的有些不一样点进去源码看到

贴过去百度翻译瞧瞧

第一个是输入数据类型,第三个是输出数据类型,中间是累加的一个辅助变量,此时你要实现一个new AggregateFunction()这样的接口,你会发现一下子搞出4个需要实现的方法

createAccumulator---创建一个累加变量
    我们就是想把结果给一个一个拼起来,这个东西充当String str = ""的作用
add---拼接的作用
    return accumulator+" and "+value;也就是Contains:+value

merge---这玩意在这里没用

getResult---得出结果
    此时就是将accumulator+" and "+value的最终值给输出出来

merge在AggregatingState设计之初是为了说明不同的task中计算出来的结果是需要通过一个merge操作来进行合并结果的,可是我们现在的需求是相同key会处于同一个task里面,也就是说我们根本不可能遇到不同task计算出来的相同key不同value需要合并的情况,即使有3个task,我们key为1的都会在task1,key为2的都会在task2

所以就此时此刻而言,这玩意真的是没啥用

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ContainsValueFunction
        extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,String>> {
    AggregatingState<Long, String> aggregatingState;
    @Override
    public void open(Configuration parameters) throws Exception {
        AggregatingStateDescriptor<Long, String, String> descriptor = new AggregatingStateDescriptor<>(
                "totalStr"//状态的名字
                new AggregateFunction<Long, String, String>() {
                    @Override
                    public String createAccumulator() {
                        return "Contains:";
                    }

                    @Override
                    public String add(Long value, String accumulator) {
                        if("Contains:".equals(accumulator)){
                            return accumulator+value;

                        }

                        return accumulator+" and "+value;
                    }

                    @Override
                    public String getResult(String s) {
                        return s;
                    }

                    @Override
                    public String merge(String accumulator1, String accumulator2) {
                        if("Contains:".equals(accumulator1)){
                            return accumulator2;
                        }
                        if("Contains:".equals(accumulator2)){
                            return accumulator1;
                        }
                        String[] fields = accumulator1.split(":");
                        return accumulator2+fields[1];
                    }
                }, String.class);

        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);

    }

flatMap中的操作就是来了数据就存进去aggregatingState即可,然后输出数据的key和aggregatingState中取出来的各个元素即可。

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out
) throws Exception 
{
        aggregatingState.add(element.f1);
        out.collect(new Tuple2<>(element.f0,aggregatingState.get()));
    }
}

2.7 FoldingState

2.8 模拟一个需求

两个流中订单号一样的数据合并在一起输出,不同的业务线打印出来的日志可能不太一样,所以我们其实是有挺多机会遇到这种需要把不同业务线的数据拼接起来的场景的,这就类比于一个实时的ETL的效果

那为啥我们不用join呢?别忘了我们这个可是实时的场景,数据来的有快有慢,当然一些特殊的场景和手段我们在这里先不考虑

orderinfo1数据,数据就在Kafka的一个topic里面

订单号:123,商品名:拖把,价格:30.0
订单号:234,商品名:牙膏,价格:20.0
订单号:345,商品名:被子,价格:114.4
订单号:333,商品名:杯子,价格:112.2
订单号:444,商品名:Mac电脑,价格:30000.0

orderinfo2数据,也在kafka的另一个topic

订单号:123,下单时间:2019-11-11 10:11:12,下单地点:江苏
订单号:234,下单时间:2019-11-11 11:11:13,下单地点:云南
订单号:345,下单时间:2019-11-11 12:11:14,下单地点:安徽
订单号:333,下单时间:2019-11-11 13:11:15,下单地点:北京
订单号:444,下单时间:2019-11-11 14:11:16,下单地点:深圳

输出这样的:(123,拖把,30.0,2019-11-11 10:11:12,江苏)

2.8.1 架子搭起来

反正怎么说,这两行代码始终还是跑不了吧

public class OrderETLStream {
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.execute("OrderETLStream");
}

2.8.2 两个数据流

当然这里本身是kafka来提供数据的,但是现在奈何我本地没环境,所以我先用一个自定义数据源,用面向对象的思想去模拟

orderInfo1和orderInfo2都很简单,就是把字段定义好然后提供构造方法,getter,setter,toString这种基础方法而已

orderInfo1.java

public class OrderInfo1 {
    //订单号
    private Long orderId;
    //商品
    private String productName;
    //价格
    private double price;

    public static OrderInfo1 line2Info1(String line){
        String[] fields = line.split(",");
        OrderInfo1 orderInfo1 = new OrderInfo1();
        orderInfo1.setOrderId(Long.parseLong(fields[0]));
        orderInfo1.setProductName(fields[1]);
        orderInfo1.setPrice(Double.parseDouble(fields[2]));
        return orderInfo1;
    }

    public OrderInfo1(){
    }

    @Override
    public String toString() {
        return "OrderInfo1{" +
                "orderId=" + orderId +
                ", productName='" + productName + '\'' +
                ", price=" + price +
                '}';
    }

    public OrderInfo1(Long orderId, String productName, double price) {
        this.orderId = orderId;
        this.productName = productName;
        this.price = price;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

orderInfo2

public class OrderInfo2 {
    //订单号
    private Long orderId;
    //下单日期
    private String orderDate;
    //下单的地点
    private String address;

    public static OrderInfo2 line2Info2(String line){
        String[] fields = line.split(",");
        OrderInfo2 orderInfo2 = new OrderInfo2();
        orderInfo2.setOrderId(Long.parseLong(fields[0]));
        orderInfo2.setOrderDate(fields[1]);
        orderInfo2.setAddress(fields[2]);
        return orderInfo2;
    }

    public OrderInfo2(){

    }

    @Override
    public String toString() {
        return "OrderInfo2{" +
                "orderId=" + orderId +
                ", orderDate='" + orderDate + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public OrderInfo2(Long orderId, String orderDate, String address) {
        this.orderId = orderId;
        this.orderDate = orderDate;
        this.address = address;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(String orderDate) {
        this.orderDate = orderDate;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

2.8.3 数据源的实现

这个东西也不难,实现了SourceFunction接口之后我们要实现两个方法,一个是run一个是cancel

第一个参数是filePath,为啥需要它呢?因为我们这个数据源不是有两个吗,那我们怎么区分是哪一个,这个参数的作用就是区分作用,此时我们需要通过类的构造函数来传递这个值

之后cancel方法的逻辑很简单,我们肯定要创建一个流去读我们的文件,当这个流为空的时候,就关闭即可

run的方法逻辑也不难,首先就是读文件,当行的数据不为空时,就执行.collect(line)把数据往下游发送即可,当然这样又太快了,为了效果我让它停一下,为了真实点我整了一个随机值(0~500毫秒之内),这就是我们的全部的代码逻辑了

FileSource.java

public class FileSource implements SourceFunction<String{
    private String filePath;
    BufferedReader reader;
    Random random=new Random();
    public FileSource(String filePath){
        this.filePath=filePath;
    }

    @Override
    public void run(SourceContext<String> sct) throws Exception {
      reader = new BufferedReader(
                new InputStreamReader(
                        new FileInputStream(filePath)));
      String line=null;
      while((line = reader.readLine()) != null){
          //模拟数据源源不断的感觉,所以我让线程sleep一下
          TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
          sct.collect(line);
      }

    }

    @Override
    public void cancel() {
        try{
            if(reader == null ){
                reader.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

2.8.4 常量类

我们一般还会定义一个类专门放置我们的常量,当然这个例子很简单所以不整问题不大,在实际开发过程中的话,建议还是这样操作

Constants.java

public class Constants {
    public static final String ORDER_INFO1_PATH="I:/OrderInfo1.txt";
    public static final String ORDER_INFO2_PATH="I:/OrderInfo2.txt";
}

2.8.5 回到OrderETLStream

首先是两个数据流的获取

DataStreamSource<String> info1Stream = env.addSource(new FileSource(Constants.ORDER_INFO1_PATH));
DataStreamSource<String> info2Stream = env.addSource(new FileSource(Constants.ORDER_INFO2_PATH));

此时我们读取文件的内容过来了,不过面对这些字符串是不是并不好操作,那我们就想把这些字符串转成我们上面定义好的一个个对象来操作,细心的小伙伴肯定已经注意到了,我的orderInfo1和orderInfo2都存在一个静态方法,这个静态方法就是帮助我们把字符串先进行切割然后转化成对应对象的字段类型的

之后我就直接用lambda表达式了

SingleOutputStreamOperator<OrderInfo1> orderInfo1Stream = info1Stream
            .map(line -> OrderInfo1.line2Info1(line));

SingleOutputStreamOperator<OrderInfo2> orderInfo2Stream = info2Stream
            .map(line -> OrderInfo2.line2Info2(line));

之后我们使用keyBy对这两个数据源进行分组,key字段我们取订单号

KeyedStream<OrderInfo1, Long> keyByInfo1 = orderInfo1Stream
        .keyBy(orderInfo1 -> orderInfo1.getOrderId());

KeyedStream<OrderInfo2, Long> keyByInfo2 = orderInfo2Stream
        .keyBy(orderInfo2 -> orderInfo2.getOrderId());

之后使用connect拼接起来,但是我们考虑到时间先后的问题,直接操作join,可能操作出来的效果会出问题,所以我们就要借助我们刚刚学完的state了,这里我们拿最常见的valueState来完成

2.8.6

注意,这里使用了RichCoFlatMapFunction,在上一篇文章中已经提过了,针对两个不同的数据源,我们使用co的,输出的数据类型就是Tuple2,因为是它们两个的合并嘛

接下来的套路就是之前的老操作了,首先是open,先描述,然后描述后注册state,注册后就拿来用

flatMap1,和flatMap2,flatMap中要是来了数据123,第二个流如果有数据2,1,那第一个流就可以对第二个的123进行合并啦,但是如果是第一个流先到了数据123,第二个流没到,那就先用update存起来,第二个流也是同理,判断第一个流的。这就是我们的逻辑

EnrichmentFunction.java

/**
 * IN1 第一个类的数据类型
 * IN2 第二个流的数据类型
 * OUT 输出的数据类型
 */

public class EnrichmentFunction
        extends RichCoFlatMapFunction<OrderInfo1,OrderInfo2,
        Tuple2<OrderInfo1,OrderInfo2>> 
{
    //同一个订单号


    private ValueState<OrderInfo1> orderInfo1ValueState;
    private ValueState<OrderInfo2> orderInfo2ValueState;
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<OrderInfo1> descriptor1 = new ValueStateDescriptor<>(
                "info1",
                OrderInfo1.class
        );
        ValueStateDescriptor<OrderInfo2> descriptor2 = new ValueStateDescriptor<>(
                "info2",
                OrderInfo2.class
        );
        orderInfo1ValueState = getRuntimeContext().getState(descriptor1);
        orderInfo2ValueState = getRuntimeContext().getState(descriptor2);
    }
    //第一个流的 key

    //123
    //123
    @Override
    public void flatMap1(OrderInfo1 orderInfo1,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out)
 throws Exception 
{
    //这个方法要是被运行,那说明第一个流肯定是来数据了。
        OrderInfo2 value2 = orderInfo2ValueState.value();
        if(value2 != null ){
            orderInfo2ValueState.clear();
            out.collect(Tuple2.of(orderInfo1,value2));
        }else{
            orderInfo1ValueState.update(orderInfo1);
        }
    }
    //第二个流的key
    @Override
    public void flatMap2(OrderInfo2 orderInfo2,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out)
 throws Exception 
{
        OrderInfo1 value1 = orderInfo1ValueState.value();
        if(value1 != null){
            orderInfo1ValueState.clear();;
            out.collect(Tuple2.of(value1,orderInfo2));
        }else{
            orderInfo2ValueState.update(orderInfo2);

        }
    }
}

此时执行代码,正常跑,挺好的

finally

刚刚演示的state都是官网中提到的做法,我是觉得学习都是要从官网开始,所以把这些例子都拿出来提了一下

我们下一篇继续敲代码🤣