【问题标题】:How to produce a json object message into kafka topic using java(spring)?如何使用java(spring)将json对象消息生成到kafka主题中?
【发布时间】:2018-10-16 07:30:03
【问题描述】:

我想在 kafka 主题中生成一条消息。该消息应具有以下模式:

   {"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}

我知道这是一个 json 模式,那么如何将那个 json 转换为 String 呢?

我用的是maven项目,那么需要哪些依赖才能使用

 String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});

所以我认为最好不要将 Json 转换为字符串并将该按摩发送到 kafka 主题中。

我的代码就是这样,它可以发送一个字符串,但我不知道如何修改我的代码以发送上面的消息。也许你能帮帮我。

 Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
    String msg = "welcome";
    producer.send(new ProducerRecord<String, String>("event", msg));

    producer.close();

【问题讨论】:

  • stringify 不显示以选择它。
  • 你的json好像已经是字符串了,不用再字符串化了。
  • 那么你能给我举个例子,我如何在 kafka 中生成该消息吗? producer.send(new ProducerRecord("event-orsted-v1", jsonNode));
  • 是com.fasterxml.jackson.databind.JsonNode,还是来自其他包?

标签: java json maven apache-kafka kafka-producer-api


【解决方案1】:

这解决了我的问题:

 Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);

    try {
        producer = new KafkaProducer<String, String>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
    blobStorageChecker = new BlobStorageChecker();
    String folder = blobStorageChecker.getCurrentDateUTC();
    String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("event-orsted-v1", null, msg);
    if (producer != null) {
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }
    producer.close();

【讨论】:

    【解决方案2】:

    根据评论,您需要在 kafka 上发送 JsonNode 作为消息。 编写一个自定义的序列化器/反序列化器。

    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serializer;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    public class JsonNodeSerDes implements Serializer<JsonNode>, Deserializer<JsonNode> {
    
        private ObjectMapper mapper = new ObjectMapper();
    
        @Override
        public byte[] serialize(String topic, JsonNode data) {
    
            try {
                return mapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                return new byte[0];
            }
        }
    
        @Override
        public JsonNode deserialize(String topic, byte[] data) {
    
            try {
                return mapper.readValue(data, JsonNode.class);
            } catch (IOException e) {
                return null;
            }
        }
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public void close() {
        }
    }
    

    我在同一个类中编写了序列化器/反序列化器。您可以将它们分成两个类(一个实现Serializer,另一个实现Deserializer)。

    在创建KafkaProducer 时,您需要为KafkaConsumer 提供"value.serializer" 配置和"value.deserializer" 配置。

    使用的外部依赖项:

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.8.8</version>
    </dependency>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-10-11
      • 2020-04-06
      • 1970-01-01
      • 2014-01-04
      • 1970-01-01
      • 2021-01-09
      • 2019-11-03
      • 1970-01-01
      相关资源
      最近更新 更多