kafka-offset

619 阅读3分钟

实时计算中,source端和sink端很有可能都是kafka,涉及到如何在整个过程中能够不丢数据也就是at-least-once,在这个基础上如何再做到exactly-once。在工作中,比如告警数据,这种是不能丢的,丢了的话高等级告警报不出来,你得背多大一口锅?😹

先从kafka自身编程开始,后面再引入Spark Streaming。

如何提交offset, 有的时候我们会从kafka拉取数据,经过处理把数据写入MySQL,这时候如何保证数据写入MySQL并且offset也提交呢。为了业务的准确性,这两部操作要么全部成功,要么全部失败。这就是一个事务操作,如果offset提交到kafka,消息存入mysql这是没办法保证事务的,除非offset也写入mysql,也就是由我们自己管理offset

消费者的关闭

由于消费者的离开会触发rebalance,如果消费者直接裸退,那么群组协调器是不会立刻触发rebalance的,因为可能是因为网络抖动,不能直接就确定该消费者是死了。但是设置一个hook,在hook中调用消费者的wakeup方法。

final Thread mainThread = Thread.currentThread();

// 优雅的关闭消费者
// 在程序退出的时候,调用wakeup,其实就是为了触发下面代码中finally中的 close
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.err.println("shutdown");
    // 这是唯一一个能在其他线程调用的consumer方法
    consumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

消费者是不能多线程中使用的,除了这个wakeup方法

调用这个方法有什么用呢?有兴趣的可以深入源码看看。调用后,消费者会触发一个异常。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            consumer.commitSync();
        }
    }
} catch (WakeupException ignored) {
} finally {
    // 这步操作是有必要的
    consumer.close();
}

这个异常不用管,主要是触发finally,显示的调用close,这里面其实是消费者和群组协调器有一次通信,群组协调器会立马触发rebalance。

SparkStreaming Offset

offset在executor端管理还是Driver端管理呢?可以思考一下,答案应该是在Driver端管理。如何保证SparkStreaming不重复消费数据,不重复写下游呢。(面试重点

  • 关闭失败重试(一般重复消费重复写,都是由于task失败重试),聊聊而已,没有人这么做。
  • 下游是文件:每个task落文件,落文件这种方式(overwrite)每次有任务失败就重写文件,这样是没问题的,可以保证结果不重复。
  • 下游是DB:task计算完成后,将数据写入DB时开启事务,这样task失败的时候事务会失败,但是之前有同事的代码里并行度很高,导致数据库连接很多调优点:降低分区数,越低越好,趋向于1,这样就有很少的和DB的连接。但是这样是有问题的,数据写进DB了,但是offset还没有提交。Job完成后跟driver汇报,按理说driver就该提交offset了,但是driver可能会挂,这时候offset就没提交成功。会有重复消费问题。
  • 终极解决方案:数据回收到Driver,collect,这时候Driver内存调大,业务和offset在同一个事务里。这样是最稳的。

三种处理语义

现在有这么一个场景:Java程序从kafka消费数据,然后处理后需要写到MySQL中。

最多一次

当程序从kafka消费到数据后,先进行offset的更新,再进行写MySQL

最少一次

先写MySQL,再更新offset。

仅一次

offset维护在mysql里,把业务和更新offset放在一个DB事务里。

kafka-consumer

offset,其实都是围绕着它来转的。