数据库实时转移之Confluent环境搭建(二)

1,564 阅读4分钟

1.前言

上一篇主要介绍了Confluent的基本概念,如果对Confluent不了解的请回看上篇文章。

2.系统架构

为了保证系统可靠性,真实生产环境中都会以集群的方式搭建,以避免单机宕机造成的影响。本文以3台机器,MySQL作为源/目的数据库来进行数据库的转移实验。

整个系统的整体结构如下图所示,因为每个组件都是独立提供服务且都能以集群的方式进行工作,因此本实验把每个服务都分别部署到3台机器来模拟集群环境。本系统主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4种服务,整体架构如下:

在这里插入图片描述
整个系统工作流程是Source Connector集群从源MySQL DB中不断实时读取变动数据(增/删/改)再经过Schema-Registry序列化后插入到Kafka消息队列中,Sink Connector会不断从Kafka消息队列中获取数据再经过反序列化插入到目的MySQL DB中。

3.Confluent 安装

本文以CentOS操作系统为实验环境。

3.1 安装JDK1.8

下载JDK1.8 64位,解压到安装目录。

设置java环境变量,在~/.bashrc文件中增加以下信息。

 export PATH=<path-to-java>/bin:${PATH}; 
 export CLASSPATH=.:<path-to-java>/lib/dt.jar: <path-to-java>/lib/tools.jar

执行source~/.bashrc 使之生效。

3.2 Confluent 安装

下载Confluent Community版,下载链接为https://www.confluent.io/download/,解压到安装目录,添加环境变量。

export PATH=<path-to-confluent>/bin:${PATH};
export CLASSPATH=<path-to-confluent>/share/java/*:${CLASSPATH}

执行source~/.bashrc 使之生效。

3.3 安装mysql-connector-jdbc.jar

因为实验数据库为MySQL,因此下载MySQL驱动包。

下载jar包,然后放到/java/kafka/目录下面。

4.服务配置

4.1 Zookeeper 配置

编辑/etc/kafka/zookeeper.properites文件,修改以下配置信息。

完整配置信息可以参看链接:

docs.confluent.io/current/zoo…

tickTime=2000 #时间单元,毫秒单位 dataDir=/var/lib/zookeeper/ #数据存储路径 clientPort=2181 #zookeeper 客户端监听端口 initLimit=5 #followers 初始化时间 syncLimit=2 #followers 同步时间 maxClientCnxns=0 #最大client连接数,值为0的时候没有上限 server.< myid >=< hostname >:< leaderport >:< electionport > 集群配置 server.1=< IP1>:2888:3888 #server1 地址,修改为自身地址 server.2=< IP2>:2888:3888 #server2 地址 server.3=< IP3>:2888:3888 #server3 地址 autopurge.snapRetainCount=3 #最近的快照保存数目 autopurge.purgeInterval=24 #快照自动清除时间间隔

修改好配置文件后,需要在每台Zookeeper Server的dataDir目录下创建myid文件来在集群中作为

唯一标示。

例如:

server1:  echo 1 > myid
server2:  echo 2 > myid
server3:  echo 3 > myid

集群中所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

4.2 Kafka 配置

编辑/etc/kafka/server.properites文件,修改以下配置信息。

完整配置信息可以参看链接:

docs.confluent.io/current/kaf…

#集群地址,修改为自己地址 zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 broker.id=[1,2,3] #节点标识,每台机器依次改为1,2,3….依次递增 log.dirs=/xxx/log/kafka #日志目录,修改为自身log目录 listeners=PLAINTEXT://0.0.0.0:9092 #监听地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #发布到zookeeper供客户端连接的地址 num.partitions=3 #分区数设置,分区间数据无序,分区内数据有序。若需要保证有序,此处设置为1 default.replication.factor=3 #消息备份数目默认1不做复制。此处改为2,备份一份。 port=9092 #服务端口,默认9092

集群中除broker.id外所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

4.3 Schema Registry 配置

编辑/etc/schema-registry/schema-registry.properites 文件,修改以下配置信息。

完整配置信息可以参看链接

docs.confluent.io/current/sch…

listeners=http://0.0.0.0:8081 #监听地址 host.name=172.21.101.186 #主机地址,改为机器本身地址 port #端口号,默认8081 #zookeeper 集群列表,改为机器相应地址
kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181

集群中除host.name设置为本身ip外,所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

4.4 Kafka-Connect

4.4.1 Connect 配置

编辑/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改以下配置信息。

完整参数参考链接:

docs.confluent.io/current/con…

group.id=connect-cluster #Connect集群组标示,集群中此处所有配置必须相同。 kafka节点列表,host1:port1,host2:port2,...修改为相应机器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 key.converter=io.confluent.connect.avro.AvroConverter #使用Avro为key转化类 key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 访问URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro为value 转化类 value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 访问URL config.storage.replication.factor=3 #config信息备份因子,不超过集群数目 offset.storage.replication.factor=3 #偏移量备份因子,不超过集群数目 status.storage.replication.factor=3 #任务状态备份因子,不超过集群数目

集群中除schema.registry.url设置为本身ip外,集群中所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

4.4.2 Connect-JDBC

Kafka Connect主要由两部分组成,分别是Source Connector 和Sink Connector。Source Connector 负责从数据库中把信息读入到Kafka,Sink Connector负责把数据从Kafka 中读到数据库。实验使用的是MySql数据库,因此我们主要介绍Confluent JDBC Connector。

完整参数参考链接:

docs.confluent.io/current/con…

4.4.2.1 JDBC Source Connector

以下是各个参数说明,可以根据实际业务场景来配置各个参数。

connection.url #数据库连接地址

connection.user #数据库用户名 connection.password #数据库密码 table.whitelist #查询表白名单 connection.attempts #数据库连接次数 numeric.precision.mapping #是否根据进度判断数据类型 table.blacklist #表黑名单 connection.backoff.ms #数据库尝试连接间隔时间 schema.pattern #查询表时使用的schema mode #更新表时候用的模式,主要有以下4种模式. Bulk:每次都全部查询。 timestamp:根据时间戳字段是否变化来检测数据是否增加或更新。 incrementing :用自增长字段来检测数据是否有增加,不能检测数据变化和删除。 timestamp+incrementing :根据时间戳和自增长字段来检查新增更新数据,根据自增长字段来标示唯一的流数据。 incrementing.column.name #用来判断是否有新增数据的自增长字段名称,该字段值不允许为空值。 timestamp.column.name #用来判断数据是否有新增和更新的时间戳字段,该字段值不允许为空值 validate.non.null #设置是否检测数据库中自增长和时间戳字段不允许为空值,如果检查失败connector就停止启动。 query #设置数据查询语句,如果设置了就不进行全表轮询,而是只是用此sql语句去提取数据 query.condition #设置自定义查询条件,会拼接到where语句后面。(非自带,修改代码添加此参数) poll.interval.ms #数据轮询时间间隔,对数据库执行查询语句的时间间隔,此参数对数据库性能有影响 batch.max.rows # connector每次轮询获取数据的最大数,默认100条,和connector获取数据的性能有关 table.poll.interval.ms #检查表是否有增加或删除的时间间隔。 topic.prefix #使用普通查询时候以topic.prefix +表名 作为topic.自定义query语句时候以topic.prefix作为topic,此处写目的库的表名 table.types #设置要查询的表类型,默认为table,还可以设置view、system table。 timestamp.delay.interval.ms #延迟转移时间间隔,可以延迟数据转移

4.4.2.2 JDBC Sink Connector

以下是各个参数说明,根据实际业务场景来配置各个参数。

connection.url #数据库连接URL connection.user #数据库连接用户名 connection.password #数据库密码。 insert.mode # 插入数据模式 insert,upsert,update。本项目使用upsert模式,在没有相应主键数据时候直接插入,否则进行更新操作。 batch.size #每次插入数据的最大数,和数据库性能有关 topics #订阅的主题,也是目的数据库的表名
table.name.format #插入表面格式化,默认为${topic} pk.mode #主键模式。None:不设置主键,kafka:使用kafka坐标作为主键,record_key: 使用record 的key字段作为主键,record_value:使用record 的value 字段作为主键。 pk.fields #主键字段,以逗号分隔。项目中使用目的表的虚拟主键或者唯一约束字段。 fields.whitelist #插入字段白名单,如果为空则所有字段都使用 auto.create #是否自动创建目的表。 auto.evolve #当表结构发生变化时候,是否自动修改目的表 max.retries #失败重试次数 retry.backoff.ms #重试时间间隔

4.4.3 Connector REST API

在集群环境下,Connector参数只能通过RESTful接口进行参数配置,通过RESTful接口可以给任意一个服务器发送配置参数,Connector参数信息会自动转发给集群中的其它机器。使用RESTful接口配置参数后,Connector服务无需重启,即可生效。

下面详细描述Connector RESTful接口的配置参数。

4.3.3.1 RESTful Header

目前 REST接口只支持Json格式参数,因此请求头应该如下设置。

Accept: application/json
Content-Type: application/json 
4.3.3.2 RESTful URL
接口 功能
GET /connectors 获取connectors列表
POST /connectors 创建connector
GET /connectors/(string:name) 获取指定connector 信息
PUT /connectors/(string:name)/config 创建或者修改指定connector 配置
GET /connectors/(string:name)/status 获取指定connector 运行状态信息
POST /connectors/(string:name)/restart 重启指定connector
PUT /connectors/(string:name)/pause 暂停指定connector
PUT /connectors/(string:name)/resume 重启指定connector
DELETE /connectors/(string:name)/ 删除指定connector
GET /connectors/(string:name)/tasks 获取指定connector的tasks列表信息
GET /connectors/(string:name)/tasks/(int:taskid)/status 获取指定task状态
POST /connectors/(string:name)/tasks/(int:taskid)/restart 重启指定task

5. 服务启动

5.1 启动Zookeeper

zookeeper-server-start  <path-to-confluent>/etc/kafka/zookeeper.properties 

5.2 启动 Kafka

kafka-server-start      <path-to-confluent>/etc/kafka/server.properties

5.3 启动Schema-registry-start

schema-registry-start   <path-to-confluent>/etc/schema-registry/schema-registry.properties

5.4 启动Connector

connect-distributed     <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties

6. 启动Connector任务

使用Postman对Connector的REST API 接口进行访问配置。示例如下:

6.1 Header 设置

Headers Tab页面增加以下参数。

在这里插入图片描述

6.2 配置Source Connector

在这里插入图片描述
发送请求后即可完成任务的配置。

6.3 配置Sink Connector

在这里插入图片描述
配置完毕点击Send按钮,即完成了Source、Sink Connector任务的配置和启动。此时若向Source 数据库添加或更改数据,目的数据库会实时更新过来。

7.服务停止

在集群中的机器中依次关闭以下服务,关闭时候,需要保证关闭顺序的正确性。

7.1 关闭Connect-jdbc

confluent connect stop

执行命令后若提示 connect is [DOWN],则代表connect服务关闭成功。

7.2 关闭Schema-registry

confluent schema-registry stop

执行命令后若提示 schema-registry is [DOWN],则代表schema-registry服务关闭成功。

7.3 关闭Kafka

confluent kafka stop

执行命令后若提示 kafka is [DOWN],则代表upkafka服务关闭成功。

7.4 关闭Zookeeper

confluent zookeeper stop

执行命令后若提示 zookeeper is [DOWN],则代表zookeeper服务关闭成功。

8. 小结

通过实验所有数据都能实时进行转移并没有遗漏。

在破坏性测试中如杀掉服务进程、下线集群中的某台机器等异常测试中,集群都能正常的进行转移工作。

即使把集群中所有机器的服务都停止,当服务重启后,Confluent会把宕机期间的数据转移过来。可以看出Confluent确实如官方宣传所言,是一个高性能,高可靠的系统。

到此,本文也基本结束,希望此教程能帮助所有需要的人。

想要了解更多,关注公众号:七分熟pizza

在这里插入图片描述