使用Logstash同步Mysql数据到Elasticsearch

Logstash
Logstash是一个开源数据收集引擎,具有近实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

项目用到ES做统计,需要同步Mysql数据到ES中,我们使用Logstash进行同步,Logstash安装需要依赖的包比较多,我为了方便直接使用Docker进行操作。

基本语法

首先要编写同步数据的配置文件,从Mysql同步数据到ElasticSearch中,使用jdbc里连接数据库作为input,ouput为ES。

input {
 stdin { }
    jdbc {
        #注意mysql连接地址一定要用ip,不能使用localhost等
        jdbc_connection_string => "jdbc:mysql://192.168.1.200:3308/test"
        jdbc_user => "test"
        jdbc_password => "test"
        #这个jar包的地址是容器内的地址
        jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-8.0.11.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "5000"
        statement => "SELECT * FROM base_areas where id > :sql_last_value"
        tracking_column => id
        use_column_value => true
        last_run_metadata_path => "/usr/share/logstash/pipeline/area"
        schedule => "* * * * *"
    }
 }
  
 output {
     stdout {
        codec => json_lines
    }
    elasticsearch {
        hosts => "192.168.1.200:9200"
        index => "area"
        document_type => "_doc"
        document_id => "%{id}"
    }
}

重点在于 use_column_valuetracking_column 这两个参数,当use_column_value为true时,可以用 :sql_last_value 这个变量来获取tracking_column对应的字段的最新值,默认即第一次启动时为 0 。我的示例中tracking_column对应id,即Logstash都会记录每次查询结果id的最大值,供下一次查询使用。

Logstash将tracking_column的最新值记录到last_run_metadata_path配置的文件,启动时读取该文件

docker run -it -v  $PWD/pipeline/:/usr/share/logstash/pipeline -v $PWD/logstash.yml:/usr/share/logstash/config/logstash.yml -v $PWD/pipelines.yml:/usr/share/logstash/config/pipelines.yml   logstash:6.4.3  bash

先安装jdbc和es的插件

bash-4.2$ ~/bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful

bash-4.2$ ~/bin/logstash-plugin install logstash-output-elasticsearch
Validating logstash-output-elasticsearch
Installing logstash-output-elasticsearch
Installation successful

由于是在容器内部连接服务,所以ES的连接不要使用localhost和127.0.0.1,更改为实际IP地址

bash-4.2$ cat ~/config/logstash.yml 
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.url: http://192.168.1.200:9200

执行脚本

运行同步脚本可手动执行也可以配置调度执行

手动执行

~/bin/logstash -f ***.config

调度执行

调度任务可读取配置文件config/pipelines.yml,默认有一个main/usr/share/logstash/pipeline为pipeline的存放路径,该文件夹内的pipeline都会被执行。
可手动更改该文件,添加idconfig即可。

bash-4.2$ cat ~/config/pipelines.yml 

- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"

好,配置已完成,运行/usr/local/bin/docker-entrypoint启动调度任务

总结

logstash是ES官方提供的工具,支持扩展多,配置灵活,但是只能同步增量数据,对于源数据删除和修改的情况,logstash无能为力,还有基于定时任务同步无法保证实时,对实时性要求高的场景要考虑其他方案。

参考文档

Configuring Logstash for Docker

Jdbc input plugin

Logstash 最佳实践


使用Logstash同步Mysql数据到Elasticsearch
https://blog.yjll.blog/post/863f4886.html
作者
简斋
发布于
2021年7月7日
许可协议