Flink 的算子操作

3,797 阅读27分钟

前言

好的,我们这一篇来介绍一下Flink的算子操作

Flink的下载很简单,下好之后解压出来,进到Flink的bin目录下,启动 start-cluster.sh 即可,此时我们可以访问 localhost:8081 去进行访问它那精美的页面


停止的话,使用 stop-cluster.sh 即可

1.1 (补充)Flink Shell 使用

针对初学者,开发的时候容易出错,如果每次都打包进行调试,比较麻烦,并且也不好定位问题,可以在scala shell命令行下进行调试

scala shell方式支持流处理和批处理。当启动shell命令行之后,两个不同的ExecutionEnvironments会被自动创建。使用senv(Stream)和benv(Batch)分别去处理流处理和批处理程序。(类似于spark-shell中sc变量)

bin/start-scala-shell.sh [local|remote|yarn] [options] <args>

如果遇到以上这个错误,我们可以看看这个Error的信息,它说我们要确认一下执行的模式,所以我们就要带上这部分的参数,这里提供了三种不同的指定方式,分别是

[local | remote <host> <port> | yarn]

那我们试一下吧,先从local开始

[root@node1 bin]# ./start-scala-shell.sh local

此时我指定了它的运行模式local,就可以成功地打开了

···吗???

···🤣,相信你也有可能出现我现在的情况,此时报错 “ Could not create the DispatcherResourceManagerComponent ”

此时如果要修正这个问题,我们可以cd /usr/local/flink-1.10.0/conf,然后把添加一个这样的参数

把端口给修改了之后,就可以成功跑起来啦

remote的方式和on yarn的方式也是差不多的,大家喜欢也可以启动一下

[root@node1 bin]# ./start-scala-shell.sh remote 192.168.200.11 8081

此时我们成功的启动起来了,感动

而且细心的小伙伴应该还发现了,它顺带给我们展示了Flink的批处理和实时处理的俩例子。

当然这个东西其实并不太重要,因为Flink-shell远远不及Spark-shell好用,所以我们试着打开一下也就过了。

还记得我们当时说过,了解一个实时的程序,主要我们需要去了解3个方面,数据源,数据的处理及数据的输出,那我们先来看看Flink的数据源吧

1.2.1 实时的source简介

source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。

flink提供了大量的已经实现好的source方法,你也可以自定义source(后面会有对应的小demo,直接复制上去自己的IDEA跑起来即可):

  1. 通过实现sourceFunction接口来自定义无并行度的source

  2. 通过实现 ParallelSourceFunction 接口 or 继承 RichParallelSourceFunction 来自定义有并行度的source

不过大多数情况下,我们使用自带的source即可。

1.2.2 获取source的方式

1.基于文件

readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。

2.基于socket

socketTextStream
从socket中读取数据,元素可以通过一个分隔符切开。

3.基于集合

fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

4.自定义输入

addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

官网中还提到了其它的数据源,可是重点毕竟是Kafka,所以其它的了解下即可

1.2.3 数据源之Collection(代码可直接复制跑起来)

public class StreamingSourceFromCollection {
    public static void main(String[] args) throws Exception {
        //步骤一:获取环境变量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //步骤二:模拟数据
        ArrayList<String> data = new ArrayList<String>();
        data.add("hadoop");
        data.add("spark");
        data.add("flink");
        //步骤三:获取数据源
        DataStreamSource<String> dataStream = env.fromCollection(data);
        //步骤四:transformation操作
        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<StringString>() {
            @Override
            // 简单地遍历一下数据
            public String map(String word) throws Exception {
                return "testCollection_" + word;
            }
        });
        //步骤五:对结果进行处理(打印)
        addPreStream.print().setParallelism(1);
        //步骤六:启动程序
        env.execute("StreamingSourceFromCollection");

    }
}

输出结果

1.2.4 自定义单并行度数据源(代码可直接复制跑起来)

模拟一个每隔一秒产生一条数据的数据源

/**
 * 功能:每秒产生一条数据
 */

public class MyNoParalleSource implements SourceFunction<Long{
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
         sct.collect(number);
         number++;
         //每秒生成一条数据
         Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

此时我们对这个数据源进行处理,处理也很简单,就是进行了一个map操作和一个filter操作,filter就是把偶数给选出来而已

/**
 * 功能:从自定义的数据数据源里面获取数据,然后过滤出偶数
 */

public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收数据源
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

运行结果就是

1.2.5 自定义多并行度数据源

/**
 * 每秒产生一条数据
 */

public class MyParalleSource implements ParallelSourceFunction<Long{
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            //每秒生成一条数据
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

这里我们可以看到,只是实现了一个不同的接口而已,然后在业务代码中设置并行度即可

public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个代码.setParallelism(2)设置了并行2
        DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

1.3 Flink 的常用Transformation算子

1.3.1 Map和Filter(刚刚演示过了)

1.3.2 flatMap,keyBy,sum,union(和Spark是基本一样的)

1.3.3 connect,MapFunction和coMapFunction

connect操作在spark那里是没有的,所以稍微看一下,它与union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法,CoMapFunction和MapFunction的区别就是从对一个流的数据处理变成了两个流(注意也只能是两个)的处理而已。

public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        //注意:针对此source,并行度只能设置为1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                // 这里是第二个数据源,字符串我加了一个前缀str_
                return "str_" + value;
            }
        });
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, StringObject>() {
            @Override
            public Object map1(Long value) throws Exception {
                // 在这里可以进行业务处理
                return value;
            }
            @Override
            public Object map2(String value) throws Exception {
                // 在这里也可以进行业务处理
                return value;
            }
        });

        //打印结果
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

输出结果中两个流过来的数据也有可能不是你一条我一条的,而是有可能一个流快的时候就先来了多条

1.3.4 Split和Select

这个的作用是把一个数据流切割成为多个数据流

可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了,而select就是帮助我们把不同的流给抽取出来的一个作用

public class SplitDemo {
    public static void main(String[] args) throws  Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
        //对流进行切分,按照数据的奇偶性进行区分
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value
{
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");//偶数
                } else {
                    outPut.add("odd");//奇数
                }
                return outPut;
            }
        });

        //选择一个或者多个切分后的流
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd","even");

        //打印结果,此时我选择的全是偶数的数据
        evenStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

运行结果

1.4 Flink的常用sink算子

数据的输出方面其实就是比较简单了,我觉得这个东西可能不需要结合代码展开来讲,大致过一遍即可

1.4.1 print() 和 printToErr()

打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

1.4.2 writeAsText()

/**
 * 数据源:1 2 3 4 5.....源源不断过来
 * 通过map打印一下接受到数据
 * 通过filter过滤一下数据,我们只需要偶数
 */

public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

    // 没有集群的小伙伴也可以指定一个本地的路径,并写入一个文件中
   filterDataStream.writeAsText("your path").setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

1.4.3 自定义sink

除了我们上文中提到的以下几种

当然我这边的现状还有把数据写入到redis里面,此时我们需要先引入一个依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

如果想要去了解一下Redis的小伙伴完全可以自己过去菜鸟教程这种网站去看看,下面的代码已经打好注释了

/**
 * 把数据写入redis
 */

public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("xxx", xxx, "\n");
        //lpush l_words word
        //对数据进行组装,把string转化为tuple2<String,String>
        DataStream<Tuple2<StringString>> l_wordsData = text.map(new MapFunction<String, Tuple2<StringString>>() {
            @Override
            public Tuple2<StringString> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        //创建redis的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(xxx).setPort(xxx).build();

        //创建redissink
        RedisSink<Tuple2<StringString>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");

    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<StringString>> {
        //表示从接收的数据中获取需要操作的redis key
        @Override
        public String getKeyFromData(Tuple2<StringString> data) {
            return data.f0;
        }
        //表示从接收的数据中获取需要操作的redis value
        @Override
        public String getValueFromData(Tuple2<StringString> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}

1.5 批处理方面的算子

Flink的批处理方面做的其实一般般,在企业的开发中使用也较少,不过负责它的团队还是很勤奋的,所以估计在未来不久会变得更加优秀,其实就是把我们之前的 Spark-core 的功能给回顾了而已

1.5.1 source

基于文件

readTextFile(path)

基于集合

fromCollection(Collection)

1.5.2 transform

算子概览:

  • Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

  • FlatMap:输入一个元素,可以返回零个,一个或者多个元素

  • MapPartition>:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】

  • Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

  • Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

  • Aggregate:sum、max、min等

  • Distinct:返回一个数据集中去重之后的元素,data.distinct()

  • Join:内连接

  • OuterJoin:外链接

  • Cross:获取两个数据集的笛卡尔积

  • Union:返回两个数据集的总和,数据类型需要一致

  • First-n:获取集合中的前N个元素

  • Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个字段的排序

1.5.3 sink

  • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
  • print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

需求:flink从数据源中获取到用户的姓名,最终需要把用户的姓名和年龄信息打印出来

分析:所以就需要在中间的map处理的时候获取用户的年龄信息,把用户的关系数据集使用广播变量进行处理

我们在下方使用了RichMapFunction,这个东西的作用就是在mapFunction的基础上多了一个初始化的过程,在这个初始化的过程中我就可以获取到广播变量,并在map中获取到年龄值,然后再把res给输出出来。

public class BroadCastDemo {
    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1:准备需要广播的数据
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zhangsan",18));
        broadData.add(new Tuple2<>("lisi",19));
        broadData.add(new Tuple2<>("wangwu",20));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        //处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });
        //源数据
        DataSource<String> data = env.fromElements("zhangsan""lisi""wangwu");
        //注意:在这里需要使用到RichMapFunction获取广播变量
        DataSet<String> result = data.map(new RichMapFunction<StringString>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 这个方法只会执行一次
             * 可以在这里实现一些初始化的功能
             * 所以,就可以在open方法中获取广播变量数据
             */

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //获取广播数据
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }
            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//执行广播数据的操作
        result.print();
    }
}

运行的时间会比较长,打印出来的结果就是张三,李四,王五和他们的年龄

1.5.5 Flink之Counter

Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Counter是一个具体的累加器(Accumulator)实现
IntCounter, LongCounter 和 DoubleCounter

用法

1:创建累加器
private IntCounter numLines = new IntCounter(); 
2:注册累加器
getRuntimeContext().addAccumulator("num-lines"this.numLines);
3:使用累加器
this.numLines.add(1); 
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")

实例代码

public class CounterDemo {
    public static void main(String[] args) throws Exception{
        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("a""b""c""d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            //1:创建累加器
            private IntCounter numLines = new IntCounter();
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:注册累加器
                getRuntimeContext().addAccumulator("num-lines",this.numLines);

            }
            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);
        //如果要获取counter的值,只能是任务
        //result.print();
        result.writeAsText("d:\\data\\mycounter");
        JobExecutionResult jobResult = env.execute("counter");
        //3:获取累加器
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);

    }
}

到这里批处理的算子也提的差不多了,有兴趣的朋友就可以把代码复制上去跑跑看看,没兴趣的话,那也没事,反正现在基本都是用实时的算子,不太影响

1.5.6 算子的状态

回到单词计数这个例子

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("localhost"8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy("0")
                .sum(1);
        result.print();
        env.execute("WordCount");
    }
}

此时需要注意,我们必须监听8888端口,才能开始启动程序,不然会报connect refuse错误。因为我是windows下进行,所以我整了一个netcat来协助,此时我先启动netcat,然后nc -lk 8888对8888端口进行监听

然后我再进行一些单词的输入,此时我们看我们的打印信息

4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
4> (flink,2)
1> (hive,1)
1> (hive,2)
1> (hive,3)

这时候我们会发现,Flink就是真正意义上的实时处理,来一条处理一条,而且你会发现,在Spark中需要使用updateStateByKey或者mapWithState高级算子才能实现的累加,在Flink中可以很方便地去完成

这是为什么呢?正是因为官网所说的:Flink是一个有状态的数据流

所以,state(状态)才是我们学习Flink的一大重点。之后我们会陆续说明

finally

···