Experiment background:
The Kafka -> Logstash -> Elasticsearch test environment is installed;
the Logstash startup file is configured with a filter (grok) stage.
Logstash startup file:
```
input {
  kafka {
    zk_connect => "10.10.16.252:2181,10.10.16.253:2181,10.10.16.249:2181"
    group_id => "test-consumer-group"
    topic_id => "MyPattern"
    reset_beginning => false   # boolean (optional), default: false
    consumer_threads => 5      # number (optional), default: 1
    decorate_events => true    # boolean (optional), default: false
  }
}
filter {
  grok {
    patterns_dir => "./patterns"
    match => { "message" => ["#field pattern#"] }
  }
}
output {
  # grok tags events it cannot parse with "_grokparsefailure";
  # only index events that parsed successfully
  if "_grokparsefailure" not in [tags] {
    elasticsearch {
      host => "localhost"
    }
  }
}
```
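For reference, `#field pattern#` above is a placeholder for the actual grok expression. As a hypothetical illustration (not the pattern used in this experiment), a message with three whitespace-separated fields, such as `user1 GET /index`, could be matched with Logstash's built-in patterns like this:

```
filter {
  grok {
    patterns_dir => "./patterns"
    # hypothetical example: three fields named user, action, path
    match => { "message" => ["%{USERNAME:user} %{WORD:action} %{URIPATH:path}"] }
  }
}
```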
The code that sends the data is shown below:
```java
package com.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list, not ZooKeeper: the old producer API talks to brokers directly
        props.setProperty("metadata.broker.list",
                "10.10.16.253:9092,10.10.16.252:9092,10.10.16.249:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // -1: wait for all in-sync replicas to acknowledge
        props.put("request.required.acks", "-1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // "message content" is a placeholder for a message laid out to match
        // the grok pattern in the Logstash configuration
        KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("MyPattern", "message content");
        producer.send(data);
        producer.close();
    }
}
```
After sending the data to Logstash, an ES search at first reported an error: the data shown in the first screenshot (red box) triggered the error shown in the second. (The screenshots are from the original post.)
Tracking down the error by comparison showed that the message sent by the code had one field fewer than the grok pattern in the configuration file expected, hence the parse failure.
After correcting this and rerunning the experiment, the result was correct.
Verification:
When the fields line up, parsing succeeds; when the code sends one field fewer, the error above reappears.
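Since grok patterns compile down to regular expressions, the failure mode can be sketched with a plain Java regex: a pattern expecting three fields will not match a message that supplies only two. The pattern and messages below are hypothetical stand-ins, not the ones used in the experiment.

```java
import java.util.regex.Pattern;

public class GrokFieldCheck {
    // Simplified stand-in for a grok pattern expecting three
    // whitespace-separated fields (hypothetical example)
    static final Pattern THREE_FIELDS =
            Pattern.compile("(\\S+) (\\S+) (\\S+)");

    static boolean matches(String message) {
        // matches() requires the whole message to fit the pattern,
        // just as grok tags a partial match as a failure here
        return THREE_FIELDS.matcher(message).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("user1 GET /index")); // all fields present: true
        System.out.println(matches("user1 GET"));        // one field missing: false
    }
}
```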
Source: http://blog.csdn.net/wang_zhenwei/article/details/49755599