完成你的第一个Spark Streaming程序

1,647 阅读11分钟

前言

其实 Spark Streaming 主要就是把算子用用,多敲代码的事儿。我当时觉得这个Spark Streaming好像要提的事情并不多呀,所以就直接跳过了。然后···

那我还是回头来扯扯吧😂

一、Spark Streaming

1.1 运行流程

  1. 首先我们无论是提交Spark core也好,Spark SQL也好,Spark Streaming也罢,我们都会统称为一个 “Application”
  2. 而且我们要知道,Spark SQLSpark Streaming 的底层都是依赖于Spark Core的,所以我们要使用它们之前,必须先初始化好一个Spark Core的程序入口,从代码中体现的话,就是 StreamingContext 必须依赖于 SparkContext,而SparkContext里面我们必然是会初始化一个Driver的服务的,所以它们的结构就是这个样子
  1. 以上这些都初始化好之后,我们的worker自然也已经分配好 Executor 了
  2. 之后Driver服务就会发送 Receiver 对象到 Executor 上,Receiver默认就只有一个,当然也可以通过代码设置为多个
  3. Receiver启动后它其实就可以视为一个Task任务(这个Task就开始不断接收数据,并封装成block,之后把block写到Executor的内存中)
  4. Receiver会把这些Block的信息告诉Driver

虽然图很简陋,但是能把信息准确地表达就好。

  1. Driver会根据一定的时间间隔,把这些 Block 组织成一个 RDD ,其实一个 block 就是一个partition (1个partition -> 1个task)

1.2 BlockInterval 和 BatchInterval

这时候多少数据才会形成一个Block呢?答案是每 200 毫秒形成一个Block

那多久时间会把这些Block合并成一个RDD呢?答案就是你的代码中的那句

// 这里的2是2秒的意思
val ssc = new StreamingContext(conf,Seconds(2));

Driver就会把这2s中的数据看成一个RDD

这就是我们的 BlockInterval 和 BatchInterval ,这两个比较重要的时间参数,一个由默认值200毫秒,一个由我们用户自行控制

我们也可以在官网那儿去查看这些配置的参数

然后ctrl+f,搜一下blockInterval或者200都可以找到

暂时需要知道的就这么多,我们来动手写下代码

1.3 入门的WordCount程序

在之前的分享中已经反复提到过了,如果是实时的程序,我们关心的就是它的数据的输入,数据的处理,和数据的输出

1.3.1 POM文件

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.1</spark.version>
    <hadoop.version>2.7.5</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>


</dependencies>



<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1.3.2 数据的输入

我们先要创建Spark Streaming的程序入口

// 1.数据的输入
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("wordCount")

这里我们来解释一下,第一行就是我们刚刚提到的,创建SparkCore的程序入口,这个没毛病了,setAppName就是设置个名字,爱是啥就是啥,也没啥可说的

1.3.3 补充 setMaster(local[2]) 的扩展

第二行,我们setMaster(local[2]),这里要注意,我们消费Kafka的数据,有 Receiver 和 Direct 两种方式,这两种方式是不一样的。

官网中提供的JAR包是两个,一个是基于0.8版本的整合,这个提供了recevier和direct两种版本的整合一个是0.10版本整合,只提供了direct方式

Receiver整合是在Spark的executor当中启动了一个recevier的线程,专门去拉取数据,拉取回来的数据receiver是不会帮忙处理的,所以receiver就是搬东西的,它基于Kafka的high level API进行消费,offset自动保存于zookeeper中,不用自己主动维护

而此时,拉取数据的线程和处理数据的线程互相是不通信的,当我们处理数据的线程挂掉了之后,拉取数据的线程是感知不到的,它仍在不断拉取数据。这时候数据全部会堆积在executor的内存里面,就会出现问题

Direct方式不再单独启动线程去拉取数据,获取到的数据也不再保存在executor里面,获取到的数据直接进行处理,拉取和处理完全就是一拨人。

当然它也有问题:使用Kafka的low API进行消费,需要手动维护offset的值。老版本中我们还会保存在zookeeper中,新版本默认是存在了Kafka的默认的一个topic里面,当然你为了一些特殊的需求,存在MySQL和Hbase···那些也是有可能的。

此时我们setMaster(local[1])行不行呢?那如果是direct的方式,那是可以的,不过此时我们就是单线程拉取并处理,但是如果是基于 Receiver 的方式进行消费的话,那就完蛋了,local[1]的意思是只启动一个线程的,这时候你就会发现你的程序不报错,可是数据死活不出来的情况,所以大家一定要注意,如果是local[*],那就是电脑的cpu core有多少开多少

之后

val ssc = new StreamingContext(conf, Seconds(2))

获取到SparkStreaming的程序入口,设置2s形成一个RDD,即可

1.3.3 数据的获取及处理

这里其实就是简单的从socket那去获取数据而已

// 2.数据的处理
val dataDStream = ssc.socketTextStream("localhost"8888)
val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

1.3.4 数据的输出

// 3.数据的输出
result.print()

如果我们进入到企业工作,也会发现基本上我们只需要编写处理数据的那部分代码,前置部分的获取配置和下游的输出基本不会太过于关注

此时我们可以运行代码,用在flink的第二篇提到的netcat去监听8888端口(不是8888,随便也行)即可

但是此时我们这个统计并不是累计的,累计的要求需要用到一些高级算子

finally

···之后Flink,ES,Spark随机出现,现在有打算重新把组件都重新学习一下,感兴趣的朋友可以关注 公众号:说出你的愿望吧