Canal实现Mysql至Elasticsearch实时同步
canal 1.1.1版本之后, 内置增加客户端数据同步功能。我这里使用canal.adapter-1.1.5.tar.gz,该版本支持RabbitMQ,可从队列中取数据。
关于RabbitMQ配置和对接可以看我之前写的一篇Canal推送Mysql增量数据至RabbitMQ。
对接RabbitMQ
编辑adapter/conf/application.yml
,配置rabbitMQ信息
canal.conf:
mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ
consumerProperties:
# rabbitMQ consumer
rabbitmq.host: 127.0.0.1
rabbitmq.virtual.host: /
rabbitmq.username: root
rabbitmq.password: root
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://remote:3306/test?useUnicode=true
username: root
password: root
Elasticsearch配置
编辑adapter/conf/application.yml
,配置canalAdapters同步es6,队列使用canal.es
canalAdapters:
- instance: canal.es # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: es6
hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
canal会扫描adapter/conf/es6/
下的所有配置文件,其中信息要和adapter/conf/application.yml
匹配
cat ./adapter/conf/es6/scan_log.yml
dataSourceKey: defaultDS
destination: canal.es
groupId: g1
esMapping:
_index: scan_log
_type: _doc
_id: id
sql: "select id, code, casecode from scan_log"
commitBatch: 3000
建立ElasticSearchmapping
新版ES提倡使用_doc
作为_type
,我们这里使用Mysql表名作为ES的index,建立mapping
PUT /scan_log
{
"mappings":{
"_doc":{
"properties":{
"code":{
"type":"long"
},
"casecode" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
我们也可以通过Logstash帮我们自动创建mapping,Logstash可以根据json格式创建mapping,关于Logstash可以看我之前写的文章使用Logstash同步Mysql数据到Elasticsearch
使用generator方式生成json
cat logstash-sample.conf
input {
generator {
count => 1
message => '
{
"id": 628247969266810880,
"code": 2199023788471,
"casecode": "999STZPMDDV"
}'
codec => json
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "scan_log"
document_type => "_doc"
document_id => "%{id}"
}
}
mapping创建完成后,我们启动项目,日志中发现这个错误
Load canal adapter: es6 failed
出现问题的不只我一个,#3466中有人提出解决方案,使用canal.adapter-1.1.5-SNAPSHOT.tar.gz版本中的client-adapter.es6x-1.1.5-SNAPSHOT-jar-with-dependencies.jar
替换现版本中的client-adapter.es6x-1.1.5-jar-with-dependencies.jar
,完美解决。
同步全量数据
adapter支持通过REST接口的方式导入全量数据
# 先暂停同步
curl http://127.0.0.1:8081/syncSwitch/canal.es
curl http://127.0.0.1:8081/syncSwitch/canal.es/off -X PUT
# 全量导入
curl http://127.0.0.1:8081/etl/es6/scan_log.yml -X POST
# 恢复同步
curl http://127.0.0.1:8081/syncSwitch/canal.es/on -X PUT
路径中的scan_log.yml
为adapter/conf/es6/
下的配置