Environment:

Ubuntu 16.04, JDK 1.8u77, Elasticsearch 5.4.3, Logstash 5.4.3, Kibana 5.4.3

Installing the ELK stack itself is skipped here; guides are easy to find, and the packages work out of the box.

First, install gem:

sudo apt-get install gem

Switch the gem source to the Taobao mirror:

gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l
sudo gem install bundler
bundle config mirror.https://rubygems.org https://ruby.taobao.org
Install logstash-input-jdbc:
cd /../../logstash/bin             (change into Logstash's bin directory)
./logstash-plugin install logstash-input-jdbc

Worked example

Prepare two MySQL tables, hotel and hotel_account:

CREATE TABLE `hotel` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `hotel_name` varchar(45) DEFAULT NULL,
  `photo_url` varchar(45) DEFAULT NULL,
  `last_modify_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `hotel_account` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `hotel_id` varchar(45) DEFAULT NULL,
  `finance_person` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

insert into hotel(hotel_name,photo_url) values('any_name_you_like','www.photo.com/photos/see/1');

insert into hotel_account(hotel_id,finance_person) values('1','any_name_you_like');
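Before wiring up Logstash, it can help to see what the jdbc.sql query below will actually emit. A minimal sketch using Python's sqlite3 in place of MySQL (schema simplified: hotel_account.hotel_id is declared INTEGER here so the join key matches h.id, and the row values are illustrative, not the ones from the inserts above):

```python
import sqlite3

# In-memory stand-in for the MySQL schema (simplified types).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE hotel (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  hotel_name TEXT,
  photo_url TEXT,
  last_modify_time TEXT
);
CREATE TABLE hotel_account (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  hotel_id INTEGER,
  finance_person TEXT
);
INSERT INTO hotel(hotel_name, photo_url, last_modify_time)
  VALUES ('demo-hotel', 'www.photo.com/photos/see/1', '2018-05-21 18:31:41');
INSERT INTO hotel_account(hotel_id, finance_person) VALUES (1, 'demo-person');
""")

# The same SELECT that jdbc.sql runs on schedule; each returned row
# becomes one Elasticsearch document, keyed by the "id" column.
rows = conn.execute("""
SELECT h.id AS id, h.hotel_name AS name, h.photo_url AS img,
       ha.id AS haId, ha.finance_person
FROM hotel h LEFT JOIN hotel_account ha ON h.id = ha.hotel_id
WHERE h.last_modify_time >= '2018-05-21 18:31:40'
""").fetchall()
print(rows)  # [(1, 'demo-hotel', 'www.photo.com/photos/see/1', 1, 'demo-person')]
```

The LEFT JOIN flattens the one-to-many relation into one document per hotel row, which is why jdbc.conf can use %{id} directly as the document id.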

Start Elasticsearch; it will be listening at 127.0.0.1:9200.

Next, prepare two files and put them under /home/xxx/logtest/elk (the path and file names are up to you), along with the MySQL JDBC driver jar: mysql-connector-java-5.1.39.jar.

The two files are jdbc.conf and jdbc.sql.

jdbc.conf

input {
    stdin {
    }
    jdbc {
      # mysql jdbc connection string to our database
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      # the user we wish to execute our statement as
      jdbc_user => "root"
      jdbc_password => "mysql"
      # the path to our downloaded jdbc driver
      jdbc_driver_library => "/home/xxx/lib/mysql-connector-java-5.1.39.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "/home/xxx/logtest/elk/jdbc.sql"
      schedule => "* * * * *"
      type => "jdbc"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
       # port => "9200"
       # protocol => "http"
        index => "mysql01"
        document_id => "%{id}"
       # cluster => "elasticsearch"
    }
    stdout {
        codec => json_lines
    }
}

jdbc.sql

select
    h.id as id,
    h.hotel_name as name,
    h.photo_url as img,
    ha.id as haId,
    ha.finance_person
from
    hotel h LEFT JOIN hotel_account ha on h.id = ha.hotel_id
where
    h.last_modify_time >= '2018-05-21 18:31:40'
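Note that the hard-coded timestamp above means every scheduled run re-selects all rows modified since that fixed instant; because jdbc.conf sets document_id => "%{id}", those repeats become idempotent upserts rather than duplicate documents. For a cursor that advances automatically between runs, logstash-input-jdbc can track the last seen column value itself. A sketch, assuming the 5.x-era plugin options use_column_value / tracking_column / :sql_last_value behave as documented:

```
# extra settings inside the jdbc { } block of jdbc.conf
use_column_value       => true
tracking_column        => "last_modify_time"
tracking_column_type   => "timestamp"
last_run_metadata_path => "/home/xxx/logtest/elk/.jdbc_last_run"

# and the WHERE clause in jdbc.sql becomes:
#   where h.last_modify_time > :sql_last_value
```

The plugin persists the tracked value in last_run_metadata_path, so restarts resume from where the previous run left off.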

Start Logstash:

./logstash -f /home/xxx/logtest/elk/jdbc.conf

Use Kibana to check that the incremental import succeeded.
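If Kibana is not handy, the index can also be checked directly over Elasticsearch's REST API. A quick sketch, assuming ES is still at 127.0.0.1:9200 and the index name mysql01 from jdbc.conf:

```python
import json
import urllib.request

# Search the "mysql01" index that the logstash output writes to.
url = "http://127.0.0.1:9200/mysql01/_search?size=5"
try:
    with urllib.request.urlopen(url, timeout=5) as resp:
        hits = json.load(resp)["hits"]
        print("docs in mysql01:", hits["total"])
except OSError as exc:
    print("Elasticsearch not reachable:", exc)
```

A non-zero total, growing as rows are modified in MySQL, confirms the scheduled import is running.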

Logstash incremental import from MySQL into Elasticsearch via logstash-input-jdbc


References:

https://blog.csdn.net/yeyuma/article/details/50240595#quote

https://www.elastic.co/blog/logstash-jdbc-input-plugin

https://blog.csdn.net/guochunyang/article/details/78911898

https://stackoverflow.com/questions/41480166/unknown-setting-protocol-for-elasticsearch-5-1-1




