【问题标题】:(py)Spark job fails on concatenating strings(py)Spark 作业在连接字符串时失败
【发布时间】:2016-06-15 23:40:42
【问题描述】:

我有一个 2TB 内存集群和一个 1TB 数据集(在磁盘上)。任务是将用户会话连接在一起以进行进一步分析。 (略简化)代码如下:

dataset.flatMap(
  get_key_val_from_json  # gives (key, [json_string])
).reduceByKey(
  add  # merge lists
).map(
  lambda x: Row(k=x[0], v='\t'.join(x[1]))  # convert the list to a string
).toDF(
  ['key', 'events_with_tab_sep']  # convert to DataFrame so could save as parquet
).write.mode(
  'overwrite'
).parquet(
  some_path
)

但是,在reduceByKey 步骤,作业无法继续,因为执行程序开始失败(显示很多 CANNOT FIND ADDRESS 错误)并重新启动。作业最终会抛出

的错误消息

org.apache.spark.SparkException:作业中止。

因为

作业 X 无法完成,因为阶段 Y 失败了 4 次。

reduceByKey 的 shuffle 过程中似乎存在 OOM 问题?但是,如果我提取、排序和归约,我认为此任务与内存无关。 Spark 是否需要将整个数据集加载到内存中进行 reduce 操作?

试过但没用

1-要求Spark仅将中间文件持久化到本地磁盘,即

extracted = dataset.flatMap(
  get_key_val_from_json
).persist(
    StorageLevel.DISK_ONLY
)

extracted.reduceByKey(
  add
).map(
  lambda x: Row(k=x[0], v='\t'.join(x[1]))
).toDF(
  ['key', 'events_with_tab_sep']
).write.mode(
  'overwrite'
).parquet(
  some_path
)

【问题讨论】:

  • 好吧,你在reduceByKey 中应用了非归约操作,这几乎是另一个groupByKey。在最坏的情况下,只有当所有数据都适合为单个执行程序分配的内存时,这才有效。
  • 我知道最坏的情况是所有 1TB 数据都具有相同的密钥。在这里,我知道最长的事件列表约为 100KB,因此将它们连接起来应该不是问题。
  • 我很困惑,如果在第一个 map 之后有一个 sort,那么 reduceByKey 操作不需要依赖于内存中的完整数据集。但似乎并非如此。
  • 关于这个问题的一个很好的讨论是here

标签: apache-spark mapreduce pyspark spark-dataframe


【解决方案1】:

首先,您可以在单独的行中执行任务,中间使用 .first() 或 .show() 步骤,看看哪个失败。

【讨论】:

  • 失败的是reduceByKey。我会让它更明确。
  • operator.add 来自 python
猜你喜欢
  • 2015-03-30
  • 2017-11-24
  • 1970-01-01
  • 2018-02-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-15
  • 2015-06-02
相关资源
最近更新 更多