【问题标题】:Json Validation in Apache beam using Google Cloud Dataflow使用 Google Cloud Dataflow 在 Apache Beam 中进行 Json 验证
【发布时间】:2020-02-12 10:36:00
【问题描述】:

我正在尝试使用 Apache Beam Java SDK 编写过滤器转换,我需要过滤掉无效的 Json 消息。

如果我为每个元素验证创建一个新的 Gson 对象,则实现工作正常。但是我想避免为每个元素创建 Gson 对象(吞吐量为 1K/秒)并验证 json。

我在开始时创建一个常量 Gson 对象并在静态块中对其进行初始化。这种方法行不通。不知道为什么不能使用同一个对象来解析多个元素,因为我们在处理过程中没有改变对象的状态?

// Gson object declared as constant
private static final Gson gsonObj=new Gson();

// Initialized GSon object during class loading before main method invocation
static {
    gsonObj = new Gson();
}

....

/*
enum to validate json messages.
 */
enum InputValidation implements SerializableFunction<String, Boolean> {
    VALID {
        @Override
        public Boolean apply(String input) {
            try {
                gsonObj.fromJson(input, Object.class);
                return true;
            } catch(com.google.gson.JsonSyntaxException ex) {
                return false;
            }
        }
    }
}

【问题讨论】:

  • 您是否遇到任何错误?
  • 过滤器不起作用,无效的 json 被发送到管道中的下一步
  • 根据文档,可以使用同一个GSON对象进行序列化和反序列化操作。此外,将其声明为私有静态 final 完全没问题,(链接)[sites.google.com/site/gson/gson-user-guide]。您可以从 static{} 初始化中删除 "gsonObj = new Gson();" 吗?此外,在文档中据说通过构造 Gson 实例然后调用 toJson(Object)fromJson(String, Class) 来使用 Gson,(link)[ javadoc.io/static/com.google.code.gson/gson/2.8.6/….
  • 让我知道它是否有效。

标签: json gson google-cloud-dataflow apache-beam


【解决方案1】:

使用 TupleTag 过滤掉记录,而不是 'enum InputValidation implements'。 使用以下代码过滤掉无法解析的 json 行。

Pipeline p = Pipeline.create(options);

TupleTag<String> successParse = new TupleTag<String>();
TupleTag<String> failParse = new TupleTag<String>();

private static final Gson gsonObj=new Gson();

PCollectionTuple = input.apply(ParDo.of(new DoFn<String, String>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            gsonObj.fromJson(c.element(), Object.class);
            c.output(successParse,c.element());
        } catch {
            c.output(failParse,c.element());
        }
    }
}).withOutputTags(successParse, TupleTagList.of(failParse)));

以上代码适用于我的案例,是过滤掉记录的最佳解决方案。

这里是官方documentation的例子。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-25
  • 1970-01-01
  • 1970-01-01
  • 2016-11-26
  • 1970-01-01
  • 2018-01-03
相关资源
最近更新 更多