【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

10,038 阅读11分钟

觉得不错请按下图操作,掘友们,哈哈哈!!! image.png

一:Canal 官网介绍

canal :译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

二:工作原理

使用Canal 首先我们要了解MySql主从复制的工作原理。

2.1 MySQL主从复制原理

image.png

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2.2 canal 工作原理

image.png

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三:canal 安装使用

3.1 canal 简介

image.png

去官网下载页面进行下载:github.com/alibaba/can…

我这里下载的是1.1.6的版本:

canal 对应包的下载和装置的教程,都间接看 canal官网github,安装包目前有三兄弟:

  • canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。
  • canal adapter:顾名思义“适配器”,搭配 canal server,目前能实现mysql 数据到 hbase、rdb、es的增量同步,妥妥的 ETL 工具。
  • canal admin:也是为 canal server 服务的,为canal提供整体配置管理、节点运维等面向运维的性能,提供绝对敌对的WebUI操作界面。如果 canal server 要搭建集群环境,必少不了 canal admin 这样业余的运维工具。

3.2 mysql 相关配置

1. MySql 相关配置

  • 安装canal前我们先开启MySql的 binlog,在MySQL配置文件my.cnf设置如下信息:
[mysqld] 
# 打开binlog 
log-bin=mysql-bin 
# 选择ROW(行)模式 
binlog-format=ROW 
# 配置MySQL replaction需要定义,不要和canal的slaveId重复 
server_id=1
  • 改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:

image.png

查看binlog日志文件列表:

image.png

查看当前正在写入的binlog文件:

image.png

MySQL服务器这边配置就完成。

  • 在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限
-- 使用命令登录:
mysql -u root -p 
-- 创建用户 用户名:canal 密码:Canal@123456 
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表示所有库 
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

3.3 安装canal

3.3.1 canal.deployer

解压canal.deployer-1.1.6.tar.gz,我们可以看到里面有四个文件夹:

image.png

接着打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

我这里用的是家里的win10系统,公司的mac没遇到这个问题,所以在bin目录下找到startup.bat启动:

但是启动就报错,要踩坑了???

image.png

后来修改一下启动的脚本startup.bat解决了:

image.png

然后再启动脚本:

image.png

这就启动成功了。

3.3.2 canal adapter

作用:

  1. 对接上游消息,包括kafka、rocketmq、canal-server
  2. 实现mysql数据的增量同步
  3. 实现mysql数据的全量同步
  4. 下游写入支持mysql、es、hbase等

它既然是适配器,那么就得介绍“源头”和“指标”这两个部位数据的对接:

  • 源头
  • (1)canal adapter 能够直连 canal server ,生产 instance的数据;
  • (2)也能够在让 canal server 将数据投递到 MQ,而后 cancal adapter 生产 MQ 中的数据。
  • 指标:对接上游消息,包括kafka、rocketmq、canal-server 实现mysql数据的增量同步 实现mysql数据的全量同步 下游写入支持mysql、es、hbase

目前 adapter 是支持动态配置的,也就是说修改配置文件后无需重启,任务会自动刷新配置!

(1) 修改application.yml 执行 vim conf/application.yml 修改consumerProperties、srcDataSources、canalAdapters的配置

canal.conf:
  mode: tcp # kafka rocketMQ                # canal client的模式: tcp kafka rocketMQ
  flatMessage: true                         # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  syncBatchSize: 1000                       # 每次同步的批数量
  retries: 0                                # 重试次数, -1为无限重试
  timeout:                                  # 同步超时时间, 单位毫秒
  consumerProperties:
    canal.tcp.server.host:                  # 对应单机模式下的canal
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181 # 对应集群模式下的zk地址, 如果配置了canal.tcp.server.host, 则以canal.tcp.server.host为准
    canal.tcp.batch.size: 500               # tcp每次拉取消息的数量
  srcDataSources:                           # 源数据库
    defaultDS:                              # 自定义名称
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url 
      username: root                                            # jdbc 账号
      password: 123                                          # jdbc 密码
  canalAdapters:                            # 适配器列表
  - instance: example                       # canal 实例名或者 MQ topic 名
    groups:                                 # 分组列表
    - groupId: g1                           # 分组id, 如果是MQ模式将用到该值
      outerAdapters:                        # 分组内适配器列表
      - name: es7                           # es7适配器
        mode: rest                          # transport or rest
        hosts: 127.0.0.1:9200               # es地址
        security.auth: test:123456          # 访问es的认证信息,如没有则不需要填
        cluster.name: my-es                 # 集群名称,transport模式必需配置
...... 

  1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
  2. 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

(2) conf/es7目录下新增映射配置文件

adapter将会自动加载 conf/es7 下的所有 .yml 结尾的配置文件

新增表映射的配置文件,如 sys_user.yml 内容如下:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: sys_goods
  _id: id
  upsert: true
  sql: "select id, goodsname, price from sys_goods"
  etlCondition: "where update_time>={}"
  commitBatch: 3000
  • dataSourceKey 配置 application.yml 里 srcDataSources 的值
  • destination 配置 canal.deployer 的 Instance 名
  • groupId 配置 application.yml 里 canalAdapters.groups 的值
  • _index 配置索引名
  • _id 配置主键对应的字段
  • upsert 是否更新
  • sql 映射sql
  • etlCondition etl 的条件参数,全量同步时可以使用
  • commitBatch 提交批大小

sql映射支持多表关联自由组合, 但是有一定的限制:

  1. 主表不能为子查询语句
  2. 只能使用left outer join即最左表一定要是主表
  3. 关联从表如果是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
  5. 关联条件只允许主外键的'='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射

四: 项目实战

4.1 项目使用场景

在现有演出业务中,我们有这样一个场景,管理员在后台管理系统中可以对先有演出项目,场次,售卖时间,票种等信息进行修改,比如原先我有一个演出项目 “周杰伦演唱会”,这时候用户在C端就可以看到名称就是“周杰伦演唱会”,但是我觉的这个项目的名字不够大气,这时候我在后台改了叫 “周杰伦嘉年华演唱会”,在后台更改后首先项目不会立即生效,而是要通过一个发布的过程C端才能看到更改后的。 那么由于C端查询量巨,特别是在这种大型演唱会的时候,面对这么大的流量我们怎么去设计呢??? 所以这就是我们使用的场景。

4.2 设计对比

  • 流程上涉及的模型直接添加Redis 缓存,在业务侧组装查询逻辑
    • 这是我们原先的使用方案,但是对Redis 依赖很大,而且在高峰期的时候出现过缓存击穿的问题
    • 由于查询逻辑复杂,业务侧能实现查询业务的组装,但是在查询高峰期内存消耗极高,效率也不是很高。
  • 数据库数据更改后在业务层再去更新ES,ES提供对外的查询能力
    • 这就是上边方案的变种,ES有天然的查询优势,并且能应对复杂的查询条件,所以在上边很多业务组装的复杂查询此时都可以用ES来实现。但是数据同步ES的地方就比较分散,每个更改表的地方都要考虑,如果同步的表比较多就很难维护了。
  • 使用Canal 监听数据库binary log,同步到Kafka,专门的服务消费kafka消息,同步ES
    • 第三种方案,需要进入中间件,canal和kafka,但是这个方案只会在服务搭建的时候比较复杂一些,属于一劳永逸那种,canal 我们监听数据库 binary log ,然后将 binary log 同步到kafka中,新增消费服务读取kafka 中 binary log,处理ES逻辑。这样业务方可以提各种各样的查询需求,我们只需要在查询服务中利用ES进行组装即可。

4.3 应用:

在我们的项目中,数据流向包含:腾讯云订阅服务 -> canal adapter -> es 。

只使用canal的客户端功能也就是canal adapter,因为我们我们数据库是使用的腾讯云Mql,腾讯云提供了数据订阅功能。

image.png

image.png

image.png

在这里说明下,实际上腾讯云,阿里云都提供了这样的服务,他们对数据库指定的表监听binglog,然后写入到kakaka中,比如上边的截图。

4.4 问题以及解决方案

问题1:监听多张表怎么保证数据消息不乱?

image.png

就像这个截图,我们可以按照表名进行分区,也就是说,同一张表数据的变更只会在一个partion上,这样表变更的消息在一个partion 上就变得有序了。

问题2:如果数据库中数据和ES中的数据对不上怎么处理?

目前我们是提供一个job,每十分钟跑一次去对比数据库中有效数据的数量和ES中的数据数量,但是Job很慢,可以考虑,从数据库捞数据,分批跑(shard的方式,先取比如订单号是1的,后续一直累加到9),如果有多台机器就可以分布式跑,每台下放shardIndex ,数据库sql就捞 orderNum%10 = shardIndex的数据,这样也不会跑重,处理的时候再用多线程。

问题3: 怎么提高处理速度以及效率 在摸索中我们采用了一种方法:就是对消息批量处理,我们从消息中心拿消息的时候批量去拿,举个例子吧,goods 表对应的消息现在有1000条,orderId =100 的数据在占了100个,分布在不同的offset中,如果我每次都是拿一条消息去消费一次这样重复的逻辑我要跑100次,但是们可以批量拿200条消息,然后再对消息中的OrderId进行分组,得到最终要执行的OrderId这样是不是就可以少跑很多次。拿到要执行的OrderId后我们就可以找到对应的Sql得到最后的Select结果(这个地方有个点,我不关心你是什么操作【DML】,我只要操作的数据ID,然后执行我的Select sql 去查询最后的结果,然后再去同步到ES中),处理的过程中我们也可以使用多线程来加快处理速度,这都是一些优化的点。

干货】常见库存设计方案-各种方案对比总有一个适合你

JYM来一篇消息队列-Kafka的干货吧!!!

设计模式:

JYM 设计模式系列- 单例模式,适配器模式,让你的代码更优雅!!!

JYM 设计模式系列- 责任链模式,装饰模式,让你的代码更优雅!!!

JYM 设计模式系列- 策略模式,模板方法模式,让你的代码更优雅!!!

JYM 设计模式系列-工厂模式,让你的代码更优雅!!!

Spring相关:

Spring源码解析-老生常谈Bean ⽣命周期 ,但这个你值得看!!!

Spring 源码解析-JYM你值得拥有,从源码角度看bean的循环依赖!!!

Spring源码解析-Spring 事务

本文正在参加「金石计划」