Question title: Configure sink elasticsearch apache-flume
Posted: 2015-11-16 09:37:21
Question:

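This is my first time posting here, so sorry if I've done it badly, and sorry for my poor English.

I'm trying to configure Apache Flume with the Elasticsearch sink. Everything seems to work and run fine, but when I start the agent I get 2 warnings. Here they are: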

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

My agent configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

It starts netcat and everything seems fine, but these warnings worry me and I don't understand them.
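Two details of the configuration above are worth checking independently of the stack trace: the sink's `batchSize` (500) is larger than the memory channel's `transactionCapacity` (100), and the first `hostNames` entry uses port 9200, which is Elasticsearch's HTTP port, while the transport client used by the sink connects to the transport port (9300 by default). The sketch below is a hypothetical checker (`FlumeSinkConfigCheck` is not part of Flume). It reads the agent file with `java.util.Properties` and assumes the Flume user guide defaults of 100 for `batchSize`, `capacity` and `transactionCapacity`. As far as I know, a sink batch larger than the channel's transaction capacity makes the memory channel throw a `ChannelException` once more than 100 events are waiting.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Flume: sanity-checks one ES sink and the channel it reads from.
public class FlumeSinkConfigCheck {

    static int intProp(Properties p, String key, int dflt) {
        String v = p.getProperty(key);
        return v == null ? dflt : Integer.parseInt(v.trim());
    }

    static List<String> check(Properties p, String agent, String sink) {
        List<String> warnings = new ArrayList<>();
        String sinkKey = agent + ".sinks." + sink + ".";
        String channel = p.getProperty(sinkKey + "channel");
        if (channel == null) {
            warnings.add("sink " + sink + " is not bound to a channel");
            return warnings;
        }
        String chKey = agent + ".channels." + channel.trim() + ".";
        // Assumed defaults (Flume user guide): 100 for all three settings
        int batchSize = intProp(p, sinkKey + "batchSize", 100);
        int txCapacity = intProp(p, chKey + "transactionCapacity", 100);
        int capacity = intProp(p, chKey + "capacity", 100);
        if (batchSize > txCapacity) {
            warnings.add("batchSize " + batchSize + " > transactionCapacity " + txCapacity);
        }
        if (txCapacity > capacity) {
            warnings.add("transactionCapacity " + txCapacity + " > capacity " + capacity);
        }
        // The transport client talks to the transport port (9300 by default), not HTTP port 9200
        for (String h : p.getProperty(sinkKey + "hostNames", "").split(",")) {
            if (h.trim().endsWith(":9200")) {
                warnings.add("hostNames entry " + h.trim() + " uses the HTTP port 9200");
            }
        }
        return warnings;
    }

    public static void main(String[] args) throws IOException {
        // The relevant lines of the configuration from the question
        String conf = String.join("\n",
            "a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300",
            "a1.sinks.k1.batchSize = 500",
            "a1.sinks.k1.channel = c1",
            "a1.channels.c1.capacity = 1000",
            "a1.channels.c1.transactionCapacity = 100");
        Properties p = new Properties();
        p.load(new StringReader(conf));
        check(p, "a1", "k1").forEach(System.out::println);
    }
}
```

For this configuration it reports two warnings: the batch size and the port 9200 entry.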

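Comments:

  • Are you sure the given configuration works? The first log trace is not a warning but an error telling you that something is wrong with ElasticSearchSink, most likely a dependency problem (a method could not be found).
  • I hadn't noticed the specific message in the warning trace, but it confirms my diagnosis: Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies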
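A `NoSuchMethodError` means the sink was compiled against a constructor that the `InetSocketTransportAddress` class found at run time does not have, i.e. the Elasticsearch jar in Flume's `lib` directory exposes a different API from the one the sink expects. The probe below is a hypothetical diagnostic (`ConstructorProbe` is not part of Flume or Elasticsearch). It uses plain reflection to report which jar the class was loaded from and whether the `(String, int)` constructor named in the stack trace exists.

```java
import java.security.CodeSource;

// Hypothetical diagnostic: does the class on the runtime class path still have a given
// public constructor, and which jar (code source) was it loaded from?
public class ConstructorProbe {

    private static Class<?> load(String className) throws ClassNotFoundException {
        // initialize=false: inspect the class without running its static initializers
        return Class.forName(className, false, ConstructorProbe.class.getClassLoader());
    }

    static boolean hasConstructor(String className, Class<?>... paramTypes) {
        try {
            load(className).getConstructor(paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    static String sourceOf(String className) {
        try {
            CodeSource src = load(className).getProtectionDomain().getCodeSource();
            // JDK classes come from the bootstrap loader and have no code source
            return src == null ? "(bootstrap class path)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException e) {
            return "(not on class path)";
        }
    }

    public static void main(String[] args) {
        // Class and signature from the stack trace: InetSocketTransportAddress.<init>(String, int)
        String cls = "org.elasticsearch.common.transport.InetSocketTransportAddress";
        System.out.println("loaded from: " + sourceOf(cls));
        System.out.println("(String, int) constructor present: "
            + hasConstructor(cls, String.class, int.class));
    }
}
```

Run it with Flume's jars on the class path, for example `java -cp "lib/*:." ConstructorProbe` from the Flume installation directory. If I remember correctly, Elasticsearch 2.x replaced this constructor with ones taking `InetAddress` or `InetSocketAddress`, which would fit the version mismatch described in the answers.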

Tags: elasticsearch flume data-ingestion


Answer 1:

I found the cause: Apache Flume 1.6.0 and Elasticsearch 2.0 don't seem to work together properly.

I found a good sink written by a third party and adapted it.

Here is the link

Here is my final configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

It works for me.

Thanks for your answers.

P.S. Yes, I did have to move the library JARs.
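With `indexNameBuilder` set to `TimeBasedIndexNameBuilder`, events are not written to a single `items` index. As far as I know, Flume's own `TimeBasedIndexNameBuilder` appends the event date in UTC to `indexName` (pattern `yyyy-MM-dd` by default), and I am assuming the `com.frontier45` port behaves the same way. The sketch below only reproduces that naming rule with `java.time`; `IndexNameSketch` is a hypothetical name, not the sink's code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of the assumed naming rule: <indexName>-<yyyy-MM-dd>, date taken in UTC
// from the event timestamp (epoch milliseconds).
public class IndexNameSketch {

    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String indexName(String prefix, long timestampMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        // 2015-11-16T09:11:22Z, the time of the log in the question read as UTC
        System.out.println(indexName("items", 1447665082000L)); // items-2015-11-16
    }
}
```

So a query or a Kibana index pattern would target something like `items-*` rather than `items`.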


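    Answer 2:

    Looking at the log, there are some missing-dependency problems.

    If you look at the ElasticSearchSink documentation, you will see the following:

    The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined, read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.

    Most likely you have not put the required Java JARs in place, or their versions do not match.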
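The version rule quoted above can be checked mechanically. The hypothetical helper below (`EsVersionCheck`, not part of Flume) compares the first two version numbers of the client jar name with the cluster's version string, which is the strictest reading of the documentation's examples (0.19.x with 0.19.x, 0.90.x with 0.90.x). The cluster version is assumed to come from the `version.number` field returned by `GET http://<host>:9200/`; the JVM part of the rule is not checked.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: does the client jar belong to the same x.y series as the cluster?
public class EsVersionCheck {

    private static final Pattern SERIES = Pattern.compile("(\\d+)\\.(\\d+)");

    // "elasticsearch-1.7.1.jar" -> "1.7", "2.0.0" -> "2.0"; null if no version is found
    static String series(String s) {
        Matcher m = SERIES.matcher(s);
        return m.find() ? m.group(1) + "." + m.group(2) : null;
    }

    static boolean sameSeries(String clientJar, String serverVersion) {
        String client = series(clientJar);
        return client != null && client.equals(series(serverVersion));
    }

    public static void main(String[] args) {
        System.out.println(sameSeries("elasticsearch-1.7.1.jar", "1.7.3")); // true
        System.out.println(sameSeries("elasticsearch-1.7.1.jar", "2.0.0")); // false
    }
}
```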


      Answer 3:

      I only added the following 2 JARs to the flume/lib directory and it worked; I did not have to add all the other Lucene JARs:

      elasticsearch-1.7.1.jar

      lucene-core-4.10.4.jar
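Because it matters exactly which jars end up in `flume/lib`, a leftover second `elasticsearch-*.jar` of another version can also cause a `NoSuchMethodError` like the one in the question, depending on which copy the class loader finds first. The hypothetical `LibJarCheck` below lists a `lib` directory, groups jar names of the form `<artifact>-<version>.jar`, and reports whether `elasticsearch` and `lucene-core` are missing, present once, or present in several versions.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: groups "<artifact>-<version>.jar" file names by artifact.
public class LibJarCheck {

    // Artifact name, then a dotted numeric version, e.g. lucene-core-4.10.4.jar
    private static final Pattern JAR = Pattern.compile("(.+?)-(\\d+(?:\\.\\d+)*)\\.jar");

    static Map<String, List<String>> versionsByArtifact(List<String> fileNames) {
        Map<String, List<String>> out = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = JAR.matcher(name);
            if (m.matches()) {
                out.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        File lib = new File(args.length > 0 ? args[0] : "lib");
        String[] names = lib.list();
        if (names == null) {
            System.err.println("not a directory: " + lib);
            return;
        }
        Map<String, List<String>> versions = versionsByArtifact(Arrays.asList(names));
        for (String artifact : new String[] {"elasticsearch", "lucene-core"}) {
            List<String> found = versions.get(artifact);
            if (found == null) {
                System.out.println(artifact + ": missing");
            } else if (found.size() > 1) {
                System.out.println(artifact + ": several versions " + found);
            } else {
                System.out.println(artifact + ": " + found.get(0));
            }
        }
    }
}
```

Usage, with the path from this answer: `java LibJarCheck /usr/flume/flume1.6/apache-flume-1.6.0-bin/lib`.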

      Command to start Flume:

      bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console
      

      Make sure to add the following to flume-env.sh:

      export JAVA_HOME=/usr/java/default
      
      export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
      
      FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/:/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"
      

      Flume aggregator configuration for loading data into ES: flume-aggregator.conf

      agent2.sources = source1
      agent2.sinks = sink1
      agent2.channels = channel1
      
      ################################################
      # Describe Source
      ################################################
      
      # Source Avro
      agent2.sources.source1.type = avro
      agent2.sources.source1.bind = 0.0.0.0 
      agent2.sources.source1.port = 9997
      
      ################################################
      # Describe Interceptors
      ################################################
      # an example of nginx access log regex match
      # agent2.sources.source1.interceptors = interceptor1
      # agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
      # 
      # agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)( \"(.*?)\" \"(.*?)\")?"
      # 
      # # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)
      # # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
      # 
      # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8
      # agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
      # agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime
      # agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method
      # agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request
      # agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response
      # agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status
      # agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes
      # agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime
      #  
      
      ################################################
      # Describe Sink
      ################################################
      
      # Sink ElasticSearch
      # Copy the Elasticsearch jars into flume/lib
      # clusterName must match cluster.name in elasticsearch/config/elasticsearch.yml (also the name of the data/<clustername> directory)
      agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
      agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
      agent2.sinks.sink1.indexName = pdi
      agent2.sinks.sink1.indexType = pdi_metrics
      agent2.sinks.sink1.clusterName = My-ES-CLUSTER
      agent2.sinks.sink1.batchSize = 1000
      agent2.sinks.sink1.ttl = 2
      #this serializer is crucial in order to use kibana
      agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
      
      
      
      ################################################
      # Describe Channel
      ################################################
      
      # Channel Memory
      agent2.channels.channel1.type = memory
      agent2.channels.channel1.capacity = 10000000
      agent2.channels.channel1.transactionCapacity = 1000
      
      ################################################
      # Bind the source and sink to the channel
      ################################################
      
      agent2.sources.source1.channels = channel1
      agent2.sinks.sink1.channel = channel1
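The `ttl` values in these configurations are interpreted by the sink. According to the Flume user guide, a bare integer is a number of days and a suffix `ms`, `s`, `m`, `h`, `d` or `w` selects the unit, so `ttl = 2` here means two days and `ttl = 5d` in Answer 1 means five days. The sketch below (hypothetical `TtlSketch`, not Flume's own parser) converts such a value to milliseconds under exactly that rule.

```java
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the documented ttl rule: "<n>" means days, "<n><unit>" uses ms/s/m/h/d/w.
public class TtlSketch {

    private static final Pattern TTL = Pattern.compile("(\\d+)(ms|s|m|h|d|w)?");

    static long ttlMillis(String ttl) {
        Matcher m = TTL.matcher(ttl.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unrecognized ttl: " + ttl);
        }
        long n = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "d" : m.group(2); // no unit: days
        switch (unit) {
            case "ms": return n;
            case "s":  return TimeUnit.SECONDS.toMillis(n);
            case "m":  return TimeUnit.MINUTES.toMillis(n);
            case "h":  return TimeUnit.HOURS.toMillis(n);
            case "w":  return TimeUnit.DAYS.toMillis(7 * n);
            default:   return TimeUnit.DAYS.toMillis(n);
        }
    }

    public static void main(String[] args) {
        System.out.println(ttlMillis("2"));  // 172800000 (2 days)
        System.out.println(ttlMillis("5d")); // 432000000 (5 days)
    }
}
```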
      

      Comments:

      • Thanks for listing the exact jars.