Canal推送Mysql增量数据至RabbitMQ
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
从v1.1.5版本开始,Canal新增对RabbitMQ的支持。
准备工作
canal
需要Mysql端开启二进制日志同步,我使用的阿里云的Mysql,默认已开启binlog dump
。
RabbitMQ我使用Docker部署,注意镜像要使用带management
版本的,方便在界面操作。
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq:3.8.2-management
RabbitMQ的Exchange
,Queue
,Binding
也要优先配置好,我的需求是canal解析完Mysql的binlog后,将信息分别推送到3个队列中,我这里为了隔离环境建3个队列,分别是同步Elasticsearch的canal.es
,同步ClickHouse的canal.clickhouse
和同步Mysql的canal.mysql
。因为是无差别投放,我只创建了一个交换机canal
。将3个队列canal.es
、canal.clickhouse
、canal.mysql
绑定到交换机canal
上。
配置
编辑deployer/conf/example/instance.properties
# 我这里使用了gtid日志进行同步
canal.instance.gtidon=true
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 这里配置发送消息的主题,由于3队列接收全部数据,所以此处主题我没有用到,使用的默认值
canal.mq.topic=example
编辑deployer/conf/canal.properties
# 此处更改为rabbitmq模式
canal.serverMode = rabbitmq
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal
rabbitmq.username = root
rabbitmq.password = root
# 消息持久化
rabbitmq.deliveryMode = 2
运行 bin/startup.sh
,服务便开始运行了
补充
由于三个队列需要同步的数据库不同,无差别投放会造成不必要的性能浪费,可开启dynamicTopic
功能,根据topic将数据投放至不能的队列。
编辑deployer/conf/example/instance.properties
canal.mq.dynamicTopic=pro\\..*
投递的topic为库名_表名
Exchanges和Queues的绑定关系在RebbitMQ的后台配置即可,模式选择Topic模式。
参考文档
Canal推送Mysql增量数据至RabbitMQ
https://blog.yjll.blog/post/ab6a3b54.html