【发布时间】:2021-10-06 20:17:33
【问题描述】:
对于一项任务,我们需要处理 gz(ipped) 的大型事务性 xml 文件。未压缩文件中的每一行都可以解释为它自己的 xml 记录。
在处理 100 MiB 之类的小文件时,这可以正常工作。对巨大的输入文件执行 à collect() 的那一刻,它往往会导致 OOM 失败并且 jvm 崩溃。
由于这是一个压缩 (gz) 文件,因此无法并行处理 (AFAIK)。
我在想
- 首先使用 toLocalIterator() 将其拆分为 200K xml 条目的较小数据包,这些数据包分发到其他节点以进行成本处理。显然 toLocalIterator() 也首先执行 collect() (测试)
- 其他选项是使用某种索引值并对其进行过滤(“索引 > 5000”)并设置限制(5000)以模拟 200 万或更多条目的分页。
但是我不知道我应该注意什么来parralize。欢迎任何提示。
- 需要注意的设置以及如何在 Azure Synapse 等中应用它们。
- 如何将读取的 xml 推送到要在其执行程序/任务中处理的节点。
- 可以选择流式传输单个文件吗?
- 欢迎任何提示
目前我的代码是在 scala 中完成的,因为可以轻松访问 java 库以将 xml 转换为 json 并提取我需要的值。
在此先感谢您(也阅读本文)
【问题讨论】:
-
您要对 xml 文件应用什么样的转换?为什么过程中需要
collect数据? -
您当然可以结合使用 java.util.zip.GZIPOutputStream 和 stackoverflow.com/questions/3969713/… 在 Java / Scala 中流式处理 gzipped XML。我对 Spark 和 Azure Synapse 的了解还不够,无法为您提供有关如何将两者结合起来的具体说明,抱歉。 GL!
-
目的是我们可以将整个数据集作为可消耗的东西(json)存储在datalake V2存储中作为parquet文件。根据需要(上下文),我们将有单独的查询。原始 xml “记录”它们的结构部分是固定的,部分是可变的,具体取决于数据的上下文。数据本身被转换为 JSON,因为这更容易访问(一些使用 SAX Java 库的 scala 代码)。我会尝试一下这些建议。目前我们有一些工作可以将整个流重新分区为 32 个分区,但还没有并行性。
标签: scala apache-spark azure-synapse