【问题标题】:kafka streams for conversion of JSON arrays to json objects用于将 JSON 数组转换为 json 对象的 kafka 流
【发布时间】:2017-12-15 05:16:11
【问题描述】:

我在普通 java 中有一个代码,用于将 JSON 数组转换为 JSON 对象,我需要将这个普通 java 转换为 kafka 流......下面是我的

import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}
 catch (FileNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ParseException ex) {
            ex.printStackTrace();
        } catch (NullPointerException ex) {
            ex.printStackTrace();
        }
    }
}

如果有人帮助我只为逻辑部分编写代码,我可以继续,下面是我的逻辑部分,至少我需要帮助

public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}

如何在 kafka 流中编写相同的代码?有人可以帮忙吗?

【问题讨论】:

  • 需要实现序列化器和反序列化器,这个Link会帮助你。

标签: json apache-kafka apache-kafka-streams


【解决方案1】:

我会推荐两个选项:

  • 您实现自己的序列化器/反序列化器 (Here is the javadoc)
  • 您可以将流处理为 (String,String) 的流,并使用 flatMap 解析流的每个元素并将其转换为 (String,JSONObject) 的流:

    JSONParser parser = new JSONParser();
    stringStream.flatMap((k,v) -> {
        List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>();
        JSONArray jsonArray = (JSONArray) parser.parse(v);
        for (Object o : jsonArray) {
             JSONObject snap = (JSONObject) o;
             tmp.add(new KeyValue(k, snap));
        }     
        return tmp;
    });
    

这里我根本没有处理必须将 lambda 代码包装到 try/catch 中的异常。

【讨论】:

  • 嘿,你能检查我上面作为答案发布的代码并清除我的疑虑
猜你喜欢
  • 2012-07-21
  • 2020-10-02
  • 1970-01-01
  • 2018-10-11
  • 2018-04-16
  • 2018-09-12
  • 2015-10-30
相关资源
最近更新 更多