使用canal+kafka监听MySQL binlog小实践

5,126 阅读6分钟

前言

最近,想对MySQL有进一步的认识,看如何保证缓存与数据库一致性,在负责业务上也需要这方面的优化,有些文章提到使用监听MySQL binlog实现,想试下,本文纯属好奇心驱使。

  • 所用工具:MySQL + Canal + Kafka

1、kafka:kafka.apache.org/quickstart(… 2、Canal:github.com/alibaba/can… 3、MySQL版本如下:

+-------------------------+-----------------------+
| Variable_name           | Value                 |
+-------------------------+-----------------------+
| innodb_version          | 8.0.12                |
| protocol_version        | 10                    |
| slave_type_conversions  |                       |
| tls_version             | TLSv1,TLSv1.1,TLSv1.2 |
| version                 | 8.0.12                |
| version_comment         | Homebrew              |
| version_compile_machine | x86_64                |
| version_compile_os      | osx10.14              |
| version_compile_zlib    | 1.2.11                |
+-------------------------+-----------------------+

MySQL binlog简介

  • binlog是MySQL server层维护的一种二进制日志,与innodb等存储引擎中的redo/undo log是完全不同的日志;主要是用来记录对MySQL数据更新或潜在发生更新的SQL语句,并以“事务”的形式保存在磁盘中。 那么,binlog日志的作用如下:

主从复制 数据恢复 增量备份

  • 查看binlog相关配置: show variables like '%log_bin%';
+---------------------------------+-----------------------------------+
| Variable_name                   | Value                             |
+---------------------------------+-----------------------------------+
| log_bin                         | ON                                |
| log_bin_basename                | /usr/local/var/mysql/binlog       |
| log_bin_index                   | /usr/local/var/mysql/binlog.index |
| log_bin_trust_function_creators | OFF                               |
| log_bin_use_v1_row_events       | OFF                               |
| sql_log_bin                     | ON                                |
+---------------------------------+-----------------------------------+
  • 查看binlog目录:show binary logs;
+---------------+-----------+
| Log_name      | File_size |
+---------------+-----------+
| binlog.000036 |       155 |
| binlog.000037 |      1066 |
| binlog.000038 |      3075 |
+---------------+-----------+
  • 查看binlog的状态:show master status;可查看当前二进制日志文件的状态信息,显示正在写入的二进制文件,以及当前的position。
| binlog.000038 |     3075 |              |                  |                 |
  • 那么,再来查看binlog.000038日志内容,./mysqlbinlog /usr/local/var/mysql/binlog.000038
  • ROW级别下,SQL语句需要解码,需要加解码选项,./mysqlbinlog --base64-output=decode-rows -v /usr/local/var/mysql/binlog.000038
# at 3311
#200410 17:29:47 server id 1  end_log_pos 3386 CRC32 0xac866698 	Write_rows: table id 65 flags: STMT_END_F
### INSERT INTO `zacblog`.`t_zb_article`
### SET
###   @1=5
###   @2=22121
###   @3='dada'
###   @4='dadwad'
###   @5=0
###   @6='2020-04-10 17:29:31'
###   @7=1586510981
  • 当然,binlog的格式也是可以设定的,分别有ROWSTATEMENTMIXED选项。

缓存与数据库一致性

  • 首先,根据CAP理论,一个系统同时满足C、A、P是不可能的,放弃C也不是说放弃一致性,而是放弃强一致性,追求最终一致性
  • 此前,项目也遇到过,缓存与数据库如何保持一致性的问题,网上形形色色的答案,但我看到有些大神往往引用Cache-Aside pattern这篇文章,文章中讲的模式就是先更新数据库,再删除缓存

The order of the steps is important. Update the data store before removing the item from the cache. If you remove the cached item first, there is a small window of time when a client might fetch the item before the data store is updated. That will result in a cache miss (because the item was removed from the cache), causing the earlier version of the item to be fetched from the data store and added back into the cache. The result will be stale cache data. 译文:如果先删除缓存,会有短暂的时间窗口,客户端访问数据库的旧值,并且导致该key下的请求全部打到数据库,然后旧值也会重新保存到缓存。

enter image description here

  • 那么,更新完数据库,删除缓存失败(概率不能说没有)时,还是会有问题。。。

1、可以使用MQ重试机制,当remove抛出异常,我们可以利用MQ异步重试删除。 2、利用监控捕捉重试异常。

  • 写到这儿,我都快觉得我跑题了,确实这部分网上各抒己见,上面纯属个人看法。下面就是满足个人好奇心了,有文章说使用MQ监听MySQL binlog的方式异步删除缓存,删除失败继续放入MQ重试。

关于一致性的新的思路(2022.03.01)

Canal简介

  • 为了监听MySQL binlog我搜到美团DB数据同步到数据仓库的架构与实践 文章中写到使用Canal实现binlogKafka的连接。
  • Canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
  • 基于日志增量订阅和消费的业务包括

1、数据库镜像 2、数据库实时备份 3、索引构建和实时维护(拆分异构索引、倒排索引等) 4、业务 cache 刷新 5、带业务逻辑的增量数据处理

  • 然后,我们就要开始根据QuickStart的命令去启动了,大部分都是根据官网操作的,只是遇到了几个坑。
  • 具体命令,我就不再赘述了,具体顺序为

1、先启动kafka 2、Canal instance.properties和canal.properties,包含db、用户信息、Kafka集群及topic信息 3、根据Canal配置信息,在Kafka创建相应的topic和消费者

踩坑记录

  • canal.properties关键配置
...
canal.serverMode = kafka
...
canal.mq.servers = localhost:9092,localhost:9093,localhost:9094
  • instance.properties关键配置
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
  • caching_sha2_password Auth failed
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83) ~[canal.parse.driver-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89) ~[canal.parse-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86) ~[canal.parse-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183) ~[canal.parse-1.1.4.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.io.IOException: caching_sha2_password Auth failed
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257) ~[canal.parse.driver-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80) ~[canal.parse.driver-1.1.4.jar:na]
	... 4 common frames omitted
  • mysql -hlocalhost -P3306 -ucanal -p123 -Dzacblog登录数据库,重启canal即可。

最后效果

  • 根据对数据库表,进行修改,Canal会通过Kafka发出消息,在Consumer接收到的消息格式如下。
  • INSERT语句,data为关键字段
{
    "data":[
        {
            "article_id":"3",
            "author_id":"1121212",
            "article_title":"dadad",
            "article_content":"dadad",
            "status":"0",
            "create_time":"2020-04-10 16:30:53",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
    "database":"zacblog",
    "es":1586507462000,
    "id":1,
    "isDdl":false,
    "mysqlType":{
        "article_id":"bigint(20)",
        "author_id":"bigint(20)",
        "article_title":"varchar(50)",
        "article_content":"text",
        "status":"tinyint(3)",
        "create_time":"datetime",
        "update_time":"timestamp"
    },
    "old":null,
    "pkNames":[
        "article_id"
    ],
    "sql":"",
    "sqlType":{
        "article_id":-5,
        "author_id":-5,
        "article_title":12,
        "article_content":2005,
        "status":-6,
        "create_time":93,
        "update_time":93
    },
    "table":"t_zb_article",
    "ts":1586507463069,
    "type":"INSERT"
}
  • UPDATE语句,更新字段用old的list来表示
{
	...
    "old":[
        {
            "article_title":"dadad",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
	...
    "type":"UPDATE"
}
  • DELETE语句,data为关键字段
{
	...
    "data":[
        {
            "article_id":"5",
            "author_id":"22121",
            "article_title":"dada",
            "article_content":"dadwad",
            "status":"0",
            "create_time":"2020-04-10 17:29:31",
            "update_time":"2020-04-10 17:29:41",
            "ext":null
        }
    ],
    "database":"zacblog",
    "es":1586517722000,
	...
    "type":"DELETE"
}
  • ALTER TABLE,直接是语句
{
    "data":null,
    "database":"zacblog",
    "es":1586511807000,
    "id":6,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"ALTER TABLE `zacblog`.`t_zb_article` 
ADD COLUMN `ext` varchar(255) NULL AFTER `update_time`",
    "sqlType":null,
    "table":"t_zb_article",
    "ts":1586511808016,
    "type":"ALTER"
}

小结

  • 本文从MySQL binlog出发,途径缓存-数据库一致性的讨论,最后为了满足好奇心,简单动手监听了下binlog改动的Kafka消息。
  • 其实,在实际开发当中,我们也可以参照这个消息格式,对数据库的增删改,发出对应的业务消息,这个消息格式,我觉得还是有借鉴意义的。

参考文章