在spring boot三分钟上手无界流处理系统Spark Streaming,并实现流式点赞统计

4,197 阅读3分钟

在页面上每次点赞,把这个被点赞的文章id发送到kafka,然后通过spark streaming读取kafka里的数据,统计出点赞的数量,更新回mysql中

完整案例代码已上传github:github.com/neatlife/my…

获取案例项目

可以在https://start.spring.io上创建项目

这里加上web-starter, kafka-starter即可,spark和spark kafka streaming相关以来需要手动添加,并没有对应的starter可用

然后在pom.xml中引入kafka的客户端和spark streaming的客户端

<!--spark相关依赖-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

把点赞的postId发到kafka

上手kafka可以参考: juejin.cn/post/684490…

创建kafka的topic,比如这里使用mylikes这个topic

kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181

操作效果如下

新增一个点赞接口,核心代码如下

@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
    producer.send(postId);
    return "test1";
}

kafka发送核心代码如下

public void send(Integer postId) {
        ProducerRecord producerRecord = new ProducerRecord(topic, postId.toString(), "1");
        this.kafkaTemplate.send(producerRecord);
        System.out.println("Sent sample postId [" + postId + "] to " + topic);
}

记下这里使用的kafka的topic:mylikes,在spark里也需要使用这个topic 这里注意发送到kafka的key和value都是字符串,id和点赞数是int,所以在spark进行处理时需要做这个类型转换

在spark中从kafka中读取数据计算点赞量

创建从kafka中读取数据的spark客户端

SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster("local[*]")
        .set("spark.default.parallelism", "15")
        .set("spark.streaming.concurrentJobs", "5")
        .set("spark.executor.memory", "1G")
        .set("spark.cores.max", "3")
        .set("spark.local.dir", "/tmp/mySparkLikes")
        .set("spark.streaming.kafka.maxRatePerPartition", "5");

Set<String> topics = Collections.singleton(topic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");

JavaStreamingContext jsc = new JavaStreamingContext(
        new JavaSparkContext(conf),
        Durations.seconds(3));
jsc.checkpoint("checkpoint");

创建kafka数据流

// 得到数据流
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
);
System.out.println("stream started!");
stream.print();

stream.print() 触发读取数据

将kafka里的字符串类型的postId和点赞次数转成整数类型

JavaPairDStream<Integer, Integer> countDStream = stream
        .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<Integer, Integer>>() {

            @Override
            public JavaPairRDD<Integer, Integer> call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
                return stringStringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {

                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                        return new Tuple2<>(new Integer(stringStringTuple2._1), new Integer(stringStringTuple2._2));
                    }
                });
            }
        })
        .reduceByKey(Integer::sum);

生成点赞次数的sql语句

countDStream.foreachRDD(v -> {
    v.foreach(record -> {
        String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
        System.out.println(sql);
    });
    log.info("一批次数据流处理完: {}", v);
});

启动流计算

jsc.start();

添加一个接口来调用上面的代码

@RequestMapping("/launch")
public String launch() {
    sparkLikeService.launch();
    return "test2";
}

先访问/launch来启动流计算引擎,然后访问send-like接口生成点赞数据,查看控制台生成的sql语句,操作效果如下

可以看到已经拿到了点赞数的sql,可以使用jpa把这个点赞数据存放到数据库中了

本地调试

这个spark的job可以本地调试,但是需要满足以下几个条件

  1. spark需要在本地启动
  2. job连接master的地址需要是:local[*]

参考资料

  1. www.iteblog.com/archives/13…
  2. github.com/eBay/Spark/…
  3. blog.csdn.net/guotong1988…
  4. www.4spaces.org/spark-map-f…
  5. blog.csdn.net/wuxintdrh/a…