Spark Streaming学习——DStream

2,386 阅读12分钟

其他更多java基础文章:
java基础学习(目录)


概述

SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。

运行原理

Spark Streaming架构

Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch interval(如5秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备

DStream

DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源赖获取,也可以通过现有的Dstream的transformation操作来获得。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。

下面是DStream的创建例子:

        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
            .set("spark.testing.memory","2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);

API

transform算子

Transformation含义
map(func)对DStream中的各个元素进行func函数操作,然后返回一个新的DStream
flatMap(func)与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func)过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition(numPartitions)增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream)将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count()通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func)对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue()对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func)通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func)根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream

Windows Operation

总结:

  • batch interval:5s
    每隔5秒切割一次batch,封装成DStream
  • window length:15s
    进行计算的DStream中包含15s的数据。即3个batch interval
  • sliding interval:10s
    每隔10s取最近3个batch(window length)封装的DStream,封装成一个更大的DStream进行计算
/**
* batch interval:5s
* sliding interval:10s
* window length: 60s
* 所以每隔 10s 会取 12 个 rdd,在计算的时候会将这 12 个 rdd 聚合起来
* 然后一起执行 reduceByKeyAndWindow 操作
* reduceByKeyAndWindow 是针对窗口操作的而不是针对 DStream 操作的
*/
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}, Durations.seconds(60), Durations.seconds(10));

优化Windows Operation

假设 batch=1s, window length=5s, sliding interval=1s, 那么每个 DStream 重复计算了 5 次,优化后, (t+4)时刻的 Window 由(t+3)时刻的 Window 和(t+4)时刻的 DStream 组成, 由于(t+3)时刻的 Window 包含(t-1)时刻的 DStream,而(t+4)时刻的 Window 中不需要包含(t-1) 时刻的 DStream,所以还需要减去(t-1)时刻的 DStream,所以: Window(t+4) = Window(t+3) + DStream(t+4) - DStream(t-1)。注意,使用此方法必须设置checkpoint目录,用来保存Window(t+3)的数据

//必须设置 checkpoint 目录
jssc.checkpoint("hdfs://node01:8020/spark/checkpoint");
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 - v2;
}
}, Durations.seconds(60), Durations.seconds(10));

Driver HA

提交任务时设置

spark-submit –supervise

以集群方式提交到 yarn 上时, Driver 挂掉会自动重启,不需要任何设置
提交任务,在客户端启动 Driver,那么不管是提交到 standalone 还是 yarn, Driver 挂掉后 都无法重启

代码中配置

上面的方式重新启动的 Driver 需要重新读取 application 的信息然后进行任务调度,实 际需求是,新启动的 Driver 可以直接恢复到上一个 Driver 的状态(可以直接读取上一个 StreamingContext 的 DSstream 操作逻辑和 job 执行进度,所以需要把上一个 StreamingContext 的元数据保存到 HDFS 上) ,直接进行任务调度,这就需要在代码层面进 行配置。

public class SparkStreamingOnHDFS {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf()
            .setMaster("local[1]")
            .setAppName("SparkStreamingOnHDFS");
    //这里可以设置一个线程,因为不需要一个专门接收数据的线程,而是监控一个目录
        final String checkpointDirectory = "hdfs://node01:9000/spark/checkpoint";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        jsc.start();
        jsc.awaitTermination();
        // jsc.close();
    }
    @SuppressWarnings("deprecation")
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        //每隔 15s 查看一下监控的目录中是否新增了文件
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
        ssc.checkpoint(checkpointDirectory);
        /**
        * 只是监控文件夹下新增的文件,减少的文件是监控不到的,
        文件内容有改动也是监控不到
        */
        JavaDStream<String> lines = ssc.textFileStream("hdfs://node01:8020/spark");
        /**
        * 接下来可以写业务逻辑,比如 wordcount
        */
        return ssc;
    }
}

执行一次程序后, JavaStreamingContext 会在 checkpointDirectory 中保存,当修 改了业务逻辑后,再次运行程序, JavaStreamingContext.getOrCreate(checkpointDirectory, factory); 因为 checkpointDirectory 中有这个 application 的 JavaStreamingContext,所以不会 调用 JavaStreamingContextFactory 来创建 JavaStreamingContext,而是直接 checkpointDirectory 中的 JavaStreamingContext,所以即使业务逻辑改变了,执行的效 果也是之前的业务逻辑, 如果需要执行修改过的业务逻辑,可以修改或删除 checkpointDirectory

与Kafka的两种连接方式

Receiver模式

获取 kafka 传递的数据来计算:

SparkConf conf = new SparkConf()
    .setAppName("SparkStreamingOnKafkaReceiver")
    .setMaster("local[2]")
    .set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
//设置持久化数据的目录
jsc.checkpoint("hdfs://node01:8020/spark/checkpoint");
Map<String, Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
//topic 名 receiver task 数量
topicConsumerConcurrency.put("test_create_topic", 1);
JavaPairReceiverInputDStream<String,String> lines =
KafkaUtils.createStream(
    jsc,
    "node02:2181,node03:2181,node04:2181",
    "MyFirstConsumerGroup",
    topicConsumerConcurrency,
    StorageLevel.MEMORY_AND_DISK_SER());
/*
* 第一个参数是 StreamingContext
* 第二个参数是 ZooKeeper 集群信息(接受 Kafka 数据的时候会从 Zookeeper 中获得
Offset 等元数据信息)
* 第三个参数是 Consumer Group
* 第四个参数是消费的 Topic 以及并发读取 Topic 中 Partition 的线程数
* 第五个参数是持久化数据的级别,可以自定义
*/
//对 lines 进行其他操作……

kafka 客户端生产数据的代码:

public class SparkStreamingDataManuallyProducerForKafka extends Thread {
    private String topic; //发送给 Kafka 的数据的类别
    private Producer<Integer, String> producerForKafka;
    public SparkStreamingDataManuallyProducerForKafka(String topic){
        this.topic = topic;
        Properties conf = new Properties();
        conf.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
        conf.put("serializer.class", StringEncoder.class.getName());
        producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf)) ;
    }
    @Override
    public void run() {
        while(true){
            counter ++;
            String userLog = createUserLog();
            //生产数据这个方法可以根据实际需求自己编写
            producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        new SparkStreamingDataManuallyProducerForKafka("test_create_topic").start();
        //test_create_topic 是 topic 名
    }
}

Direct 方式

把 kafka 当作一个存储系统,直接从 kafka 中读数据, SparkStreaming 自己维护消费者 的消费偏移量

SparkConf conf = new SparkConf()
    .setAppName("SparkStreamingOnKafkaDirected")
    .setMaster("local[1]");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(10));
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("test_create_topic");
JavaPairInputDStream<String,String> lines =KafkaUtils.createDirectStream(jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParameters,
    topics);
//对 lines 进行其他操作……

Direct方式优劣

在实际的应用中,Direct Approach方式很好地满足了我们的需要,与Receiver-based方式相比,有以下几方面的优势:

  1. 降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
  2. 降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
  3. 鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

但是也存在一些不足,具体如下:

  1. 提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
  2. 监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。 接!

两种方式下提高 SparkStreaming 并行度的方法

Receiver 方式调整 SparkStreaming 的并行度的方法:

  • 假设 batch interval 为 5s, Receiver Task 会每隔 200ms(spark.streaming.blockInterval 默 认)将接收来的数据封装到一个 block 中,那么每个 batch 中包括 25 个 block, batch 会被封 装到 RDD 中,所以 RDD 中会包含 25 个 partition,所以提高接收数据时的并行度的方法 是:调低 spark.streaming.blockInterval 的值,建议不低于 50ms

其他配置:

  • spark.streaming.backpressure.enable 默认 false, 设置为 true 后, sparkstreaming 会根 据上一个 batch 的接收数据的情况来动态的调整本次接收数据的速度,但是最大速度不能 超过 spark.streaming.receiver.maxRate 设置的值(设置为 n,那么速率不能超过 n/s)
  • spark.streaming.receiver.writeAheadLog.enable 默认 false 是否开启 WAL 机制 Direct 方式并行度的设置:
  • 第一个 DStream 的分区数是由读取的 topic 的分区数决定的,可以通过增加 topic 的 partition 数来提高 SparkStreaming 的并行度

优化

1. 并行化数据接收:处理多个topic的数据时比较有效

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

2. spark.streaming.blockInterval:增加block数量,增加每个batch rdd的partition数量,增加处理并行度

receiver从数据源源源不断地获取到数据;首先是会按照block interval,将指定时间间隔的数据,收集为一个block;
每个batch对应的task数量大约是:batch interval / block interval。 例如说,batch interval为2s,block interval为200ms,会创建10个task。如果你认为每个batch的task数量太少,即低于每台机器的cpu core数量,那么就说明batch的task数量是不够的,因为所有的cpu资源无法完全被利用起来。要为batch增加block的数量,那么就减小block interval。
默认时间是200ms,官方推荐不要小于50ms;接着呢,会将指定batch interval时间间隔内的block,合并为一个batch;创建为一个rdd,然后启动一个job,去处理这个batch rdd中的数据

batch rdd,它的partition数量是多少呢?一个batch有多少个block,就有多少个partition;就意味着并行度是多少;就意味着每个batch rdd有多少个task会并行计算和处理。

当然是希望可以比默认的task数量和并行度再多一些了;可以手动调节block interval;减少block interval;每个batch可以包含更多的block;有更多的partition;也就有更多的task并行处理每个batch rdd。

3. inputStream.repartition():重分区,增加每个batch rdd的partition数量

有些时候,希望对某些dstream中的rdd进行定制化的分区 对dstream中的rdd进行重分区,去重分区成指定数量的分区,这样也可以提高指定dstream的rdd的计算并行度

4. 调节并行度

spark.default.parallelism  
reduceByKey(numPartitions)

5. 使用Kryo序列化机制:

spark streaming,也是有不少序列化的场景的 提高序列化task发送到executor上执行的性能,如果task很多的时候,task序列化和反序列化的性能开销也比较可观 默认输入数据的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到数据,默认就会进行持久化操作;首先序列化数据,存储到内存中;如果内存资源不够大,那么就写入磁盘;而且,还会写一份冗余副本到其他executor的block manager中,进行数据冗余。

6. batch interval:每个的处理时间必须小于batch interval

实际上你的spark streaming跑起来以后,其实都是可以在spark ui上观察它的运行情况的;可以看到batch的处理时间; 如果发现batch的处理时间大于batch interval,就必须调节batch interval 尽量不要让batch处理时间大于batch interval 比如你的batch每隔5秒生成一次;你的batch处理时间要达到6秒;就会出现,batch在你的内存中日积月累,一直囤积着,没法及时计算掉,释放内存空间;而且对内存空间的占用越来越大,那么此时会导致内存空间快速消耗

如果发现batch处理时间比batch interval要大,就尽量将batch interval调节大一些