【问题标题】:Iterating a huge data frame in spark/scala在 spark/scala 中迭代一个巨大的数据框
【发布时间】:2017-08-05 05:58:36
【问题描述】:

我有一个包含 5 亿行的数据框。我想遍历每一行并修改列名/删除几列,并根据几个条件更新列值。我正在使用以下方法进行收集。

df.collect.foreach(row => mycustomeMethod())

因为 collect 会将所有数据带到驱动程序,我面临内存不足错误。您能否建议任何替代方法来实现相同的目标。

我们正在使用 datastax 的 spark-cassandra 连接器。我尝试了不同的方法,但都没有帮助提高性能。

【问题讨论】:

  • 可以在不收集数据框的情况下完成列名修改和列删除。所以你的问题是更新列值而不收集,对吧?
  • 对于 5 亿行,使用任何方法转换都会很慢。其他方法是否运行缓慢,或者是否也因“内存不足”错误而崩溃?
  • 他正在耗尽内存,我的假设是,他在收集后应用转换。所以显然驱动节点不能一次处理全部 5 亿。

标签: scala apache-spark apache-spark-sql spark-dataframe spark-cassandra-connector


【解决方案1】:

使用 map 操作而不是 collect/foreach,并转换回 RDD。这将允许计算分布在集群周围,而不是强制将其全部集中到一个节点中。您可以通过修改您的自定义方法来获取并返回一个 Row,然后可以将其转换回 DataFrame。

val oldSchema = originalDf.schema
val newSchema = //TODO: put new schema based on what you want to do
val newRdd = originalDf.map(row => myCustomMethod(row))
val newDf = sqlContext.createDataFrame(newRdd,newSchema)

然后可以通过新 DataFrame 上的 .drop 方法处理删除行。

如果您的自定义方法不可序列化 - 或者包含不可序列化的对象 - 在这种情况下切换到 mapPartitions 方法,则可能会遇到问题,以便您可以强制每个节点首先创建相关对象的副本.

【讨论】:

    猜你喜欢
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-03
    • 2018-08-21
    • 2020-08-03
    相关资源
    最近更新 更多