1 简介
本文说明使用logstash来实现全量和增量将mysql数据导入到es中。每一分钟执行一次。有时效性高的要求可以提高频率。
2.logstash同步mysql数据到elasticsearch
logstash-plugin install logstash-output-elasticsearch logstash-plugin install logstash-input-jdbc
将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下
2.1 时间维度
1.以一定的时间间隔定时更新mysql数据到es。
2.配置如下
全量构建索引
(1)jdbc.conf
input {
jdbc {
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,school-edu为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/school-edu?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull"
# 用户名和密码
jdbc_user => "xxxxx"
jdbc_password => "xxxxx"
# 驱动
jdbc_driver_library => "E:\linux\ELK\6.8.2\logstash\bin\mysql\mysql-connector-java-8.0.18.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "teacher"
user => "elastic"
password => "changeme"
document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
(2)jdbc.sql
select id, `name`, intro, career, avatar, sort, upd_time from edu_teacher
增量构建索引
(1)jdbc-update.conf
input {
jdbc {
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,school-edu为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/school-edu?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull"
# 用户名和密码
jdbc_user => "xxxxxx"
jdbc_password => "xxxxx"
# 驱动
jdbc_driver_library => "E:\linux\ELK\6.8.2\logstash\bin\mysql\mysql-connector-java-8.0.18.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
last_run_metadata_path => "E:\linux\ELK\6.8.2\logstash\config\mysql\last_value_meta.txt"
# 执行的sql 文件路径+名称
statement_filepath => "E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc-update.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "teacher"
user => "elastic"
password => "changeme"
document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
(2)jdbc-update.sql
select id, `name`, intro, career, avatar, sort, upd_time from edu_teacher where upd_time >= :sql_last_value
(3)last_value_meta.txt
2020-01-29 00:00:00
全量构建
logstash -f E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc.conf
增量构建
logstash -f E:\linux\ELK\6.8.2\logstash\config\mysql\jdbc-update.conf