【问题标题】:Spark RDD pipe value from tuple来自元组的 Spark RDD 管道值
【发布时间】:2016-10-31 16:35:16
【问题描述】:

我有一个 Spark RDD,其中每个元素都是 (key, input) 形式的元组。我想使用pipe 方法将输入传递给外部可执行文件并生成(key, output) 形式的新RDD。稍后我需要关联的密钥。

以下是使用 spark-shell 的示例:

val data = sc.parallelize(
  Seq(
    ("file1", "one"),
    ("file2", "two two"),
    ("file3", "three three three")))

// Incorrectly processes the data (calls toString() on each tuple)
data.pipe("wc")

// Loses the keys, generates extraneous results
data.map( elem => elem._2 ).pipe("wc")

提前致谢。

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    使用 map 的解决方案不正确,因为 map 不能保证保留分区,因此使用 zip after 会失败。您需要使用 mapValues 来保留初始 RDD 的分区。

    data.zip( 
      data.mapValues{ _.toString }.pipe("my_executable")
    ).map { case ((key, input), output) => 
      (key, output)
    }
    

    【讨论】:

    • 请注意,如果 data rdd 没有分区器,这也会失败(因为 mapValues 只有在分区器可用时才会保存它)。当然,在运行这个 sn-p 之前重新分区将解决问题。
    【解决方案2】:

    考虑到您无法将标签传入/传出可执行文件,这可能工作:

    rdd
      .map(x => x._1)
      .zip(rdd
              .map(x => x._2)
              .pipe("my executable"))
    

    请注意,这可能很脆弱,如果您的可执行文件没有在每个输入记录上生成精确的单行,它肯定会中断。​​

    【讨论】:

    • 这会生成一个 SparkException:Can only zip RDDs with same number of elements in each partition。问题似乎是创建了空分区,当通过管道传输到可执行文件时会产生空结果。类似这个问题:stackoverflow.com/q/33277567
    • @jollybugbear 可执行文件是否在每个输入上恰好产生一行?
    • @Ivan 您可以为每个输入生成几行。然后它将作为字符串“List(line1,line2, ...)”返回。
    猜你喜欢
    • 1970-01-01
    • 2016-06-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-16
    相关资源
    最近更新 更多