实验背景:

Kafka->Logstash->Elasticsearch实验环境安装完成;

Logstash的启动文件设置有过滤机制;

Logstash启动文件:

[html] view plain copy
  1. input {  
  2. kafka {  
  3.   zk_connect => "10.10.16.252:2181,10.10.16.253:2181,10.10.16.249:2181"  
  4.   group_id => "test-consumer-group"  
  5.   topic_id => "MyPattern"  
  6.   reset_beginning => false # boolean (optional), default: false  
  7.   consumer_threads => 5  # number (optional), default: 1  
  8.   decorate_events => true # boolean (optional), default: false  
  9.   }  
  10. }  
  11. filter{  
  12.     
  13.   grok{  
  14.     
  15.      patterns_dir => "./patterns"  
  16.      match => { "message" => ["#字段内容#"]}  
  17.   }  
  18.   
  19. }  
  20.   
  21.   
  22. output {  
  23.      
  24.    if "jsonparsefailure" not in [tags]{  
  25.       
  26.       elasticsearch{  
  27.    
  28.          host => "localhost"  
  29.      }  
  30.    }  
  31. }  

发送数据的代码如下所示:

[java] view plain copy
  1. package com.test.kafka;  
  2.   
  3. import java.util.Properties;  
  4. import kafka.javaapi.producer.Producer;  
  5. import kafka.producer.KeyedMessage;  
  6. import kafka.producer.ProducerConfig;  
  7.   
  8. public class KafkaProduce {  
  9.   
  10.     public static void main(String[] args) {  
  11.   
  12.          Properties props = new Properties();  
  13.          props.setProperty("metadata.broker.list","10.10.16.253:9092,10.10.16.252:9092,10.10.16.249:9092");     
  14.          props.setProperty("serializer.class","kafka.serializer.StringEncoder");  
  15.          props.put("request.required.acks","-1");           
  16.          ProducerConfig config = new ProducerConfig(props);   
  17.            
  18.          Producer<String,String> producer = new Producer<String, String>(config);  
  19.          KeyedMessage<String, String> data = new KeyedMessage<String,String>("MyPattern","数据内容");  
  20.                            
  21.          producer.send(data);  
  22.            
  23.          producer.close();  
  24.     }  
  25. }  



发送给Logstash后,使用ES搜索,开始时,报一下错误:

ELK错误1_Kafka->Logstash->Elasticsearch过程,Elasticsearch报grokparsefailure错误

发送第一个红框内的数据,结果报第二个红框内的错误。

寻找错误,经过对比,发现,代码中“数据内容”的字段比配置文件中“数据内容”中的字段少一个,所以报错。

改正,重新进行实验,结果正确。

进行验证:

字段对应,则正确;代码中少一个字段,则报上面的错。


来源:http://blog.csdn.net/wang_zhenwei/article/details/49755599

相关文章:

  • 2021-06-19
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-11-13
  • 2022-02-06
  • 2021-10-21
  • 2021-05-09
猜你喜欢
  • 2022-12-23
  • 2021-10-22
  • 2021-10-06
  • 2021-04-02
  • 2021-06-05
  • 2021-09-20
相关资源
相似解决方案