Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

4,136 阅读5分钟

很多情况大数据集群需要获取业务数据,用于分析。通常有两种方式:

  • 业务直接或间接写入的方式
  • 业务的关系型数据库同步到大数据集群的方式

第一种可以是在业务中编写代码,将觉得需要发送的数据发送到消息队列,最终落地到大数据集群。

第二种则是通过数据同步的方式,将关系型数据同步到大数据集群,可以是存储在 hdfs 上,使用 hive 进行分析,或者是直接存储到 hbase 中。

其中数据同步又可以大致分为两种:增量同步、CRUD 同步。

增量同步是只将关系型数据库中新增的数据进行同步,对于修改、删除操作不进行同步,这种同步方式适用于那些一旦生成就不会变动的数据。 CRUD 同步则是数据的增、删、改都需要进行同步,保证两个库中的数据一致性。

本文不讲 binlog + Canal + 消息队列 + JAR 实现数据实时同步的方案,也不讲使用 Sqoop 进行离线同步。而是讲解如何使用 Streamsets 零代码完成整个实时同步流程。关于 Streamsets 具体是什么,以及能做哪些其他的事情,大家可以前往 Streamsets 官网进行了解。从笔者了解的信息,在数据同步方面 Streamsets 十分好用。

要实现 mysql 数据的实时同步,首先我们需要打开其 binlog 模式,具体怎么操作网上有很多教程,这里就不进行阐述了。

那么,现在就直接进入正题吧。

安装

下载

Streamsets 可以直接从官网下载: archives.streamsets.com

这里安装的是 Core Tarball 格式,当然你也可以直接选择下载 Full Tarball、Cloudera Parcel 或者其他格式。下载 Core Tarball 的好处是体积小,后期需要什么库的时候可以自行在 Streamsets Web 页进行下载。相对于 Core Tarball,Full Tarball 默认帮你下载了很多库,但是文件体积相对较大(>4G),并且可能很多库我们暂时使用不到。

streamsets

或者你可以直接使用这个链接进行下载:archives.streamsets.com/datacollect…

解压启动

Streamsets Core Tarball 下载好后,直接解压就可以使用,非常方便。

 tar xvzf streamsets-datacollector-core-3.7.1.tgz
 cd streamsets-datacollector-3.7.1/bin/
 ./streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd

如果在运行的时候遇到上面的报错,修改操作系统的 open files 限制数量即可。

#vi /etc/security/limits.conf

添加两行内容:

  • soft nofile 65536
  • hard nofile 65536

运行 'ulimit -n' 既可以看到 open files 设置值已生效。

Web 页

Streamsets 拥有一个 Web 页,默认端口是 18630。浏览器中输入 ip:18630 即可进入 streamsets 的页面,默认用户名、密码都是 admin。

streamsets

Pipeline

准备工作

因为需要将 mysql 的数据实时同步到 hbase 中,但是下载的 Core Tarball 中没有 MySQL Binary Log 以及 hbase 两个 stage library,所以在 create new pipeline 之前需要先安装它们。

安装 MySQL Binary Log 库

streamsets

安装 Hbase 库,这里注意一下,hbase 库位于 CDH 中,所以选择一个 CDH 版本进行安装

streamsets

安装好后在 Installed Stage Libraries 中就能看到已经安装了 MySQL Binary Log 和 Hbase

streamsets

创建 Pipeline

MySQL Binary Log

创建一个 MySQL Binary Log

streamsets

设置 mysql 的连接参数(Hostname, Port 以及 Server ID),这里的 Server ID 与 mysql 配置文件(一般是 /etc/my.cnf)中的 server-id 保持一致

streamsets

设置 mysql 的用户名、密码

streamsets

其他设置:我们在 Include Tables 栏设置了两张表(表与表之间用逗号隔开),意思是监控这两张表的数据变化,其他表不关心。

streamsets

Stream Selector

创建一个 Stream Selector,并将刚刚创建的 MySQL Binary Log 指向这个 Stream Selector。 设置过滤条件, 比如说 ${record:value("/Table")=='cartype'} 就是过滤 cartype 表。

可以看到 Stream Selector 有两个出口(1 和 2),后面我们将会看到: 1 输出到 Hbase, 2 数据到 Trash

streamsets

Hbase & Trash

分别创建 Hbase 和 Trash,连接到 Stream Selector 上

streamsets

配置 Hbase

streamsets

Trash 无需进行配置

验证 & 启动

验证

点击右上角的“眼镜”,验证整个流程是否有问题。

这里报错:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。这个报错的原因是缺少 mysql 连接的 jar 包。解决起来也很简单,下载一个 jar 包然后放到 streamsets 指定的目录下。我这边的完整目录是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下载的 jar 包。

还有一点就是事先要将 Hbase 中相对应的表创建好,不然验证会提示错误。

streamsets

接着在页面上重启 streamsets 即可。

streamsets

重新验证,发现成功了。

streamsets

点击右上角播放标签,启动流程,这样整个流程就已经完成了(数据已经在进行实时同步),查看各个 Stage 既可以看到有多少数据流入,多少数据流出。也可以直接进入 hbase 数据库中查看是否有数据生成。

streamsets

以上就是如何使用 Streamsets 实时同步 mysql 数据到 hbase 中的整个操作流程。大家肯定发现了,整个流程没有编写任何的代码,相对于 binlog + Canal + 消息队列 + JAR 的方案是不是高效一些呢。当然任何方案都会有优缺点,Streamsets 这种方案的更多实际体验还需要更多的观察。