【问题标题】:Map for each value of an array in a Spark Row映射 Spark Row 中数组的每个值
【发布时间】:2019-01-08 01:01:48
【问题描述】:

我有一个json数据集,格式如下,每行一个条目。

 { "sales_person_name" : "John", "products" : ["apple", "mango", "guava"]}
 { "sales_person_name" : "Tom", "products" : ["mango", "orange"]}
 { "sales_person_name" : "John", "products" : ["apple", "banana"]}
 { "sales_person_name" : "Steve", "products" : ["apple", "mango"]}
 { "sales_person_name" : "Tom", "products" : ["mango", "guava"]}

我想知道谁卖的芒果最多等等。 因此,我想将文件加载到数据帧并为每个事务的数组中的每个产品值发出一个(键,值)对(产品,名称)。

var df = spark.read.json("s3n://sales-data.json")
df.printSchema()
root
 |-- sales_person_name: string (nullable = true)
 |-- products: array (nullable = true)

var nameProductsMap = df.select("sales_person_name",  "products").show()
+-----------------+--------------------+
|sales_person_name|   products         |
+-----------------+--------------------+
|             John|[mango, apple,...   |
|              Tom|[mango, orange,...  |
|             John|[apple, banana...   | 

var resultMap = df.select("products", "sales_person_name")
                  .map(r => (r(1), r(0)))
                  .show()  //This is where I am stuck.

我无法找出正确的方法来爆炸()行(0)并使用行(1)值发出一次所有值。任何人都可以提出一种方法。谢谢!

预期输出:

Mango : John(4), Tom(2), Greg(1)... 
Banana: Tom(5), John(2), ...

【问题讨论】:

  • 我正在尝试类似: var actorHashtagsMap = df.select("products", "sales_person_name").map(r => { r(0).map(x => (x, r (1))) })

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:
val exploded = df.explode("products", "product") { a: mutable.WrappedArray[String] => a }
val result = exploded.drop("products")
result.show()

打印:

+-----------------+-------+
|sales_person_name|product|
+-----------------+-------+
|             John|  apple|
|             John|  mango|
|             John|  guava|
|              Tom|  mango|
|              Tom| orange|
|             John|  apple|
|             John| banana|
|            Steve|  apple|
|            Steve|  mango|
|              Tom|  mango|
|              Tom|  guava|
+-----------------+-------+

【讨论】:

  • 感谢佐哈尔!你让它看起来很容易。我必须为 mutable._ 放置导入语句,并且爆炸位于 select() 结果而不是 df 上。我明白了你的概念,现在很容易。谢谢!
【解决方案2】:

更新

下面的代码应该可以工作

import org.apache.spark.sql.functions.explode
import scala.collection.mutable

val resultMap = df.select(explode($"products"), $"sales_person_name")


def counter(l: TraversableOnce[Any]) = {
  val temp = mutable.Map[Any, Int]()
    for (i <- l) {
      if(temp.contains(i)) temp(i) += 1
      else temp(i) = 1
    }
  temp
}

resultsMap.map(x => (x(0), Array(x(1)))).
           reduceByKey(_ ++ _).
           map { case (x,y) => (x, counter(y).toArray) }

结果输出:Array((banana,Array((John,1))), (guava,Array((Tom,1), (John,1))), (orange,Array((Tom,1))), (apple,Array((Steve,1), (John,2))), (mango,Array((Tom,2), (Steve,1), (John,1))))

【讨论】:

  • 是的,这是一个中间步骤。我终于想通过 col 产品减少它以达到:apple : John(4), Tom(2), Steve(1) ;芒果:史蒂夫(3),汤姆(1); //
  • 更新了reduce操作的答案。
  • 感谢代码@septa 但是我收到以下错误。错误:值 reduceByKey 不是 org.apache.spark.sql.Dataset[(Any, Array[Any])] 的成员 可能原因:在“值 reduceByKey”之前可能缺少分号?对于我在结果集上所做的任何类型的 map(),我都会得到这个。我正在使用 Spark 2.0。有什么线索吗?
  • :157: 错误:类型不匹配;找到:任何需要:scala.collection.TraversableOnce[Any] map { case (x,y) => (x, counter(y).toArray) } ^
  • 以上代码适用于 Spark 1.6.x。不幸的是,Spark 2.0 有一个我还不熟悉的完全不同的 API。然而,这个想法应该是一样的。
猜你喜欢
  • 1970-01-01
  • 2016-03-24
  • 1970-01-01
  • 2018-10-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-15
  • 2020-11-13
相关资源
最近更新 更多