准备

  1. 下载与Elasticseach版本一致的Logstash
  2. 安装logstash-input-jdbc (需要Ruby)
    Elasticsearch笔记(五)——使用Logstash采集索引
    Logstash6.x版本本身不带logstash-input-jdbc插件,需要手动安装:
    bin目录下
    ./logstash-plugin.bat install logstash-input-jdbc

配置文件

需要配置:

  1. 模板文件
    Logstash需要从数据库读取数据,然后向ES中创建索引,因此我们需要创建一个映射的模板文件。
    创建一个test_template.json,格式参考如下:
{
   "mappings" : {
      "doc" :{
	     "properties": {
	         "courseid": {
	            "type": "keyword"
             },
             "teachplan_id": {
                "type": "keyword"
	         },
	         "media_id": {
                "type": "keyword"
	         },
	         "media_url": {
	            "index": false,
                "type": "text"
             }
         }
      }
   },
   "template" : "test_media"
}

  1. mysql.conf
    配置输入数据源和输出数据源,可参考ES官方的指导网页
    https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

    输入数据源:

    配置数据库连接相关的信息,mysql驱动,数据库的sql文件路径。

    需要定时采集数据库的数据
    ES采用UTC时区,比北京时间早8小时,因此ES读取数据时让最后更新时间加8小时 where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)
    数据表中会有个时间戳字段,只要更新数据的时间戳大于上次数据采集的时间,就会进行采集。
    而上次进行采集的时间戳则记录在
    last_run_metadata_path => 自己指定一个metadata文件
    它里面就一行数据,更新每次采集的时间。
    Elasticsearch笔记(五)——使用Logstash采集索引
    输出数据源:

    配置一些关于ES的信息,ES的hosts地址,索引库名称,id,type,模板路径,模板名。
    这些需要和之前创建模板时的信息相匹配。

input {
  stdin {
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://localhost:3306/databaseName?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
  # the user we wish to excute our statement as
  jdbc_user => "root"
  jdbc_password => pwd
  # the path to our downloaded jdbc driver  
  jdbc_driver_library => "D:/Java/maven/apache-maven-3.5.4/repository/mysql/mysql-connector-java/5.1.44/mysql-connector-java-5.1.44.jar"
  # the name of the driver class for mysql
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  #要执行的sql文件
  #statement_filepath => "/conf/test.sql"
  statement => "select * from test_media where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
  #定时配置
  schedule => "* * * * *"
  record_last_run => true
  last_run_metadata_path => "D:/Java/elastic/elasticsearch-v6.2.1/logstash-6.2.1/config/test_media_metadata"
  }
}


output {
  elasticsearch {
  #ES的ip地址和端口
  hosts => "localhost:9200"
  #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
  #ES索引库名称
  index => "test_media"
  document_id => "%{teachplan_id}"
  document_type => "doc"
  template =>"D:/Java/elastic/elasticsearch-v6.2.1/logstash-6.2.1/config/test_template.json"
  template_name =>"test_media"
  template_overwrite =>"true"
  }
  stdout {
 #日志输出
  codec => json_lines
  }
}


启动Logstash

logstash.bat -f ../config/mysql.conf

默认一分钟进行一次采集
Elasticsearch笔记(五)——使用Logstash采集索引
尝试更新的数据或者随便插入一条,新的数据就会被采集到,可通过head插件查看。
Elasticsearch笔记(五)——使用Logstash采集索引

相关文章:

  • 2021-05-23
  • 2022-12-23
  • 2022-02-09
  • 2022-12-23
  • 2021-07-13
  • 2021-08-19
  • 2021-05-07
  • 2021-09-23
猜你喜欢
  • 2022-01-17
  • 2021-09-18
  • 2021-07-11
  • 2021-08-03
  • 2022-12-23
  • 2021-10-09
  • 2022-01-12
相关资源
相似解决方案