【问题标题】:How do I efficiently map keys from one dataset based on values from other dataset如何根据来自其他数据集的值有效地映射来自一个数据集的键
【发布时间】:2020-07-03 10:51:13
【问题描述】:

假设数据框 1 代表目标国家和来源国家列表,数据框 2 代表所有国家的可用性,从数据框 1 中找到目标国家映射为 TRUE 和来源的所有对国家/地区映射是FALSE

数据框 1(targetId、sourceId):
美国:中国、俄罗斯、印度、日本
中国:美国、俄罗斯、印度
俄罗斯:美国、日本

数据框 2(id,可用):
美国:是的
中国:假
俄罗斯:是的
印度:错误
日本:是的

结果数据集应如下所示:
(美国、中国),
(美国、印度)

我的想法是首先分解数据集 1,创建新数据框(例如 tempDF),向其中添加 2 个新列:targetAvailable、sourceAvailable 最后过滤 targetAvailable = false 和 sourceAvailable = true 以获得所需的结果数据框架。

下面是我的代码的 sn-p:

 val sourceDF = sourceData.toDF("targetId", "sourceId")
 val mappingDF = mappingData.toDF("id", "available")
 val tempDF = sourceDF.select(col("targetId"), 
                explode(col("sourceId")).as("source_id_split"))

 val resultDF = tempDF.select("targetId")
         .withColumn("targetAvailable", isAvailable(tempDF.col("targetId")))
         .withColumn("sourceAvailable", isAvailable(tempDF.col("source_id_split")))


 /*resultDF.select("targetId", "sourceId").
  filter(col("targetAvailable") === "true" and col("sourceAvailable") 
  === "false").show()*/


// udf to find the availability value for the given id from the mapping table
val isAvailable = udf((searchId: String) => {
val rows = mappingDF.select("available")
          .filter(col("id") === searchId).collect()

if (rows(0)(0).toString.equals("true")) "true" else "false"  })

在计算 resultDF 时调用 isAvailable UDF 会引发一些奇怪的异常。难道我做错了什么?有没有更好/更简单的方法来做到这一点?

【问题讨论】:

  • 嗨 Uday,欢迎来到 SO。你已经尝试了什么?您能否向我们展示您拥有的代码和您当前的问题?它目前看起来不像您遇到的问题,而是您的作业。谢谢!
  • 你好@regina_fallangi:感谢您的指出。编辑了描述并添加了我的代码。
  • "在计算 resultDF 时调用 isAvailable UDF 会引发一些奇怪的异常。" -> 你能发布“奇怪的异常”吗?

标签: scala apache-spark apache-spark-sql apache-spark-dataset data-transform


【解决方案1】:

在您的 UDF 中,您正在引用另一个数据帧,这是不可能的,因此您获得了“奇怪”的异常。

您想根据另一个数据帧中包含的值过滤一个数据帧。您需要做的是加入id 列。在您的情况下实际上有两个连接,一个用于目标,一个用于源。

不过,使用explode 的想法非常好。这是实现您想要的一种方法:

// generating data, please provide this code next time ;-)
val sourceDF = Seq("USA" ->  Seq("China", "Russia", "India", "Japan"),
                   "China" -> Seq("USA", "Russia", "India"),
                   "Russia" -> Seq("USA", "Japan"))
               .toDF("targetId", "sourceId")
val mappingDF = Seq("USA" -> true, "China" -> false,
                    "Russia" -> true, "India" -> false,
                    "Japan" -> true)
               .toDF("id", "available")

sourceDF
    // we can filter available targets before exploding.
    // let's do it to be more efficient.
    .join(mappingDF.withColumnRenamed("id", "targetId"), Seq("targetId"))
    .where('available)
    // exploding the sources
    .select('targetId, explode('sourceId) as "sourceId")
    // then we keep only non available sources
    .join(mappingDF.withColumnRenamed("id", "sourceId"), Seq("sourceId"))
    .where(! 'available)
    .select("targetId", "sourceId")
    .show(false)

产生

+--------+--------+
|targetId|sourceId|
+--------+--------+
|USA     |China   |
|USA     |India   |
+--------+--------+

【讨论】:

  • 太棒了,这正是我想要的。非常感谢!
  • Oli:这里我们加入了两次映射表,一次过滤掉目标映射,然后过滤掉源映射。只是好奇,有没有办法只使用一个连接来解决它?
  • 对不起,我当时错过了您的评论。这实际上取决于。如果两个数据框都很大,我看不到任何解决方案,因为由于您要加入两个不同的列,因此您需要以不同的方式对数据进行混洗以放置匹配的键。但是,如果其中一个很小(小到可以广播),我们可以做得更好。是这样吗?
猜你喜欢
  • 2017-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-03
  • 1970-01-01
相关资源
最近更新 更多