025-大数据ETL工具之StreamSets安装及订阅mysql binlog

2,574 阅读3分钟

这是坚持技术写作计划(含翻译)的第25篇,定个小目标999,每周最少2篇。

本文主要介绍CDH6.2+StreamSets3.9。

StreamSets是一个大数据采集和数据处理工具。可以通过拖拽式的可视化操作,实现数据管道(Pipelines)的设计和调度。其特点有:

  • 拖拽式的可视化界面操作,上手快。
  • 对常见数据处理(数据源、数据操作、数据输出)支持较好。
  • 内置监控,可以对数据流进行观测。

类似的开源产品还有 Apache NiFi , 网上有关于NiFi和StreamSets 的对比 Open Source ETL: Apache NiFi vs Streamsets (网上有中文翻译版版)

国内接触较多的ETL工具,可能是 DataX 、 Kettle 、Sqoop。此处有个简单的对比,数据集成之 kettle、sqoop、datax、streamSets 比较 

安装StreamSets 3.9

下载parcel安装包

从 archives.streamsets.com/index.html 下载3.9的

image.png

并上传到http服务器的www目录下,本文以centos7.6为例

wget -P /var/www/html/streamsets3.9.0/ https://archives.streamsets.com/datacollector/3.9.0/parcel/manifest.json
wget -P /var/www/html/streamsets3.9.0/ https://archives.streamsets.com/datacollector/3.9.0/parcel/STREAMSETS_DATACOLLECTOR-3.9.0-el7.parcel.sha
wget -P /var/www/html/streamsets3.9.0/ https://archives.streamsets.com/datacollector/3.9.0/parcel/STREAMSETS_DATACOLLECTOR-3.9.0-el7.parcel

image.png

配置csd

从 streamsets.com/opensource 下载

image.png

wget -P /opt/cloudera/csd/ https://archives.streamsets.com/datacollector/3.9.0/csd/STREAMSETS-3.9.0.jar
cd /opt/cloudera/csd/
sudo chown cloudera-scm:cloudera-scm STREAMSETS-3.9.0.jar && sudo chmod 644 STREAMSETS-3.9.0.jar
systemctl restart cloudera-scm-server

下载分发Parcel包

image.png

image.png

image.png

image.png

下载并激活,但是,我实际测试时,总大小,4.6G,实际下载后,5.2G,导致sha1sum 校验失败,报
image.png

在cm所在主机, ls -lah /opt/cloudera/parcel-repo  

image.png

把下载的 archives.streamsets.com/datacollect… 复制到 /opt/cloudera/parcel-repo 下

image.png

如果已经不信邪,试过下载,并报hash错误后,直接替换后,这个页面还是提示hash,此时再次点击下载,就会变成分配。
激活后如下所示
image.png

image.png

image.png

创建完毕

streamsets 简单使用

打开streamsets,默认用户名密码 admin/admin

image.png

image.png


官方教程,参考 Basic Tutorial

本文主要讲解订阅mysql binlog进行数据同步

mysql binlog

开启binlog

修改mysql配置文件,my.cnf,在mysqld下增加(注意5.7的不加server-id无法正常启动)

server-id=1
log-bin=mysql-bin
binlog_format=ROW

创建并配置同步账号

GRANT ALL on slave_test.* to 'slave_test'@'%' identified by 'slave_test';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'slave_test'@'%';
FLUSH PRIVILEGES;

安装mysql jdbc驱动

wget -P /opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar

重启streamsets

创建pipeline

image.png

配置mysql binlog解析及处理

image.png

image.png

image.png

image.png

image.png

配置目标端
image.png
image.png

运行

image.png

测试

此处使用mysql自带的压测工具 mysqlslap.exe 进行测试

bin/mysqlslap --user=root --password=xxxxxx --concurrency=50 --number-int-cols=5 --number-char-cols=20 --auto-generate-sql --number-of-queries=100000 --auto-generate-sql-load-type=write --host=192.168.0.123 --port=3306
--user 用户(需要有建库建表权限)
--password 密码
--concurrency 并发数
--number-int-cols 表内有5个数字列
--number-char-cols 表内有20个字符串列
--auto-generate-sql 自动生成脚本
--number-of-queries 总执行次数
--auto-generate-sql-load-type=write 只执行写入操作
--host mysql 主机
--port 端口

下方有监控报表

常见错误

image.png


同步不一致导致的错误,手动从


设置偏移量
image.png

如果报错 Pipeline Status: RUNNING_ERROR: For input string: ""xxxx"   ,把my.cnf改成

server-id=1
log-bin=mysql-bin
binlog_format=ROW
sync_binlog=1
binlog_gtid_simple_recovery=ON
log_slave_updates=ON
gtid_mode=ON
enforce_gtid_consistency=ON

参考资料