Canal实现Mysql至Elasticsearch实时同步

canal 1.1.1版本之后, 内置增加客户端数据同步功能。我这里使用canal.adapter-1.1.5.tar.gz,该版本支持RabbitMQ,可从队列中取数据。

关于RabbitMQ配置和对接可以看我之前写的一篇Canal推送Mysql增量数据至RabbitMQ

对接RabbitMQ

编辑adapter/conf/application.yml,配置rabbitMQ信息


canal.conf:
  mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ
  consumerProperties:
    # rabbitMQ consumer
    rabbitmq.host: 127.0.0.1
    rabbitmq.virtual.host: /
    rabbitmq.username: root
    rabbitmq.password: root
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://remote:3306/test?useUnicode=true
      username: root
      password: root

Elasticsearch配置

编辑adapter/conf/application.yml,配置canalAdapters同步es6,队列使用canal.es

canalAdapters:
  - instance: canal.es # canal instance Name or mq topic name
    groups:
      - groupId: g1
        outerAdapters:
          - name: es6
            hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest # or rest
              # security.auth: test:123456 #  only used for rest mode
              cluster.name: elasticsearch

canal会扫描adapter/conf/es6/下的所有配置文件,其中信息要和adapter/conf/application.yml匹配

cat ./adapter/conf/es6/scan_log.yml


dataSourceKey: defaultDS
destination: canal.es
groupId: g1
esMapping:
  _index: scan_log
  _type: _doc
  _id: id
  sql: "select id, code, casecode from scan_log"
  commitBatch: 3000

建立ElasticSearchmapping

新版ES提倡使用_doc作为_type,我们这里使用Mysql表名作为ES的index,建立mapping

PUT /scan_log 
{
    "mappings":{
        "_doc":{
            "properties":{
                "code":{
                    "type":"long"
                },
                "casecode" : {
                    "type" : "text",
                    "fields" : {
                    "keyword" : {
                        "type" : "keyword",
                        "ignore_above" : 256
                      }
                    }
                }
            }
        }
    }
}

我们也可以通过Logstash帮我们自动创建mapping,Logstash可以根据json格式创建mapping,关于Logstash可以看我之前写的文章使用Logstash同步Mysql数据到Elasticsearch

使用generator方式生成json

cat logstash-sample.conf 

input {
 
  generator {
    count => 1
    message => '
    {
        "id": 628247969266810880,
        "code": 2199023788471,
        "casecode": "999STZPMDDV"
    }'
    codec => json
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "scan_log"
    document_type => "_doc"
    document_id => "%{id}"
  }
}

mapping创建完成后,我们启动项目,日志中发现这个错误

Load canal adapter: es6 failed

出现问题的不只我一个,#3466中有人提出解决方案,使用canal.adapter-1.1.5-SNAPSHOT.tar.gz版本中的client-adapter.es6x-1.1.5-SNAPSHOT-jar-with-dependencies.jar替换现版本中的client-adapter.es6x-1.1.5-jar-with-dependencies.jar,完美解决。

同步全量数据

adapter支持通过REST接口的方式导入全量数据

# 先暂停同步
curl http://127.0.0.1:8081/syncSwitch/canal.es
curl http://127.0.0.1:8081/syncSwitch/canal.es/off -X PUT
# 全量导入
curl http://127.0.0.1:8081/etl/es6/scan_log.yml -X POST
# 恢复同步
curl http://127.0.0.1:8081/syncSwitch/canal.es/on -X PUT

路径中的scan_log.ymladapter/conf/es6/下的配置

参考文档

ElasticSearch适配器

ClientAdapter

ElasticSearch为什么在高版本移除映射类型

通过Canal将MySQL数据同步到阿里云Elasticsearch


Canal实现Mysql至Elasticsearch实时同步
https://blog.yjll.blog/post/15c08367.html
作者
简斋
发布于
2021年6月29日
许可协议