阅读 8

12. Flink窗口模型


Flink Windows

1、窗口概述

streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

按照统计维度的不同,Flink 中的窗口可以分为

时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

2、Time Windows (按照时间生成window)

时间窗口(Time Windows) 用于以时间为维度来进行数据聚合,具体分为以下3类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

2.1 滚动窗口 (Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

例如:每隔1小时统计过去1小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:


滚动窗口特点:

  • 时间对齐
  • 窗口长度固定
  • 没有重叠

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

2.1.1 案例
需求:  每隔4秒钟,求出不同红外测温仪监测到的体温最高的旅客信息
结果:
    红外测温仪id  最高温度
复制代码
2.1.2 源码
package com.jd.unbounded.sample_window.timewindow.a_tubling

import com.jd.unbounded.Raytek
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
  * Description 滚动窗口演示
  *
  * @author lijun
  * @create 2020-04-01
  */
object TublingWindowTest {

  def main(args: Array[String]): Unit = {
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.获取两个无界流
    val srcDataStream = env.socketTextStream("localhost", 6666)
      .filter(_.trim.nonEmpty)
      .map(perTraveller => {
        val arr = perTraveller.split(",")
        val id = arr(0).trim
        val temperature = arr(1).trim.toDouble
        val name = arr(2).trim
        val timestamp = arr(3).trim.toLong
        val location = arr(4).trim
        Raytek(id, temperature, name, timestamp, location)
      })

    //3.每隔4秒,求出不同红外测温仪所监测到的体温最高的旅客信息
    //获得窗口流
    val ws: WindowedStream[Raytek, Tuple, TimeWindow] = srcDataStream.keyBy("id")
      .timeWindow(Time.seconds(4))

    //聚合,求最高的体温的旅客信息并显示输出
    ws.reduce((nowTraveller:Raytek,nextTraveller:Raytek)=>{
      if(nowTraveller.temperature > nextTraveller.temperature) nowTraveller
      else nextTraveller
    }).print("滚动时间窗口效果 -->")

    //4.启动
    env.execute()
  }
}
复制代码
2.1.3 效果
2.1.3.1 source端
2.1.3.2 控制台输出

2.2 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天可以分为 240 个窗口。图示如下:

滑动窗口特点:

  • 窗口长度固定
  • 可以有重叠

适用场景:对最近一个时间段内的统计

2.2.1 案例
需求:  每隔2秒,统计过去4秒之内,不同红外测温仪所监测到的体温最高的旅客信息
结果:
    红外测温仪id  最高温度
复制代码
2.2.2 源码
package com.jd.unbounded.sample_window.timewindow.b_slide

import com.jd.unbounded.Raytek
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
  * Description 
  *
  * @author lijun
  * @create 2020-04-01
  */
object SlideTimeWindowTest {

  def main(args: Array[String]): Unit = {
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.获取两个无界流
    val srcDataStream = env.socketTextStream("localhost", 6666)
      .filter(_.trim.nonEmpty)
      .map(perTraveller => {
        val arr = perTraveller.split(",")
        val id = arr(0).trim
        val temperature = arr(1).trim.toDouble
        val name = arr(2).trim
        val timestamp = arr(3).trim.toLong
        val location = arr(4).trim
        Raytek(id, temperature, name, timestamp, location)
      })

    //3.每隔2秒,统计过去4秒之内,不同红外测温仪所监测到的体温最高的旅客信息
    //获得窗口流
    val ws: WindowedStream[Raytek, Tuple, TimeWindow] = srcDataStream.keyBy("id")
      .timeWindow(Time.seconds(4),Time.seconds(2))

    //聚合,求最高的体温的旅客信息并显示输出
    ws.reduce((nowTraveller:Raytek,nextTraveller:Raytek)=>{
      if(nowTraveller.temperature > nextTraveller.temperature) nowTraveller
      else nextTraveller
    }).print("滑动时间窗口效果 -->")

    //4.启动
    env.execute()
  }
}
复制代码
2.2.3 效果
2.2.3.1 source
2.2.3.2 控制台输出

2.3 窗口分配器的使用

window()方法接收的输入参数是一个WindowAssigner,WindowAssigner负责将每条输入的数据分发到正确的window中

2.4 会话窗口(Session Windows)

一段时间没有接收到新数据就会生成新的窗口
想要实现这类统计,可以通过 Session Windows 来进行实现。


特点:时间无对齐。

3、Count Windows:按照指定的数据条数生成一个window

计数窗口(Count Windows) 根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key,同样也分为滚动窗口和滑动窗口,滚动窗口和滑动窗口的函数名是完全一致的,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)
复制代码

3.1 滚动计数窗口案例

3.1.1 案例设计
需求:
    安放在北京西站各个监测口的红外测温仪每监测到三个旅客后,就求出这三个旅客中体温最高的旅客信息
复制代码
3.1.2 源码
package com.jd.unbounded.sample_window.countwindow

import com.jd.unbounded.Raytek
import org.apache.flink.api.scala._
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow}

/**
  * Description  计数滚动窗口演示
  *
  * @author lijun
  * @create 2020-04-01
  */
object CountTublingWindowTest {

  def main(args: Array[String]): Unit = {
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.获取两个无界流
    val srcDataStream = env.socketTextStream("localhost", 6666)
      .filter(_.trim.nonEmpty)
      .map(perTraveller => {
        val arr = perTraveller.split(",")
        val id = arr(0).trim
        val temperature = arr(1).trim.toDouble
        val name = arr(2).trim
        val timestamp = arr(3).trim.toLong
        val location = arr(4).trim
        Raytek(id, temperature, name, timestamp, location)
      })

    //3.红外测温仪每监测到三个旅客后,就求出这三个旅客中体温最高的旅客信息
    //获得窗口流
    val ws: WindowedStream[Raytek, Tuple, GlobalWindow] = srcDataStream.keyBy("id")
      .countWindow(3)

    //聚合,求最高的体温的旅客信息并显示输出
    ws.reduce((nowTraveller:Raytek,nextTraveller:Raytek)=>{
      if(nowTraveller.temperature > nextTraveller.temperature) nowTraveller
      else nextTraveller
    }).print("滚动计数窗口效果 -->")

    //4.启动
    env.execute()
  }
}
复制代码
3.1.3 效果
3.1.3.1 source端
3.1.3.2 控制台输出

3.2 滑动计数窗口案例

3.2.1 案例设计
需求:
    安放在北京西站各个监测口的红外测温仪每监测到2个旅客后,统计过去5个旅客中体温最高的旅客信息
复制代码
3.2.2 源码
package com.jd.unbounded.sample_window.countwindow

import com.jd.unbounded.Raytek
import org.apache.flink.api.scala._
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

/**
  * Description  滑动计数窗口演示
  *
  * @author lijun
  * @create 2020-04-01
  */
object CountSlideWindowTest {
  def main(args: Array[String]): Unit = {
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.获取两个无界流
    val srcDataStream = env.socketTextStream("localhost", 6666)
      .filter(_.trim.nonEmpty)
      .map(perTraveller => {
        val arr = perTraveller.split(",")
        val id = arr(0).trim
        val temperature = arr(1).trim.toDouble
        val name = arr(2).trim
        val timestamp = arr(3).trim.toLong
        val location = arr(4).trim
        Raytek(id, temperature, name, timestamp, location)
      })

    //3.红外测温仪每监测到2个旅客后,统计过去5个旅客中体温最高的旅客信息
    //获得窗口流
    val ws: WindowedStream[Raytek, Tuple, GlobalWindow] = srcDataStream.keyBy("id")
      .countWindow(5,2)


    //聚合,求最高的体温的旅客信息并显示输出
    ws.reduce((nowTraveller:Raytek,nextTraveller:Raytek)=>{
      if(nowTraveller.temperature > nextTraveller.temperature) nowTraveller
      else nextTraveller
    }).print("滑动计数窗口效果 -->")

    //4.启动
    env.execute()
  }
}
复制代码
3.2.3 效果
3.2.3.1 source端
3.2.3.2 控制台输出
关注下面的标签,发现更多相似文章
评论