【问题标题】:spark - mimic pyspark asDict() for scala without using case classspark - 在不使用案例类的情况下为 scala 模拟 pyspark asDict()
【发布时间】:2020-07-13 20:49:43
【问题描述】:

Pyspark 允许您在使用以下方法从数据帧返回单行时创建字典。

t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).collect()[0].asDict()
print(t)
print(t["key"])
print(t["value"])
print(t["rw"])
print("Printing using for comprehension")
[print(t[i]) for i in t ]

Results:

{'key': 'spark.app.id', 'value': 'local-1594577194330', 'rw': 1}
spark.app.id
local-1594577194330
1
Printing using for comprehension
spark.app.id
local-1594577194330
1

我正在 scala-spark 中尝试相同的方法。可以使用案例类方法。

case class download(key:String, value:String,rw:Long)

val t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).as[download].first
println(t)
println(t.key)
println(t.value)
println(t.rw)

结果:

download(spark.app.id,local-1594580739413,1)
spark.app.id
local-1594580739413
1

在实际问题中,我有近 200 多列,不想使用案例类方法。我正在尝试以下类似的方法来避免使用案例类选项。

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))

(df.columns).zip(df.take(1)(0))

但出现错误。

    <console>:28: error: type mismatch;
 found   : (String, String, Long)
 required: Iterator[?]
       (df.columns.toIterator).zip(df.take(1)(0))

有没有办法解决这个问题。

【问题讨论】:

  • 您可以使用tuple.productIterator将元组(产品)转换为迭代器

标签: scala apache-spark pyspark


【解决方案1】:

在 Scala 中,有一个方法 getValuesMap 可以将 row 转换为 Map[columnName: String, columnValue: T]。 尝试使用与以下相同的方法-

  val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------+-------------------+---+
      * |key                         |value              |rw |
      * +----------------------------+-------------------+---+
      * |spark.app.id                |local-1594644271573|1  |
      * |spark.app.name              |TestSuite          |2  |
      * |spark.driver.host           |192.168.1.3        |3  |
      * |spark.driver.port           |58420              |4  |
      * |spark.executor.id           |driver             |5  |
      * |spark.master                |local[2]           |6  |
      * |spark.sql.shuffle.partitions|2                  |7  |
      * +----------------------------+-------------------+---+
      *
      * root
      * |-- key: string (nullable = false)
      * |-- value: string (nullable = false)
      * |-- rw: integer (nullable = true)
      */

    val map = df.head().getValuesMap(df.columns)
    println(map)
    println(map("key"))
    println(map("value"))
    println(map("rw"))
    println("Printing using for comprehension")
    map.foreach(println)

    /**
      * Map(key -> spark.app.id, value -> local-1594644271573, rw -> 1)
      * spark.app.id
      * local-1594644271573
      * 1
      * Printing using for comprehension
      * (key,spark.app.id)
      * (value,local-1594644271573)
      * (rw,1)
      */

【讨论】:

  • @stack0114106,这个解决方案对你有用吗?如果您遇到任何问题,请告诉我
【解决方案2】:

问题在于zip 是一个集合上的方法,它只能接受另一个实现IterableOnce 的集合对象,而df.take(1)(0) 是一个Spark SQL Row,它不属于该类别。

尝试使用toSeq 方法将该行转换为Seq

df.columns.zip(df.take(1)(0).toSeq)

结果:

Array((key,spark.app.id), (value,local-1594577194330), (rw,1))

【讨论】:

    猜你喜欢
    • 2015-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-11
    • 2021-12-26
    • 1970-01-01
    • 2012-06-26
    相关资源
    最近更新 更多