阅读 285

9.Flink 分布式缓存

1.分布式缓存

1.1 介绍

1.1.1 技术细节
1.分布式文件缓存的作用与广播变量类似,也是为了避免出现join 操作发生数据倾斜而设计出来的
2.与广播变量不同之处在于:
  广播变量中封装的数据类型是: DataSet, DataStream
  分布式文件缓存中封装的数据类型是File
3.分布式文件缓存底层如何运作?
  将分布式文件系统上的资源文件装载到TaskManager进程所开辟的内存中
  将TaskManager进程中所有slot所管理的线程都共享这个文件,不需要每个线程单独开启空间进行存储
复制代码
1.1.2 案例设计
hdfs上准备一个共通的资源文件,如genderinfos.txt->存储的学生的性别信息
根据依次录入的学生信息,结合学生性别信息,完整输出学生的信息
复制代码

1.2 实操

1.2.1 前提
1.准备hdfs上的资源
[robin@node01 ~]$ hdfs dfs -mkdir /flink/cache
[robin@node01 ~]$ vi gender.txt
1, 男
2, 女
[robin@node01 ~]$ hdfs dfs -put gender.txt /flink/cache
2.开启源(socket)
复制代码
1.2.2 代码实现
package com.jd.unbounded.sample_cache

import java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.mutable
import scala.io.{BufferedSource, Source}

/**
  * Description  分布式缓存演示
  * @author lijun
  * @create 2020-03-30
  */
object DistrubutedCacheTest {
  def main(args: Array[String]): Unit = {
    //1.环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. 读取hdfs上的资源,并设置到分布式缓存中
    env.registerCachedFile("hdfs://node01:9000/flink/cache/gender.txt","hdfsGenderInfo")

    //3. 读取socket实时发送过来的学生信息,进行计算,并输出结果
    //(101,"jackson",1,"上海")
    env.socketTextStream("node01",8888)
      .filter(_.trim.nonEmpty)
      .map(new RichMapFunction[String,(Int,String,Char,String)] {

        //用于存储从分布式缓存中读取的学生信息
        val map:mutable.Map[Int,Char]= mutable.HashMap()
        var bs: BufferedSource = _

        override def open(parameters: Configuration): Unit = {
          //1.读取分布式缓存中存储的数据
          var file:File = getRuntimeContext.getDistributedCache.getFile("hdfsGenderInfo")

          //2.将读取到的信息封装到map实例中存储
          bs = Source.fromFile(file)
          val lst = bs.getLines().toList
          for(perLine <-lst){
              val arr = perLine.split(",")
            val genderFlg = arr(0).trim.toInt
            val genderName = arr(1).trim.toCharArray()(0)
            map.put(genderFlg,genderName)
          }
        }

        override def map(perStudentInfo: String): (Int, String, Char, String) = {
          //获得学生的详细信息
          val arr = perStudentInfo.split(",")
          val id = arr(0).trim.toInt
          val name = arr(1).trim
          val genderFlg = arr(2).trim.toInt
          val address = arr(3).trim
          //根据容器Map中存储的分布式缓存中的数据,将学生信息中的性别标识替换为真实的性别
          var genderName = map.getOrElse(genderFlg, 'x')
          (id, name, genderName, address)
        }

        override def close(): Unit = {
          if(bs != null){
            bs.close()
          }
        }
      }).print("学生完成的信息是->")

    //4. 启动
    env.execute(this.getClass.getSimpleName)
  }
}
复制代码
1.2.3 测试

开启源

控制台输出

1.2.4 Flink分布式缓存中从远程hdfs上下载资源到TaskManager本地文件系统目录