【问题标题】:How do I transform/fork a Kafka stream and send it over to a specific topic?如何转换/分叉 Kafka 流并将其发送到特定主题?
【发布时间】:2017-06-07 15:59:51
【问题描述】:

我正在尝试使用函数“mapValues”将在原始流“textlines”中获得的字符串值转换为 JSONObject 消息到 newStream。然后将我在 newStream 中获得的任何内容流式传输到一个名为“testoutput”的主题上。但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException 错误,该错误仅指向 kafka 流库。不知道发生了什么:((

附:当我从原始流派生/创建新的 kafka 流时,新流是否属于原始构建器?由于我需要构建器来创建 KafkaStreams 对象并开始流式传输,因此我不确定是否需要对新流执行其他操作,而不仅仅是指定它的去向 .to("topic")

//Testing a Kafka Stream Application
public class testStream {

public static void main(String[] args) throws Exception {
    //Configurations
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    //Building Stream
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textlines = builder.stream("mytest2"); 

    //Printout The Inputs just for testing purposes
    textlines.foreach(new ForeachAction<String, String>(){
        public void apply(String key, String value){
            for(int y=0; y<value.length(); y++){
                System.out.print(value.charAt(y));
            }
            System.out.print("\n");
        }
    });

    //Transform String Records to JSON Objects
    KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
        @Override
        public JSONObject apply(String value) {

            JSONObject jsnobj = new JSONObject();

            //If the first 4 letters of the message is "xxxx" then parse it to a 
            //JSON Object, otherwise create a dummy
            if(value.substring(0, 4).equals("xxxx")){               
                jsnobj.put("Header_Title", value.substring(0, 4));
                jsnobj.put("Data_Part", value.substring(4));
            }else{
                jsnobj.put("Header_Title", "Not xxxx");
                jsnobj.put("Data_Part", "None");
            }
            return jsnobj;
        }
    });

    //Specify target
    newStream.to("testoutput");

    //Off you go
    KafkaStreams streams=new KafkaStreams(builder, props);
    streams.start();

  }
}

【问题讨论】:

  • 您应该包含错误消息 /stacktrace,以便本问题的读者可以帮助您了解发生 NullPointerException 的原因。

标签: java stream apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

据我所知,您的问题是这一行:

newStream.to("testoutput");

newStream 的类型为 KStream&lt;String, JSONObject&gt;

但是,您的应用程序默认配置为使用String serde 来序列化/反序列化记录键和记录值:

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

这意味着,当您未在 to() 调用中提供显式 serdes 时,Kafka Streams 将尝试将您的 newStream 写为 KStream&lt;String, String&gt;(而不是 KStream&lt;String, JSONObject&gt;)回 Kafka。

您需要做的是在 to() 调用中提供明确的 serdes:

// Sth like this
newStream.to(Serdes.String(), myJsonSerde, "testoutput");

不幸的是,Kafka 还没有包含开箱即用的 JSON serde(这是计划中的)。幸运的是,您可以查看(并复制)Kafka 自己的 Kafka Streams API 演示应用程序中包含的 JSON serde 示例:https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

【讨论】:

    【解决方案2】:

    @Michael:我根据您的建议修复了我的代码。多谢。我的目标是将读取字符串解析为 json 值。

        KStreamBuilder builder = new KStreamBuilder();
    
        KStream<String, String> textLines = builder.stream("input-topic-name");
        // do stuff
    
    
        Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    
    
        KStream<String, JsonNode> newStream = textLines.mapValues(new ValueMapper<String,JsonNode>(){
            @Override
            public JsonNode apply(String value) {
    
                JSONObject jsnObj = new JSONObject();
    
    
                //If the first 4 letters of the message is "xxxx" then parse it to a 
                //JSON Object, otherwise create a dummy
                    jsnObj.put("Header_Title", value.toString());
    
                        ObjectMapper objectMapper = new ObjectMapper();
    
                        JsonNode json_value = null;
                        try {
                            json_value = objectMapper.readTree(jsnObj.toString());
                        } catch (IOException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
    
                return json_value;
            }
    
        });
    
        newStream.to(Serdes.String(), jsonSerde, "json-output");  
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-05-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-30
      • 2018-10-23
      • 1970-01-01
      相关资源
      最近更新 更多