阅读 64

5.Flink流处理API之Data Source

1.内置 Data Source

Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下:

1.1 从文件中读取数据

  1. readTextFile(path):按照 TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下:
package com.jd.unbounded.calboundeddata

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 需求:使用flink无界流的api,计算特定目录下所有离线的日志文件
  * @author lijun
  * @create 2020-03
  */
object UnboundedFlowTest {

  def main(args: Array[String]): Unit = {
    //执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //计算
    env.readTextFile("/Users/lijun/Downloads/flink_input/")
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty).map((_,1))
      .keyBy(0)
      .sum(1)
      .print()

    // 启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

1.2 从集合中读取数据

1.2.1 fromCollection

fromCollection(Collection):基于集合构建,集合中的所有元素必须是同一类型。参数类型是集合,集合既可以是java中的集合类型,也可以是scala中的集合类型

示例1如下:

介绍

场景: 在北京西站各个关键出入场所,安放了一些红外体温测量仪,对往来的行人进行体温的实时的监测
需求: 从日志文件中读取红外体温测量仪内置传感器实时采集到的数据,经由flink实时流技术处理,给出结果。
复制代码
package com.jd.unbounded.source_collection

import com.jd.unbounded.Raytek
import scala.io.Source
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  * 场景: 在北京西站各个关键出入场所,安放了一些红外体温测量仪,对往来的行人进行体温的实时的监测
  * 需求: 从日志文件中读取红外体温测量仪内置传感器实时采集到的数据,经由flink实时流技术处理,给出结果。
  */
object ReadDataFromCollection {

  def main(args: Array[String]): Unit = {
    //步骤
    //1.执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.读取文件中的数据转换成集合
    val lst = Source.fromFile("/Users/lijun/Downloads/flink_input/raytek/raytek.log")
      .getLines()
      .toList

    //3.将集合中的数据封装到DataStream中去
    val dataStream:DataStream[String] = env.fromCollection(lst)

    //4. 对无界流数据进行迭代处理,并显示结果
    dataStream.map(perEle=>{
      val arr = perEle.split(",")
      val id = arr(0).trim
      val temperature = arr(1).trim.toDouble
      val name = arr(2).trim
      val timestamp = arr(3).trim.toLong
      val location = arr(4).trim
      Raytek(id,temperature,name,timestamp,location)
    }).print("红外体温测量仪")
    //5. 启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

运行的结果

示例2如下:

代码

package com.jd.unbounded.source_javacollection

import java.util.Collections
import com.jd.entity.Student
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import scala.collection.JavaConversions._

/**
  * 将java集合封装到DataStream
  * @author lijun
  * @create 2020-03-26 19:10
  */
object FromJavaCollectionTest {

  def main(args: Array[String]): Unit = {
    //步骤
    //1. 执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.将java中的集合封装到DataStream
    val container = new java.util.LinkedList[Student]()
    Collections.addAll(
      container,
      new Student("张三",98.33),
      new Student("张四",70.22),
      new Student("张五",90.11)
    )

    env.fromCollection(container)
      .print("将java中的集合封装到DataStream->")

    //3.启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

效果

1.2.2 fromElements

fromElements(T …): 基于元素构建,所有元素必须是同一类型。参数是可变长的,类型可以是:基础数据类型,样例类,POJO, 元组

示例如下:

package com.jd.unbounded.source_fromEle

import com.jd.entity.Student
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/**
  * Description: fromElements,参数是可变长的,类型可以是:基础数据类型,样例类,POJO, 元组
  * @author lijun
  * @create 2020-03-26 
  */
object FromElementsTest {
  def main(args: Array[String]): Unit = {
    //步骤
    //1. 执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. 分别设置不同的source,计算并输出
    //a) 基础数据类型
    env.fromElements(56.48,90.34,66.89)
      .filter(_ >= 60)
      .print("基础数据类型: 及格的学生分数-->")

    //b) 元组
    env.fromElements(("张三",99.33),("张四",59.99),("张五",88.88))
      .filter(_._2 < 60)
      .print("元组:不及格的学生信息-->")


    //c) POJO
    env.fromElements(
      new Student("武三",77.77),
      new Student("武四",56.55),
      new Student("武五",51.11)
    ).filter(perStu=>perStu.getScore>56 && perStu.getScore<78)
      .print("POJO: 考分在[56,78)之间的学生信息->")

    //d) 样例类
    env.fromElements(
      LittleStudent("段三",90),
      LittleStudent("段四",59),
      LittleStudent("段明",70))
      .filter(_.score < 60)
      .print("样例类: 班上不及格的学生信息-->")

    //3. 启动
    env.execute(this.getClass.getSimpleName)
  }
}
//小学生样例类
case class LittleStudent(name:String,score:Double)
复制代码

效果:

1.3 基于 Socket 构建

Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流
socketTextStream 方法有以下四个主要参数:

  • hostname:主机名;
  • port:端口号,设置为 0 时,表示端口号自动分配;
  • delimiter:用于分隔每条记录的分隔符;
  • maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。

示例如下:

package com.jd.unbounded.wordcount

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 使用flink无界流处理实时监控netcat客户端和服务器交互的数据,进行实时的计算,将计算后的结果显示出来
  * @author lijun
  * @create 2020-03-19 
  */
object UnboundedFlowTest {

  def main(args: Array[String]): Unit = {
    //拦截非法参数
    if(args == null || args.length != 4){
      println("警告!应该传入参数 --hostname<主机名> --port<端口号>")
      sys.exit(-1)
    }
    //获得参数
    val tool = ParameterTool.fromArgs(args)
    val hostname = tool.get("hostname")
    val port = tool.getInt("port")

    //执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //计算
    env.socketTextStream(hostname,port)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty).map((_,1))
      .keyBy(0)
      .sum(1)
      .print()

    // 启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

2. 自定义 Data Source

2.1 SourceFunction

除了内置的数据源外,用户还可以使用 addSource 方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口

示例如下:

1.如何自定义source?
  a.准备一个自定义类,实现接口SourceFunction
  b.重写接口中的方法:
    void run(SourceContext<T> ctx) 从真实的源中读取数据,发往flink处理
    void cancel()结束对真实的数据源的数据采集
2.案例设计
   场景: 使用自定义的source实时读取指定的日志文件中的数据,送往flink进行实时的处理,并显示结果。模拟的是readTextFile底层的实现
复制代码

代码

package com.jd.unbounded.source_self

import com.jd.unbounded.Raytek
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.io.Source

/**
  * Desription 自定义source
  * @author lijun
  * @create 2020-03-24 
  */
class MySource extends SourceFunction[Raytek]{
  //计数器,用来记录文件中的数据
  var cnt = 0
  //手动控制流的运行flag
  var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Raytek]): Unit = {
    //读取文件
    val lst = Source.fromFile("/Users/lijun/Downloads/flink_input/raytek/raytek.log")
      .getLines()
      .toList

    //通过循环,发送数据
    while (cnt < lst.size && isRunning){
      val perEle = lst(cnt)
      val arr = perEle.split(",")
      val id = arr(0).trim
      val temperature = arr(1).trim.toDouble
      val name = arr(2).trim
      val timestamp = arr(3).trim.toLong
      val location = arr(4).trim
      val instance = Raytek(id,temperature,name,timestamp,location)
      //发送
      ctx.collect(instance)

      cnt = cnt+1
    }
  }

  override def cancel(): Unit = {
    isRunning = false;
  }
}
----------------------------------------
使用上面的自定义Source类: MySource

package com.jd.unbounded.source_self

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/**
  * Description: 使用自定义的source实时读取指定的日志文件中的数据,送往flink进行实时的处理,并显示结果
  * @author lijun
  * @create 2020-03-24 
  */
object ReadDataFromSelfSource {

  def main(args: Array[String]): Unit = {
    //步骤:
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.读取source
    env.addSource(new MySource {
    }).print("自定义source-->")

    //3.启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

效果:

2.2 ParallelSourceFunction 和 RichParallelSourceFunction

上面通过 SourceFunction 实现的数据源是不具有并行度的,如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:


ParallelSourceFunction 直接继承自 SourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,closen() 。

3. Streaming Connectors

3.1 内置连接器

除了自定义数据源外, Flink 还内置了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:
Apache Kafka (支持 source 和 sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)

除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:

Apache ActiveMQ (source/sink)
Netty (source)

3.2 整合 Kakfa

3.2.1 业务场景介绍
场景描述:
安放在北京西站各个关键点的红外测温仪实时监控过往旅客的体温情况,并将采集到的数据信息送往后台的服务器
Flume实时采集日志文件中的信息到kafka分布式集群
Flink无界流应用实时从kafka中拉取数据进行计算
将计算的结果反馈给红外测温仪
体温正常,不做任何处理
否则,报警,提示安全人员过来进行后续处理...
复制代码
3.2.2 实施步骤:
前提:(启动kafka分布式集群)
1.在kafka中创建好相应的主题
2.使用kafka消息生产方模拟一个业务场景,flume将采集的数据送往kafka名为raytek的主题

步骤:
1.要添加kafka与flink整合的依赖包
2.编写代码,从kafka中读取数据,计算,并显示
3.启动应用
4.验证
实时察觉控制台是否输出了处理之后的数据
复制代码
3.2.3 步骤详解:
  • 启动kafka集群
# zookeeper启动命令
bin/zkServer.sh start

#
 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:
# bin/kafka-server-start.sh config/server.properties
复制代码
  • 在kafka中创建相应的主题
[robin@node01 ~]$ kafka-topics.sh --create --topic raytek --partitions 3 --replication-factor 3 --zookeeper node03:2181
Created topic "raytek".

#
查看所有主题
[robin@node01 ~]$ kafka-topics.sh --list --zookeeper node01:2181
__consumer_offsets
raytek
test
[robin@node01 ~]$ kafka-topics.sh --describe --topic raytek --zookeeper node01:2181
Topic:raytek    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: raytek   Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: raytek   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: raytek   Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
复制代码
  • 启动Producer 模拟一个业务场景:将采集的数据发往kafka的raytek主题
[robin@node01 ~]$ kafka-console-producer.sh --topic raytek --broker-list node01:9092
>
复制代码
  • 编写代码,从kafka中读取数据,计算,并显示

添加kafka与flink整合的依赖包

<!-- flink kafka-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
复制代码

consumer.properties

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=node01:2181
bootstrap.servers=node01:9092

#
 timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#
consumer group id
group.id=test-consumer-group
复制代码

代码

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}

/**
  * Description: 从kafka分布式集群中实时采集数据
  * @author lijun
  * @create 2020-03-22 
  */
object ReadDataFromKafka {

  def main(args: Array[String]): Unit = {
    //步骤
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.从kafka中获取数据,计算,并显示
    //topic: String, valueDeserializer: DeserializationSchema[T], props: Properties
    val topic = "raytek"
    val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
    val props:Properties = new Properties()
    //将资源目录下的配置文件装载到Properties中
    props.load(this.getClass.getClassLoader.getResourceAsStream("consumer.properties"))


    env.addSource(new FlinkKafkaConsumer[String](topic,valueDeserializer,props)).print("kafka-->")
    //3.启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码

测试结果

在 Producer 上输入任意测试数据,之后观察程序控制台的输出:

[robin@node01 ~]$ kafka-console-producer.sh --topic raytek --broker-list node01:9092
>raytek_2,37.4,bush,15826411231,北京西站-北广场3号停车场
>raytek_1,36.3,jack,1582641121,北京西站-北广场
raytek_3,36.8,john,1582641323,北京西站-地铁站
raytek_9,36.3,tom,1582641124,北京西站-公交车站
raytek_2,36.4,leon,1582641127,北京西站-北广场3号停车场
raytek_3,36.4,kate,1582641128,北京西站-东5号停车场
raytek_3,36.8,alice,1582641129,北京西站-南广场-公交站
raytek_3,37.8,jerry,1582641330,北京西站-地铁站>>>>>>
>
复制代码

flink 无界流应用读取后 程序控制台的输出如下:

kafka-->:3> raytek_2,37.4,bush,15826411231,北京西站-北广场3号停车场
kafka-->:3> raytek_9,36.3,tom,1582641124,北京西站-公交车站
kafka-->:3> raytek_3,36.8,alice,1582641129,北京西站-南广场-公交站
kafka-->:2> raytek_1,36.3,jack,1582641121,北京西站-北广场
kafka-->:2> raytek_2,36.4,leon,1582641127,北京西站-北广场3号停车场
kafka-->:2> raytek_3,37.8,jerry,1582641330,北京西站-地铁站
kafka-->:1> raytek_3,36.8,john,1582641323,北京西站-地铁站
kafka-->:1> raytek_3,36.4,kate,1582641128,北京西站-东5号停车场
复制代码