准备
- 下载与Elasticseach版本一致的Logstash
- 安装logstash-input-jdbc (需要Ruby)
Logstash6.x版本本身不带logstash-input-jdbc插件,需要手动安装:
bin目录下./logstash-plugin.bat install logstash-input-jdbc
配置文件
需要配置:
- 模板文件
Logstash需要从数据库读取数据,然后向ES中创建索引,因此我们需要创建一个映射的模板文件。
创建一个test_template.json,格式参考如下:
{
"mappings" : {
"doc" :{
"properties": {
"courseid": {
"type": "keyword"
},
"teachplan_id": {
"type": "keyword"
},
"media_id": {
"type": "keyword"
},
"media_url": {
"index": false,
"type": "text"
}
}
}
},
"template" : "test_media"
}
-
mysql.conf
配置输入数据源和输出数据源,可参考ES官方的指导网页
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html输入数据源:
配置数据库连接相关的信息,mysql驱动,数据库的sql文件路径。
需要定时采集数据库的数据
ES采用UTC时区,比北京时间早8小时,因此ES读取数据时让最后更新时间加8小时where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)
数据表中会有个时间戳字段,只要更新数据的时间戳大于上次数据采集的时间,就会进行采集。
而上次进行采集的时间戳则记录在last_run_metadata_path =>自己指定一个metadata文件
它里面就一行数据,更新每次采集的时间。
输出数据源:配置一些关于ES的信息,ES的hosts地址,索引库名称,id,type,模板路径,模板名。
这些需要和之前创建模板时的信息相匹配。
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/databaseName?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => pwd
# the path to our downloaded jdbc driver
jdbc_driver_library => "D:/Java/maven/apache-maven-3.5.4/repository/mysql/mysql-connector-java/5.1.44/mysql-connector-java-5.1.44.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#要执行的sql文件
#statement_filepath => "/conf/test.sql"
statement => "select * from test_media where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
#定时配置
schedule => "* * * * *"
record_last_run => true
last_run_metadata_path => "D:/Java/elastic/elasticsearch-v6.2.1/logstash-6.2.1/config/test_media_metadata"
}
}
output {
elasticsearch {
#ES的ip地址和端口
hosts => "localhost:9200"
#hosts => ["localhost:9200","localhost:9202","localhost:9203"]
#ES索引库名称
index => "test_media"
document_id => "%{teachplan_id}"
document_type => "doc"
template =>"D:/Java/elastic/elasticsearch-v6.2.1/logstash-6.2.1/config/test_template.json"
template_name =>"test_media"
template_overwrite =>"true"
}
stdout {
#日志输出
codec => json_lines
}
}
启动Logstash
logstash.bat -f ../config/mysql.conf
默认一分钟进行一次采集
尝试更新的数据或者随便插入一条,新的数据就会被采集到,可通过head插件查看。