使用Logstash同步Mysql数据到Elasticsearch
Logstash是一个开源数据收集引擎,具有近实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
项目用到ES做统计,需要同步Mysql数据到ES中,我们使用Logstash进行同步,Logstash安装需要依赖的包比较多,我为了方便直接使用Docker进行操作。
基本语法
首先要编写同步数据的配置文件,从Mysql同步数据到ElasticSearch中,使用jdbc里连接数据库作为input,ouput为ES。
input {
stdin { }
jdbc {
#注意mysql连接地址一定要用ip,不能使用localhost等
jdbc_connection_string => "jdbc:mysql://192.168.1.200:3308/test"
jdbc_user => "test"
jdbc_password => "test"
#这个jar包的地址是容器内的地址
jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-8.0.11.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "5000"
statement => "SELECT * FROM base_areas where id > :sql_last_value"
tracking_column => id
use_column_value => true
last_run_metadata_path => "/usr/share/logstash/pipeline/area"
schedule => "* * * * *"
}
}
output {
stdout {
codec => json_lines
}
elasticsearch {
hosts => "192.168.1.200:9200"
index => "area"
document_type => "_doc"
document_id => "%{id}"
}
}
重点在于 use_column_value
和 tracking_column
这两个参数,当use_column_value为true时,可以用 :sql_last_value 这个变量来获取tracking_column对应的字段的最新值,默认即第一次启动时为 0 。我的示例中tracking_column对应id,即Logstash都会记录每次查询结果id的最大值,供下一次查询使用。
Logstash将tracking_column的最新值记录到last_run_metadata_path
配置的文件,启动时读取该文件
docker run -it -v $PWD/pipeline/:/usr/share/logstash/pipeline -v $PWD/logstash.yml:/usr/share/logstash/config/logstash.yml -v $PWD/pipelines.yml:/usr/share/logstash/config/pipelines.yml logstash:6.4.3 bash
先安装jdbc和es的插件
bash-4.2$ ~/bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
bash-4.2$ ~/bin/logstash-plugin install logstash-output-elasticsearch
Validating logstash-output-elasticsearch
Installing logstash-output-elasticsearch
Installation successful
由于是在容器内部连接服务,所以ES的连接不要使用localhost和127.0.0.1,更改为实际IP地址
bash-4.2$ cat ~/config/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.url: http://192.168.1.200:9200
执行脚本
运行同步脚本可手动执行也可以配置调度执行
手动执行
~/bin/logstash -f ***.config
调度执行
调度任务可读取配置文件config/pipelines.yml
,默认有一个main
,/usr/share/logstash/pipeline
为pipeline的存放路径,该文件夹内的pipeline都会被执行。
可手动更改该文件,添加id
和config
即可。
bash-4.2$ cat ~/config/pipelines.yml
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline"
好,配置已完成,运行/usr/local/bin/docker-entrypoint
启动调度任务
总结
logstash
是ES官方提供的工具,支持扩展多,配置灵活,但是只能同步增量数据,对于源数据删除和修改的情况,logstash无能为力,还有基于定时任务同步无法保证实时,对实时性要求高的场景要考虑其他方案。