【问题标题】:Scala/Spark : Flattening the DataFrame using RDD only functionsScala/Spark:仅使用 RDD 函数展平 DataFrame
【发布时间】:2017-05-30 03:51:38
【问题描述】:

我有以下数据框,我想仅使用 RDD 进行展平。有人可以帮忙吗?

输入数据框:

+---------+----------+------+-----+- -------------+------------------------------------------------ ------------------+ |TPNB |unitOfMeasure|locationReference|类型|类型|有效日期时间| +---------+----------+------+-----+- -------------+------------------------------------------------ ------------------+ |079562193|EA |0810 |商店|[可出售,持有]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]| +---------+----------+------+-----+- -------------+------------------------------------------------ ------------------+

输出:

TPNB unitOfMeasure locationReference types types effectiveDateTime 079562193 EA 0810 商店出售 2015-10-09T00:55:23.6345Z 079562193 EA 0810 商店举行 2015-10-09T00:55:23.6345Z

我正在尝试类似的方法,但似乎不起作用。

final_output.map(value=>((value(0),value(1),value(2),value(3)),value(5),value(6))).map{ case(key,value)=>value.map(records=>(key,records)) }

【问题讨论】:

  • final_output.rdd 应该给你 rdd 数据,你试过了吗?
  • 是的,我试过了。它没有工作
  • 您在使用 .rdd 时遇到了什么问题?
  • 嗨 Ramesh,它只是给了我 IntelliJ 中的语法错误
  • final_output 不是数据框吗?

标签: scala apache-spark rdd


【解决方案1】:

这是您仅在 RDD 上寻找的内容。将第 5 行和第 6 行转换为 Map 并为每个行创建一行。

  import spark.implicits._

  val data = spark.sparkContext
    .parallelize(
      Seq(
        ("079562193",
         "EA",
         "0810",
         "STORE",
         Array("SELLABLE", "HELD"),
         Array("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
      ))

  val result = data
    .map(row => (row._1, row._2, row._3, row._4, (row._5.zip(row._6).toMap)))
    .map(r => {
      r._5.map(v => (r._1, r._2, r._3, r._4, v._1, v._2))
    })
    .collect()
    .foreach(println)

((079562193,EA,0810,STORE,SELLABLE,2015-10-09T00:55:23.6345Z)
(079562193,EA,0810,STORE,HELD,2015-10-09T00:55:23.6345Z))

【讨论】:

  • 嗨 Sankar,当我尝试将 rdd 转换为数据帧并将其应用于数据帧时,这不起作用。像这样 STEP1: val dataDF= sqlContext.createDataFrame(data).toDF("TPNB","unitOfMeasure","locationReference","types","state","effectiveDateTime") STEP2: dataDF.rdd.map(row = > (row(0), row(1), row(2), row(3), (row(4).zip(row(5)).toMap))).flatMap(r => { r._5 .map(v => (r._1, r._2, r._3, r._4, v._1, v._2)) }) .collect() .foreach(println)
  • 嗨 Shankar,这仅适用于 rdd。当您使用 .rdd 将其应用于数据框时,它不起作用
  • 是的,您只针对 RDD 提出了这个问题,所以这是 RDD 的解决方案。对于数据框,我们需要不同的解决方案
【解决方案2】:

要使用仅使用 RDD 的函数进行转换,您可以在将数据帧转换为 RDD 后执行类似以下操作(例如,通过 df.rdd):

val rdd = sc.parallelize(Seq(
    ("079562193", "EA", "0810", "STORE", List("SELLABLE", "HELD"), List("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
  )).
  map{ case (t, u, l, y, ts, ds) => ((t, u, l, y), (ts, ds)) }.
  flatMapValues{ case (x, y) => x zip y }.
  map{ case ((t, u, l, y), (ts, ds)) => Seq(t, u, l, y, ts, ds) }

rdd.collect.foreach(println)
List(079562193, EA, 0810, STORE, SELLABLE, 2015-10-09T00:55:23.6345Z)
List(079562193, EA, 0810, STORE, HELD, 2015-10-09T00:55:23.6345Z)

【讨论】:

  • 嗨,Leo,当我尝试将 rdd 转换为数据帧并将其应用于数据帧时,他不起作用。像这样 STEP1: val dataDF= sqlContext.createDataFrame(data).toDF("TPNB","unitOfMeasure"‌​,"locationReference"‌​,"types","state","ef‌​fectiveDateTime") STEP2: dataDF. rdd.map{ case (t, u, l, y, ts, ds) => ((t, u, l, y), (ts, ds)) }。 flatMapValues{ case (x, y) => x zip y }。 map{ case ((t, u, l, y), (ts, ds)) => Seq(t, u, l, y, ts, ds) }.collect.foreach(println)
  • @Rohan Nayak,那是因为根据您的原始请求,这些是 RDD 转换。
猜你喜欢
  • 1970-01-01
  • 2022-09-27
  • 2022-11-02
  • 2017-06-13
  • 2018-03-05
  • 1970-01-01
  • 1970-01-01
  • 2016-04-13
  • 1970-01-01
相关资源
最近更新 更多