【问题标题】:Spark/Synapse Optimal handling huge gzipped xml file (+600mb compressed size)Spark/Synapse 优化处理巨大的 gzipped xml 文件(+600mb 压缩大小)
【发布时间】:2021-10-06 20:17:33
【问题描述】:

对于一项任务,我们需要处理 gz(ipped) 的大型事务性 xml 文件。未压缩文件中的每一行都可以解释为它自己的 xml 记录。

在处理 100 MiB 之类的小文件时,这可以正常工作。对巨大的输入文件执行 à collect() 的那一刻,它往往会导致 OOM 失败并且 jvm 崩溃。

由于这是一个压缩 (gz) 文件,因此无法并行处理 (AFAIK)。

我在想

  • 首先使用 toLocalIterator() 将其拆分为 200K xml 条目的较小数据包,这些数据包分发到其他节点以进行成本处理。显然 toLocalIterator() 也首先执行 collect() (测试)
  • 其他选项是使用某种索引值并对其进行过滤(“索引 > 5000”)并设置限制(5000)以模拟 200 万或更多条目的分页。

但是我不知道我应该注意什么来parralize。欢迎任何提示。

  • 需要注意的设置以及如何在 Azure Synapse 等中应用它们。
  • 如何将读取的 xml 推送到要在其执行程序/任务中处理的节点。
  • 可以选择流式传输单个文件吗?
  • 欢迎任何提示

目前我的代码是在 scala 中完成的,因为可以轻松访问 java 库以将 xml 转换为 json 并提取我需要的值。

在此先感谢您(也阅读本文)

【问题讨论】:

  • 您要对 xml 文件应用什么样的转换?为什么过程中需要collect数据?
  • 您当然可以结合使用 java.util.zip.GZIPOutputStream 和 stackoverflow.com/questions/3969713/… 在 Java / Scala 中流式处理 gzipped XML。我对 Spark 和 Azure Synapse 的了解还不够,无法为您提供有关如何将两者结合起来的具体说明,抱歉。 GL!
  • 目的是我们可以将整个数据集作为可消耗的东西(json)存储在datalake V2存储中作为parquet文件。根据需要(上下文),我们将有单独的查询。原始 xml “记录”它们的结构部分是固定的,部分是可变的,具体取决于数据的上下文。数据本身被转换为 JSON,因为这更容易访问(一些使用 SAX Java 库的 scala 代码)。我会尝试一下这些建议。目前我们有一些工作可以将整个流重新分区为 32 个分区,但还没有并行性。

标签: scala apache-spark azure-synapse


【解决方案1】:

TL;DR 建议:

步骤 1. 增加驱动内存并测试

步骤 2. 增加执行器内存并测试第一步是否失败

略长的版本:

它在collect() 操作上给出OOM 的事实并不表明它在spark.read 操作上是OOM 还是df.collect() Spark 调度程序将在遇到 Action 时运行 DAG,但在遇到 Transformation 时不会运行。 因此,如果collect 是您的第一个操作,那么它实际上会运行 DAG,并且 OOM 甚至可能处于读取状态,但在收集时表现为 OOM

Spark UI 将提供有关 OOM 发生位置的见解

你说得对,解压缩 gzip 不会被并行化。在读取操作时,它将使用单个执行器甚至单个核心。所以我会增加执行程序的内存,直到有足够的内存将整个文件压缩到内存中——不仅仅是精确的文件大小,保留通常的 400MB / 0.7% 缓冲区。

如果错误确实发生在collect() 操作上,那么您需要充分增加驱动程序内存。

您的应用不会在read 并行化。您的应用不会在collect() 并行化 您的应用程序可以在它们之间的转换期间并行化,您可以通过进一步重新分区数据帧/数据集/rdd 来强制并行化到您想要调整的程度。

最后,我会再次考虑您是否需要收集,或者是否可以将输出存储为多个分区文件?

【讨论】:

  • 谢谢。我会试试信息
  • @VincentDegrave 一些反馈会不会有帮助?
【解决方案2】:

我认为压缩文件总是会成为瓶颈,因此一种替代方法是将其解压缩并查看是否有帮助。我还会考虑将 xml 加载到 Synapse 中的一个表中(它可以处理 gzipped 文件)。这将产生解压缩的效果,然后您可以使用 synapsesql 方法将其传递到 Synapse Notebook,例如在 Scala 中:

// Get the table with the XML column from the database and expose as temp view
val df = spark.read.synapsesql("yourPool.dbo.someXMLTable")

df.createOrReplaceTempView("someXMLTable")

您可以像我 here 那样处理 XML,然后将其作为内部表写回 Synapse 专用 SQL 池:

val df2 = spark.sql(""" 
SELECT
    colA,
    colB,
    xpath_string(pkData,'/DataSet/EnumObject[name="Inpatient"]/value') xvalue
FROM someXMLTable
""")

   
// Write that dataframe back to the dedicated SQL pool
df2.write.synapsesql("yourPool.dbo.someXMLTable_processed", Constants.INTERNAL)

这将确保您保持并行,不需要collect。注意,关于将 gzip 压缩的文件上传到专用 SQL 池有几个假设,xpath_string 可以满足您的需要,需要检查和确认。建议的模式:

【讨论】:

  • 这有什么更新吗?赏金@StijnWynants 发生了什么?
  • 谢谢,我也试试看。
  • 关于“将 xml 加载到 Synapse 中的表”。你能举个例子吗,我正在搜索,但不清楚是否必须将其加载到链接数据库(可通过网关 onprem 访问)或一些 cosmos/其他实例?很抱歉这个新手问题,提前非常感谢! (还不习惯 Azure 和云的东西)
猜你喜欢
  • 1970-01-01
  • 2013-02-02
  • 1970-01-01
  • 2017-06-16
  • 2011-07-04
  • 1970-01-01
  • 1970-01-01
  • 2016-08-12
  • 1970-01-01
相关资源
最近更新 更多