【问题标题】:Configure Logstash to receive data from socket and insert it into Elasticsearch in java配置 Logstash 以从 socket 接收数据并将其插入 java 中的 Elasticsearch
【发布时间】:2020-04-08 22:42:25
【问题描述】:

我想将数据直接注入 ElasticSearch 以执行一些性能测试。我的第一个想法是为每个文档创建 JSON 文件并将文件导入 ElasticSearch,但时间太长了。我测试了 110K 文件,创建文件只用了 18 分钟,我需要 55M 文件——这是我测试的 500 倍。快速计算:需要 150 小时,或 6.25 天,太长了。 第二个选项是当我将 JSON 放入搅拌中并使用 Logstash 将字符串注入 ElasticSearch 时停止。但是,我得到一个例外:

2019-12-16 13:49:27,240 | Timer-0 | ERROR | search-injector | c.n.es.injector.output.SocketOutput | SocketOutput::output: 
java.net.SocketException: Software caused connection abort: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:134)
at java.io.DataOutputStream.writeBytes(DataOutputStream.java:276)
at com.beniregev.es.injector.output.SocketOutput.output(SocketOutput.java:39)
at com.beniregev.es.injector.policies.UpdateOutputHandlers.run(UpdateOutputHandlers.java:60)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

ElasticSearchlocalhost 端口 9200 上运行,Logstashlocalhost 端口 9600 上运行。 我的 SocketOutput.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class SocketOutput implements OutputHandler {

    private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);

    public static final String CLI_OPTION = "socket";

    @Value("${socket.hostname}")
    private String hostname;
    @Value("${socket.port}")
    private int port;
    Socket clientSocket;

    public boolean open() {
        try {
            clientSocket = new Socket(hostname, port);
        } catch (IOException ioe) {
            log.error("", ioe);
            return false;
        }
        return true;
    }

    @Override
    public void output(String data) {

        DataOutputStream outToServer = null;
        try {
            outToServer = new DataOutputStream(clientSocket.getOutputStream());
            outToServer.writeBytes( data );
        } catch (IOException ioe) {
            log.error("", ioe);
        }
    }

}

logstash-simple.conf

# Simple Logstash configuration for creating a simple
# Stdin -> Logstash -> Elasticsearch pipeline.
input { stdin { } }

output {
  elasticsearch { 
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

我正在使用以下命令运行 Logstash:bin/logstash.bat -f config/logstash-simple.conf

JSON 字符串已创建且有效,Socket 获取正确的参数值(hostname="localhost" 和 port=9600)。 我将不胜感激任何帮助。

【问题讨论】:

    标签: java elasticsearch spring-data logstash


    【解决方案1】:

    解决方案有几个层次,首先是使用 Kibana 管理器在 Elastic Search 中创建和使用正确的索引,然后正确配置 Logstash,最后是使用 JSON 字符串而不是文件。

    创建索引不是这个问题的问题,所以我不会进入它。

    Logstash.conf(配置文件):

    #################################################
    # Stdin -> Logstash -> Elastic Search pipeline.
    #################################################
    input {
        stdin{}
        tcp{
            host => "localhost"
            port => 9600
            codec => json
        }
    }
    
    filter 
    {
       mutate 
        {
            remove_field => ["host", "@version", "@timestamp", "port", "tags", "level", "logger_name", "themessage", "mensage", "spring.application.name", "level_value", "thread_name"]
        }
    }
    
    output {
        stdout{ codec => rubydebug }
    
         elasticsearch{
            hosts => ["localhost:9200"]
            index => ["my-index"]
         }
    }
    

    注意:index 中输入您创建并将使用的索引的名称。

    使用配置文件运行Logstashbin/logstash.bat -f config/logstash-simple.conf

    在 Java 中将字符串输出到 Logstash:

    package com.beniregev.injector.output;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;
    
    public class SocketOutput implements OutputHandler {
    
        private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);
    
        public static final String CLI_OPTION = "socket";
        private int outputIndex = 0;
    
        @Value("${socket.hostname}")
        private String hostname;
        @Value("${socket.port}")
        private int port;
        Socket clientSocket;
    
        public boolean open() {
            try {
                clientSocket = new Socket(hostname, port);
            } catch (IOException ioe) {
                log.error("", ioe);
                return false;
            }
            return true;
        }
    
        @Override
        public void output(String data) {
            DataOutputStream outToServer = null;
            try {
                outToServer = new DataOutputStream(clientSocket.getOutputStream());
                outToServer.writeBytes( data );
                outputIndex++;
            } catch (IOException ioe) {
                log.error("", ioe);
            }
            System.out.println("Wrote segment " + outputIndex + " to socket");
    
        }
    
    }
    

    hostport 用于Logstash,默认端口=9600。

    这为我解决了问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-02-04
      • 2018-01-10
      • 2021-08-10
      • 2017-03-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多