Canal推送Mysql增量数据至RabbitMQ

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

Canal

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

v1.1.5版本开始,Canal新增对RabbitMQ的支持。

准备工作

canal需要Mysql端开启二进制日志同步,我使用的阿里云的Mysql,默认已开启binlog dump

RabbitMQ我使用Docker部署,注意镜像要使用带management版本的,方便在界面操作。

docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root   rabbitmq:3.8.2-management

RabbitMQ的Exchange,Queue,Binding也要优先配置好,我的需求是canal解析完Mysql的binlog后,将信息分别推送到3个队列中,我这里为了隔离环境建3个队列,分别是同步Elasticsearch的canal.es,同步ClickHouse的canal.clickhouse和同步Mysql的canal.mysql。因为是无差别投放,我只创建了一个交换机canal。将3个队列canal.escanal.clickhousecanal.mysql绑定到交换机canal上。

配置

编辑deployer/conf/example/instance.properties

# 我这里使用了gtid日志进行同步
canal.instance.gtidon=true
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 这里配置发送消息的主题,由于3队列接收全部数据,所以此处主题我没有用到,使用的默认值
canal.mq.topic=example

编辑deployer/conf/canal.properties

# 此处更改为rabbitmq模式
canal.serverMode = rabbitmq

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal
rabbitmq.username = root
rabbitmq.password = root
# 消息持久化
rabbitmq.deliveryMode = 2

运行 bin/startup.sh ,服务便开始运行了

补充

由于三个队列需要同步的数据库不同,无差别投放会造成不必要的性能浪费,可开启dynamicTopic功能,根据topic将数据投放至不能的队列。

编辑deployer/conf/example/instance.properties

canal.mq.dynamicTopic=pro\\..*

投递的topic为库名_表名

Exchanges和Queues的绑定关系在RebbitMQ的后台配置即可,模式选择Topic模式。

参考文档

alibaba/canal

Canal Kafka RocketMQ QuickStart


Canal推送Mysql增量数据至RabbitMQ
https://blog.yjll.blog/post/ab6a3b54.html
作者
简斋
发布于
2021年6月29日
许可协议