【问题标题】:How to extract JSON from a binary protobuf?如何从二进制 protobuf 中提取 JSON?
【发布时间】:2018-03-05 15:22:13
【问题描述】:

将 Apache Spark 2.2.0 结构化流视为:

jsonStream.printSchema()
root
 |-- body: binary (nullable = true)

body 内的数据类型为 Protocol Buffers v2 和嵌套的 JSON。好像

syntax = "proto2";

message Data {
  required string data = 1;
}

message List {
  repeated Data entry = 1;
}

如何提取 Spark 中的数据以“进一步”处理它?

我查看了ScalaPB,但是当我在 Jupyter 中运行我的代码时,无法将“.proto”代码包含在内。我也不知道如何将 DataFrame 转换为流上的 RDD。由于流媒体源,尝试 .rdd 失败。

更新 1:我想出了如何使用 ScalaPB 的控制台工具从 protobuf 规范生成 Scala 文件。我仍然无法将它们作为“类型不匹配”导入。

【问题讨论】:

  • 您能描述一下如何从 protobuf 规范生成 Scala 文件吗?您能描述一下您是如何尝试将它们导入 Spark 的吗?

标签: scala apache-spark protocol-buffers spark-structured-streaming


【解决方案1】:

tl;dr 编写一个用户定义函数 (UDF) 将二进制字段(带有 JSON 的 protobuf)反序列化为 JSON。

将序列化的bodybinary 格式)视为表格列。暂时忘记结构化流(和流数据集)。

然后让我将问题改写为以下内容:

如何将二进制值转换(也称为强制转换)为 [here-your-format]?

有些格式直接是cast-able,这使得将二进制文件转换为字符串很容易,如下所示:

$"body" cast "string"

如果字符串是 JSON 或 unixtime,您可以使用内置的“转换器”,即 functions,如 from_jsonfrom_unixtime

介绍应该会提示您如何进行像您这样的转换。

body 内的数据类型为 Protocol Buffers v2 和嵌套的 JSON。

要处理此类字段(protobuf + json),您必须编写一个 Scala 函数来将“有效负载”解码为 JSON,并使用 udf 创建一个用户定义函数 (UDF):

udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction 将 Java UDF1 实例定义为用户定义函数 (UDF)。调用者必须指定输出数据类型,并且没有自动输入类型强制。默认情况下,返回的 UDF 是确定性的。要将其更改为非确定性,请调用 API UserDefinedFunction.asNondeterministic()

然后使用from_jsonget_json_object等函数。

为了使您的案例更简单,请编写一个单参数函数来执行转换并使用 udf 函数将其包装到 UDF 中。


由于流媒体源,尝试 .rdd 失败。

使用Dataset.foreachforeachPartition

foreach(f: (T) ⇒ Unit): Unit 将函数 f 应用于所有行。

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit 将函数 f 应用于此数据集的每个分区。

【讨论】:

  • 谢谢,这听起来很合理,你有什么例子可以指点我吗?考虑到 Protobuf 需要“映射器”类,我仍然需要导入生成的代码,对吗?还是我错过了什么?
  • 我从未使用过 protobuf,所以在这里我无能为力。然而,这超出了 Spark 的范围,“如果你可以在 Scala 中做到这一点,你应该在 Spark 中使用 udf” 适用于此。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-08-24
  • 1970-01-01
  • 2018-01-26
  • 2013-07-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多