Environment:
Ubuntu 16.04, JDK 1.8u77, Elasticsearch 5.4.3, Logstash 5.4.3, Kibana 5.4.3
Installing ELK itself is not covered here; look it up online (it works out of the box).
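First, install gem. On Ubuntu the gem command ships with the ruby package (the apt package named gem is an unrelated Pure Data library):
sudo apt-get install ruby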
Replace the default gem source with the Taobao mirror:
gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l
sudo gem install bundler
bundle config mirror.https://rubygems.org https://ruby.taobao.org
Install logstash-input-jdbc
cd /../../logstash/bin    # change into the bin directory of your Logstash installation
./logstash-plugin install logstash-input-jdbc
Worked example
Prepare two MySQL tables, hotel and hotel_account:
CREATE TABLE `hotel` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `hotel_name` varchar(45) DEFAULT NULL,
    `photo_url` varchar(45) DEFAULT NULL,
    `last_modify_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
CREATE TABLE `hotel_account` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `hotel_id` varchar(45) DEFAULT NULL,
    `finance_person` varchar(45) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
insert into hotel(hotel_name,photo_url) values('any name you like','www.photo.com/photos/see/1');
-- hotel_id must equal an existing hotel.id for the LEFT JOIN in jdbc.sql to pick this row up
insert into hotel_account(hotel_id,finance_person) values('15627','any name you like');
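Start Elasticsearch; it listens at 127.0.0.1:9200.
Next, prepare two files and put them under /home/xxx/logtest/elk (the path and file names are up to you). You also need the MySQL JDBC driver jar, mysql-connector-java-5.1.39.jar; jdbc_driver_library in jdbc.conf must point to wherever you place it.
The two files are jdbc.conf and jdbc.sql.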
jdbc.conf
input {
    stdin {
    }
    jdbc {
        # MySQL JDBC connection string to our backup database
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
        # the user we wish to execute our statement as
        jdbc_user => "root"
        jdbc_password => "mysql"
        # the path to our downloaded JDBC driver
        jdbc_driver_library => "/home/xxx/lib/mysql-connector-java-5.1.39.jar"
        # the name of the driver class for MySQL
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # fetch the result set in pages of 50000 rows
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # file containing the SQL statement to run
        statement_filepath => "/home/xxx/logtest/elk/jdbc.sql"
        # cron-like schedule: run once every minute
        schedule => "* * * * *"
        type => "jdbc"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # Legacy settings from older plugin versions, kept commented out
        # (Logstash 5.x reports e.g. protocol as an unknown setting)
        # port => "9200"
        # protocol => "http"
        index => "mysql01"
        # use the SQL id as the document id, so re-imported rows overwrite instead of duplicating
        document_id => "%{id}"
        # cluster => "elasticsearch"
    }
    stdout {
        codec => json_lines
    }
}
jdbc.sql
select
    h.id as id,
    h.hotel_name as name,
    h.photo_url as img,
    ha.id as haId,
    ha.finance_person
from
    hotel h LEFT JOIN hotel_account ha on h.id = ha.hotel_id
where
    h.last_modify_time >= '2018-05-21 18:31:40'
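With a hard-coded timestamp, every scheduled run selects the same rows again. A minimal sketch of an incremental variant follows. It assumes the plugin's built-in :sql_last_value parameter, which, as far as I know, is replaced with the time of the previous run (starting from 1970-01-01 on the first run). The ELK_DIR variable is hypothetical and only keeps the snippet self-contained; the post itself uses /home/xxx/logtest/elk.

```shell
# Hypothetical target directory; point it at the directory that holds jdbc.conf.
ELK_DIR="${ELK_DIR:-./elk}"
mkdir -p "$ELK_DIR"

# Quoted heredoc delimiter, so the shell writes the SQL exactly as typed.
cat > "$ELK_DIR/jdbc.sql" <<'EOF'
select
    h.id as id,
    h.hotel_name as name,
    h.photo_url as img,
    ha.id as haId,
    ha.finance_person
from
    hotel h LEFT JOIN hotel_account ha on h.id = ha.hotel_id
where
    h.last_modify_time >= :sql_last_value
EOF
```

Note that with the table definition above, last_modify_time stays NULL until a row is updated for the first time, so freshly inserted rows match neither this query nor the hard-coded one. Time zone differences between MySQL and Logstash may also need attention.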
Start Logstash:
./logstash -f /home/xxx/logtest/elk/jdbc.conf
Use Kibana to check whether the incremental import succeeded.
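If Kibana is not at hand, the same check can be done from the command line. This is only a sketch: the host and index name are taken from jdbc.conf, es_url is a hypothetical helper introduced here, and _count and _search are standard Elasticsearch endpoints.

```shell
# Host and index as configured in the output section of jdbc.conf.
ES_HOST="${ES_HOST:-127.0.0.1:9200}"
INDEX="${INDEX:-mysql01}"

# Hypothetical helper: builds the URL of an endpoint on the index.
es_url() {
    printf 'http://%s/%s/%s' "$ES_HOST" "$INDEX" "$1"
}

# Number of documents currently in the index.
curl -s --max-time 5 "$(es_url '_count?pretty')" || echo "Elasticsearch not reachable"

# A few of the imported documents.
curl -s --max-time 5 "$(es_url '_search?pretty&size=5')" || echo "Elasticsearch not reachable"
```

Running it again after updating a row in hotel and waiting for the next scheduled run should show the changed fields, while the count stays the same because document_id is bound to the row id.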
References:
https://blog.csdn.net/yeyuma/article/details/50240595#quote
https://www.elastic.co/blog/logstash-jdbc-input-plugin
https://blog.csdn.net/guochunyang/article/details/78911898
https://stackoverflow.com/questions/41480166/unknown-setting-protocol-for-elasticsearch-5-1-1