【问题标题】:i have a kafka pipeline (json problem update) for kafka connect我有一个用于 kafka 连接的 kafka 管道(json 问题更新)
【发布时间】:2021-02-18 08:57:36
【问题描述】:

所以我根据一些建议进行了更新。但是流应用程序会在一段时间后终止。没有表演。 ide 显示的以下代码中没有错误。最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象。还是不行。

我猜它是一条线或其他东西,但不确定我是否正确。请。还附上了下面的错误截图。

 Serializer<JsonNode> jsonSerializer = new JsonSerializer();
            Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
            Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    
            JSONObject jsnObj = new JSONObject();
    
           ......(word count manipulationover part over here)
    
            KTable<Windowed<String>, Long> Ttable = TgroupedStream
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                    .count();
    
            Ttable
                    .toStream()
                    .selectKey((key, word) -> key.key())
                    .map((key, value) -> {
                                JSONParser par = new JSONParser();
                                StringWriter sw = new StringWriter();
    
                                KeyValue<String, JsonNode> kv = null;
                                try {
                                    ObjectMapper objectMapper = new ObjectMapper();
                                    JsonNode jsonNode = objectMapper.readTree("{ \"word\": \"" + key + "\" \",\" count: \"" + value + "\" }");
                                    KeyValue.pair(key.concat("s"), jsonNode);
                                    kv = KeyValue.pair(key.concat("s"), jsonNode);
    
                                } catch (JsonMappingException e) {
                                    e.printStackTrace();
                                } catch (JsonProcessingException e) {
                                    e.printStackTrace();
                                }
                                return kv;
                            }
                    )
                    .to("badliar", Produced.with(Serdes.String(), jsonSerde));
    
          
            KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
            streams.start();
    
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }

【问题讨论】:

    标签: java json apache-kafka apache-kafka-streams


    【解决方案1】:

    您正在使用包含所需确切数据的键值对。你不需要解析任何东西,只需创建 JsonNode 并返回它。

    final ObjectMapper mapper = new ObjectMapper();
    
    Ttable
            .toStream()
            .selectKey((key, word) -> key.key())
            .map((key, value) -> {
                 ObjectNode rootNode = mapper.createObjectNode();
    
                 rootNode.put("word", key);
                 rootNode.put("count", value);
                            
                 return new KeyValue.pair(key, jsonNode);           
            })
    

    如果您不修改密钥,也可以使用mapValues 代替map

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-02
      • 2020-01-25
      • 1970-01-01
      • 2020-03-10
      • 1970-01-01
      • 2020-09-17
      • 2021-01-30
      • 1970-01-01
      相关资源
      最近更新 更多