Cris 带你快速入门 Flink

16,638 阅读35分钟

一 概述

1.1 流处理技术的演变

在开源世界里,Apache Storm项目是流处理的先锋。Storm最早由Nathan Marz和创业公司BackType的一个团队开发,后来才被Apache基金会接纳。Storm提供了低延迟的流处理,但是它为实时性付出了一些代价:很难实现高吞吐,并且其正确性没能达到通常所需的水平,换句话说,它并不能保证exactly-once,即便是它能够保证的正确性级别,其开销也相当大。

在低延迟和高吞吐的流处理系统中维持良好的容错性是非常困难的,但是为了得到有保障的准确状态,人们想到了一种替代方法:将连续时间中的流数据分割成一系列微小的批量作业。如果分割得足够小(即所谓的微批处理作业),计算就几乎可以实现真正的流处理。因为存在延迟,所以不可能做到完全实时,但是每个简单的应用程序都可以实现仅有几秒甚至几亚秒的延迟。这就是在Spark批处理引擎上运行的Spark Streaming所使用的方法。

更重要的是,使用微批处理方法,可以实现exactly-once语义,从而保障状态的一致性。如果一个微批处理失败了,它可以重新运行,这比连续的流处理方法更容易。Storm Trident是对Storm的延伸,它的底层流处理引擎就是基于微批处理方法来进行计算的,从而实现了exactly-once语义,但是在延迟性方面付出了很大的代价。

对于Storm Trident以及Spark Streaming等微批处理策略,只能根据批量作业时间的倍数进行分割,无法根据实际情况分割事件数据,并且,对于一些对延迟比较敏感的作业,往往需要开发者在写业务代码时花费大量精力来提升性能。这些灵活性和表现力方面的缺陷,使得这些微批处理策略开发速度变慢,运维成本变高。

于是,Flink出现了,这一技术框架可以避免上述弊端,并且拥有所需的诸多功能,还能按照连续事件高效地处理数据,Flink的部分特性如下图所示:

1.2 初识Flink

Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。

在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache软件基金会的logo颜色相呼应,也就是说,这是一只Apache风格的松鼠。

官网链接

Flink主页在其顶部展示了该项目的理念:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

1.3 Flink核心计算框架

Flink的核心计算架构是下图中的Flink Runtime执行引擎,它是一个分布式系统,能够接受数据流程序并在一台或多台机器上以容错方式执行。

Flink Runtime执行引擎可以作为YARN(Yet Another Resource Negotiator)的应用程序在集群上运行,也可以在Mesos集群上运行,还可以在单机上运行(这对于调试Flink应用程序来说非常有用)。

上图为Flink技术栈的核心组成部分,值得一提的是,Flink分别提供了面向流式处理的接口(DataStream API)和面向批处理的接口(DataSet API)。因此,Flink既可以完成流处理,也可以完成批处理。Flink支持的拓展库涉及机器学习(FlinkML)、复杂事件处理(CEP)、以及图计算(Gelly),还有分别针对流处理和批处理的Table API。

Flink 是一个真正的批流结合的大数据计算框架,将大数据背景下的计算统一整合在一起,不仅降低了学习和操作难度,也有效实现了离线计算和实时计算的大一统

能被Flink Runtime执行引擎接受的程序很强大,但是这样的程序有着冗长的代码,编写起来也很费力,基于这个原因,Flink提供了封装在Runtime执行引擎之上的API,以帮助用户方便地生成流式计算程序。Flink 提供了用于流处理的DataStream API和用于批处理的DataSet API。值得注意的是,尽管Flink Runtime执行引擎是基于流处理的,但是DataSet API先于DataStream API被开发出来,这是因为工业界对无限流处理的需求在Flink诞生之初并不大。

DataStream API可以流畅地分析无限数据流,并且可以用Java或者Scala来实现。开发人员需要基于一个叫DataStream的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。

Flink的分布式特点体现在它能够在成百上千台机器上运行,它将大型的计算任务分成许多小的部分,每个机器执行一部分。Flink能够自动地确保发生机器故障或者其他错误时计算能够持续进行,或者在修复bug或进行版本升级后有计划地再执行一次。这种能力使得开发人员不需要担心运行失败。Flink本质上使用容错性数据流,这使得开发人员可以分析持续生成且永远不结束的数据(即流处理)。

二 Flink基本架构

2.1 JobManager与TaskManager

Flink运行时包含了两种类型的处理器:

**JobManager处理器:**也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。

TaskManager处理器:也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换,Flink运行时至少会存在一个worker处理器。

简单图示如下

Master和Worker处理器可以直接在物理机上启动,或者通过像YARN这样的资源调度框架启动。

Worker连接到Master,告知自身的可用性进而获得任务分配。

2.2 无界数据流与有界数据流

Flink用于处理有界和无界数据:

无界数据流无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性,无界流的处理称为流处理

有界数据流有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理

在无界数据流和有界数据流中我们提到了批处理和流处理,这是大数据处理系统中常见的两种数据处理方式。

批处理的特点是有界、持久、大量,批处理非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计流处理的特点是无界、实时,流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计

在Spark生态体系中,对于批处理和流处理采用了不同的技术框架,批处理由SparkSQL实现,流处理由Spark Streaming实现,这也是大部分框架采用的策略,使用独立的处理器分别实现批处理和流处理,而Flink可以同时实现批处理和流处理。

Flink是如何同时实现批处理与流处理的呢?答案是,Flink将批处理(即处理有限的静态数据)视作一种特殊的流处理

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们要实现的目标是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。

Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。

2.3 数据流编程模型

Flink提供了不同级别的抽象,以开发流或批处理作业,如下图所示:

最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。

Table API 以表为中心,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何 。 尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。

你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用

Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

三 Flink集群搭建

Flink可以选择的部署方式有:

Local、Standalone(资源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。

我们主要对Standalone模式和Yarn模式下的Flink集群部署进行分析。

3.1 Standalone模式安装

我们对standalone模式的Flink集群进行安装,准备三台虚拟机,其中一台作为JobManager(hadoop101),另外两台作为TaskManager(hadoop102、hadoop103)。

  1. 首先官网下载

  2. 然后将下载的压缩包发送到虚拟机上,解压到指定位置

  3. 然后修改配置文件

    [cris@hadoop101 conf]$ vim flink-conf.yaml
    

    然后修改Worker 节点配置

    [cris@hadoop101 conf]$ vim slaves
    

  4. 最后将 Flink 同步到其他两台 Worker 节点即可

    [cris@hadoop101 module]$ xsync flink-1.6.1/
    
  5. 启动命令如下

    [cris@hadoop101 bin]$ ./start-cluster.sh
    

    非常简单~

    通过 jps 查看进程情况

    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    2491 StandaloneSessionClusterEntrypoint
    2555 Jps
    ----------jps of hadoop102---------
    2338 Jps
    2285 TaskManagerRunner
    ----------jps of hadoop103---------
    2212 Jps
    2159 TaskManagerRunner
    
  6. 访问集群web界面(8081端口)

    出现如下界面表示 Flink 集群启动成功

  7. 简单跑个 WC 任务

  8. 关闭集群

    [cris@hadoop101 bin]$ ./stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2285) on host hadoop102.
    Stopping taskexecutor daemon (pid: 2159) on host hadoop103.
    Stopping standalonesession daemon (pid: 2491) on host hadoop101.
    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    3249 Jps
    ----------jps of hadoop102---------
    2842 Jps
    ----------jps of hadoop103---------
    2706 Jps
    

3.2 Yarn模式安装

前四步同 Standalone 模式

  1. 明确虚拟机中已经设置好了环境变量HADOOP_HOME

  2. 启动Hadoop集群(HDFS和Yarn)

  3. 在hadoop101节点提交Yarn-Session,使用安装目录下bin目录中的yarn-session.sh脚本进行提交:

    [cris@hadoop101 ~]$ /opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
    

    其中:

    -n(--container):TaskManager的数量。

    -s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1。

    -jm:JobManager的内存(单位MB)。

    -tm:每个taskmanager的内存(单位MB)。

    -nm:yarn 的appName(现在yarn的ui上的名字)。

    -d:后台执行。

  4. 启动后查看Yarn的Web页面,可以看到刚才提交的会话:

    查看进程信息

  5. 简单的跑个任务

    [cris@hadoop101 flink-1.6.1]$ ./bin/flink run -m yarn-cluster examples/batch/WordCount.jar
    

    终端直接打印结果

    在看看web 界面

四 Flink运行架构

4.1 任务提交流程

Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster

ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager

NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

4.2 TaskManager与Slots

每一个TaskManager是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。·

每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。

通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。

TaskSlot是静态的概念,是指TaskManager具有的并发执行能力**,可以通过参数taskmanager.numberOfTaskSlots进行配置,而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

也就是说,假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。

4.3 Dataflow

Flink程序由Source、Transformation、Sink这三个核心组件组成,Source主要负责数据的读取,Transformation主要负责对属于的转换操作,Sink负责最终数据的输出,在各个组件之间流转的数据称为流(streams)。

Flink程序的基础构建模块是 (streams) 与 转换(transformations)(需要注意的是,Flink的DataSet API所使用的DataSets其内部也是stream)。一个stream可以看成一个中间结果,而一个transformations是以一个或多个stream作为输入的某种operation,该operation利用这些stream进行计算从而产生一个或多个result stream。

在运行时,Flink上运行的程序会被映射成streaming dataflows,它包含了streams和transformations operators。每一个dataflow以一个或多个sources开始以一个或多个sinks结束,dataflow类似于任意的有向无环图(DAG)。

4.4 并行数据流

Flink程序的执行具有并行、分布式的特性。在执行过程中,一个 stream 包含一个或多个 stream partition ,而每一个 operator 包含一个或多个 operator subtask,这些operator subtasks在不同的线程、不同的物理机或不同的容器中彼此互不依赖得执行。

一个特定operator的subtask的个数被称之为其parallelism(并行度)。一个stream的并行度总是等同于其producing operator的并行度。一个程序中,不同的operator可能具有不同的并行度。

Stream在operator之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于operator的种类。

One-to-onestream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map operator的subtask看到的元素的个数以及顺序跟source operator的subtask生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。

Redistributing这种操作会改变数据的分区个数。每一个operator subtask依据所选择的transformation发送数据到不同的目标subtask。例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。

4.5 task与operatorchains

出于分布式执行的目的,Flink将operator的subtask链接在一起形成task,每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

下面这幅图,展示了5个subtask以5个并行的线程来执行:

4.6 任务调度流程

客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow给Master,然后,客户端断开连接或者维持连接以等待接收计算结果,客户端可以以两种方式运行:要么作为Java/Scala程序的一部分被程序触发执行,要么以命令行./bin/flink run的方式执行。

五 Flink DataStream API

5.1 Flink运行模型

以上为Flink的运行模型,Flink的程序主要由三部分构成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取,Transformation主要负责对属于的转换操作,Sink负责最终数据的输出。

5.2 Flink程序架构

每个Flink程序都包含以下的若干流程:

  • 获得一个执行环境;(Execution Environment)

  • 加载/创建初始数据;(Source)

  • 指定转换这些数据;(Transformation)

  • 指定放置计算结果的位置;(Sink)

  • 触发程序执行

5.3 Environment

执行环境StreamExecutionEnvironment是所有Flink程序的基础

创建执行环境有三种方式,分别为:

StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment

StreamExecutionEnvironment.getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment

5.4 Source

I 基于File的数据源

  1. readTextFile(path)

    一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. 读取指定路径的文本文件
        val stream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    
        // 3. action 算子对 DataStream 中的数据打印
        stream.print()
    
        // 4. 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

    Terminal 打印结果

    1> apache spark hadoop flume
    1> kafka hbase hive flink
    4> apache spark hadoop flink
    5> kafka hbase hive flink
    6> sqoop hue oozie zookeeper
    8> apache spark hadoop flume
    3> kafka hbase oozie zookeeper
    2> sqoop hue oozie zookeeper
    7> flink oozie azakaban spark
    

    注意stream.print():每一行前面的数字代表这一行是哪一个并行线程输出的。

    还可以根据指定的 fileInputFormat 来读取文件

    readFile(fileInputFormat, path)

  2. 基于Socket的数据源

    从Socket中读取信息

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        
        val stream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
    
        // 3. action 算子对 DataStream 中的数据打印
        stream.print()
    
        // 4. 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

  3. 基于集合(Collection)的数据源

    1. fromCollection(seq):从集合中创建一个数据流,集合中所有元素的类型是一致的

      val stream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2,3,4))
      
    2. fromCollection(Iterator):从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回

      val stream: DataStream[Int] = executionEnvironment.fromCollection(Iterator(3,1,2))
      
    3. fromElements(elements:_*):从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型

      val list = List(1,2,3)
      val stream: DataStream[List[Int]] = executionEnvironment.fromElements(list)
      
    4. generateSequence(from, to):从给定的间隔中并行地产生一个数字序列

      val stream: DataStream[Long] = executionEnvironment.generateSequence(1,10)
      

5.5 Sink

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。

Flink有许多封装在DataStream操作里的内置输出格式。

1. writeAsText

将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

2. WriteAsCsv

将元素以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

3. print/printToErr

打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

4. writeUsingOutputFormat

自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

5. writeToSocket

将元素写入到socket中.

5.6 Transformation

1. map

DataStream → DataStream:输入一个参数产生一个参数。

    // 初始化 Flink 执行环境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 针对每一行数据前面添加指定字符串
    val mapDataStream: DataStream[String] = dataStream.map("Apache:" + _)
    mapDataStream.print()

    // 启动 Flink 应用
    executionEnvironment.execute("test")

2. flatMap

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 将每行数据按照空格分割成集合,最终 "压平"
    val mapDataStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    mapDataStream.print()

3. filter

DataStream → DataStream:结算每个元素的布尔值,并返回布尔值为true的元素。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val mapDataStream: DataStream[String] = dataStream.filter(_.contains("kafka"))

4. Connect

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

  // 初始化 Flink 执行环境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val listDataStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3))
    val connStreams: ConnectedStreams[String, Int] = dataStream.connect(listDataStream)
    // map函数中的第一个函数作用于 ConnectedStreams 的第一个 DataStream;第二个函数作用于第二个 DataStream
    connStreams.map(e => println(e + "-----"), println(_))

    // 启动 Flink 应用
    executionEnvironment.execute("test")

测试效果如下:

针对 ConnectedStreams 的map 和 flatMap 操作称之为 CoMap,CoFlatMap

作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

5. split

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val flatMapDStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    val splitDStream: SplitStream[String] = flatMapDStream.split(e => "hadoop".equals(e) match {
      case true => List("hadoop")
      case false => List("other")
    })

    splitDStream.select("hadoop").print()

通常配合 select 算子使用

6. Union

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

    val listDStream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2))
    val unionDStream: DataStream[Int] = listDStream.union(listDStream)
    unionDStream.print()

7. KeyBy

DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    result.print()

通常结合 reduce 等聚合算子使用

8. Reduce,Fold,Aggregations

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    val reduceDStream: DataStream[(String, Int)] = result.reduce((iter1, iter2) => (iter1._1, iter1._2 + iter2._2))
    reduceDStream.print()

可以发现,Flink 并不是像 Spark 那样将最后的总的统计结果返回,而是每次聚合统计都将结果返回,所以需要借助 Flink 的Window 来进行数据的聚合统计(fold 和 aggregation同理)

其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果.

fold

KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

Aggregations

KeyedStream → DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

六 Time 和 Window(重点)

6.1 Time

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

Ingestion Time:是数据进入Flink的时间。

Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如,一条日志进入Flink的时间为2017-11-12 10:00:00.123,到达Window的系统时间为2017-11-12 10:00:01.234,日志的内容如下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

通常我们需要指定日志中的哪条数据是 eventTime

6.2 Window

Window可以分成两类:

  • CountWindow:按照指定的数据条数生成一个Window,与时间无关。

  • TimeWindow:按照时间生成Window。

对于TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

对于CountWindow 可以分为滚动窗口和滑动窗口

1. 滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片

特点时间对齐,窗口长度固定,没有重叠

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

2. 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成

特点时间对齐,窗口长度固定,有重叠

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的部分数据,如下图所示:

适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

3. 会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口

特点时间无对齐

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

4. Window API

CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。 注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。

  • 滚动窗口

    默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

      def main(args: Array[String]): Unit = {
        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素个数达到3的时候才会进行 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(3)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    

    测试效果如下:

  • 滑动窗口

    滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。

    下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是该 key 的前4个元素。

      def main(args: Array[String]): Unit = {
        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素个数达到2的时候才会对该 key 的前4条数据进行 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(4,2)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。

  • 滚动窗口

    Flink默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每3 秒对进入该窗口的所有相同key 的数据进行reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(3))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
    

  • 滑动窗口

    滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。

    下面代码中的sliding_size设置为了2s,也就是说,窗口每2s就计算一次,每一次计算的window范围是4s内的所有元素。

        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每2 秒对进入该窗口的所有数据进行前 4 秒数据的 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(4),Time
          .seconds(2))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
    

Window Fold

WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)

// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行fold操作
val streamFold = streamWindow.fold(100){
  (begin, item) =>
	begin + item._2
}

// 将聚合数据写入文件
streamFold.print()

// 执行程序
env.execute("TumblingWindow")
Aggregation on Window

WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行聚合操作
val streamMax = streamWindow.max(1)

// 将聚合数据写入文件
streamMax.print()

// 执行程序
env.execute("TumblingWindow")

七 EventTime与waterMark

7.1 EventTime的引入

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime

如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

这里日志的时间是 Flink 根据我们的规则去解析生成的eventTime,而不是默认的 processingTime

而window 的时间区间是左闭右开的,及 2019-01-25 00:00:06 时间的日志会进入第二个window

7.2 Watermark的引入

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的Watermark。

Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。

数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。

Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。

个人总结一下:针对进入窗口的每条数据,计算当前所有达到窗口的数据的最大eventTime,将这个eventTime和延迟时间(watermark)做减法,差值如果大于某一个窗口的的结束时间,那么该窗口就进行算子操作

有序流的Watermarker如下图所示:(Watermark设置为0)

乱序流的Watermarker如下图所示:(Watermark设置为2)

当Flink接收到每一条数据时,都会产生一条Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于Watermark是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发

上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。

7.3 测试代码

    // 初始化 Flink 执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 将 Flink 时间由默认的processingTime 设置为 eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val source: DataStream[String] = env.socketTextStream("localhost", 1234)

    // 设置watermark 以及如何解析每条日志数据中的eventTime
    val stream: DataStream[String] = source.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val time: Long = element.split(" ")(0).toLong
          println(time)
          time
        }
      }
    )

    val keyStream: KeyedStream[(String, Int), Tuple] = stream.map(e => (e.split(" ")(1), 1)).keyBy(0)
    // 设置滚动窗口的长度为5秒,及每5秒的eventTime 间隔计算一次
    val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    val reduceStream: DataStream[(String, Int)] = windowStream.reduce(
      (e1, e2) => (e1._1, e1._2 + e2._2)
    )
    reduceStream.print()

    env.execute("test")
  }

测试如下

如果watermark 设置为2,那么等到7000(毫秒)以及大于这个时间的日志进入window 的时候,才会进行第一个窗口的计算

如果窗口类型设置为 SlidingEventTimeWindows ,那么watermark 影响的就是滑动窗口的计算时间,感兴趣的可以自己试试

如果窗口类型设置为 EventTimeSessionWindows.withGap(Time.seconds(10)),那么影响的就是相邻两条数据的时间间隔必须大于指定时间才会触发计算

八 总结

Flink是一个真正意义上的流计算引擎,在满足低延迟和低容错开销的基础之上,完美的解决了exactly-once的目标,真是由于Flink具有诸多优点,越来越多的企业开始使用Flink作为流处理框架,逐步替换掉了原本的Storm和Spark技术框架。