【问题标题】:Get rows with common column values in output在输出中获取具有公共列值的行
【发布时间】:2018-04-16 08:45:22
【问题描述】:

我有一个 Spark 1.6 数据框(3 列),其记录格式如下 -

----------
w  p  s
----------
w1 p1 0
w1 p1 1
w1 p1 2
w1 p2 0
w1 p2 1
w2 p1 0
w2 p1 1
w2 p3 0
w2 p3 1
w2 p3 2
w2 p4 0
w1 p4 1
w3 p1 0
w3 p1 1
w3 p2 0
w3 p3 0
w3 p4 0
w4 p1 0
w4 p1 1

// 根据更新的输入更新输出 接下来我想对其进行转换以获取在第 2 列和第 3 列中具有相同值的行,如下所示,消除所有不常见的记录 w -

----------
w  p  s
----------
w1 p1 0
w1 p1 1
w2 p1 0
w2 p1 1
w3 p1 0
w3 p1 1
w4 p1 0
w4 p1 1

我尝试使用自连接,但我得到了输入数据帧中的所有记录,我能想到的另一种方法是获得 w 和 p 的独特组合并将其与输入数据帧连接,然后获得 p 和s 并将其与输入数据框连接起来。

谁能给我一个更好的方法来实现所需的输出。

//更新 - 使用自我加入 我使用了以下自联接查询-

df.registerTempTable("t")

sqlContext.sql("select distinct t1.w,t1.p,t1.s FROM t AS t1 JOIN t AS t2 ON t1.p = t2.p  and t1.s = t2.s  where t1.w != t2.w")

导致以下输出,p2、p3、p4 不会在所有 w 中重叠,因此它们不应出现在输出中 -

w1 p1 0
w1 p1 1
w2 p1 0
w2 p1 1
w3 p1 0
w3 p1 1
w4 p1 0
w4 p1 1
w1 p2 0
w2 p3 0
w2 p4 0
w3 p2 0
w3 p3 0
w3 p4 0

// 使用窗口函数更新,我没有经常使用窗口函数,所以我尝试了这个简单的查询,但我不确定如何获得所需的结果,我很接近但不确定什么是失踪-

val df1 = sqlContext.sql("select  w,p,s, row_number() over ( order by p,s) as rn, rank() over ( order by p,s) as rk, dense_rank() over ( order by p,s) as dr from t")

val df2 = sqlContext.sql("select  w,p,s, row_number() over (partition by p order by p,s) as rn, rank() over (partition by p order by p,s) as rk, dense_rank() over (partition by p order by p,s) as dr from t")

感谢任何帮助

【问题讨论】:

  • 如果我知道拒绝投票的原因,我将不胜感激

标签: sql scala apache-spark apache-spark-sql


【解决方案1】:

这个答案不需要加入。为了清楚起见,提供了注释

//should know the distinct w to check for the output
val distinctListOfW = df.select("w").distinct().collect.map(row => row.getAs[String](0))

//selecting p ans s of any random w (first here) to check of p and s repetion across all w collected above
val firstW = distinctListOfW(0)
val p_and_s_of_first_w = df.filter($"w" === firstW).select("p", "s").collect().map(row => (row.getAs[String](0), row.getAs[String](1)))

//creating empty dataframe for merging sebsequent dataframes which matches the logic
val emptyDF = Seq(("temp", "temp", "temp")).toDF("w", "p", "s")

//fold left on the p and s collected of any random w and if the condition of count of distinct w matches with p and s then merge else return the previous df
val finaldf = p_and_s_of_first_w.foldLeft(emptyDF){(tempdf, ps) => {
  val filtered = df.filter($"p" === ps._1 && $"s" === ps._2)
  val tempdistinctListOfW = filtered.select("w").distinct().collect.map(row => row.getAs[String](0))

  if(filtered.count() > 0 && (tempdistinctListOfW.length == distinctListOfW.length)){
    tempdf.union(filtered)
  }
  else{
    tempdf
  }
}}.filter($"w" =!= "temp")

// this is the final required result
finaldf.show(false)

这应该给你

+---+---+---+
|w  |p  |s  |
+---+---+---+
|w1 |p1 |0  |
|w2 |p1 |0  |
|w3 |p1 |0  |
|w4 |p1 |0  |
|w1 |p1 |1  |
|w2 |p1 |1  |
|w3 |p1 |1  |
|w4 |p1 |1  |
+---+---+---+

【讨论】:

    【解决方案2】:

    像这样的简单 SELF-JOIN 应该可以工作吗?

    from pyspark.sql.functions import col
    
    df.as("df1").select(col("w").alias("w1"), col("p"), col("s"))
        .join(df.as("df2").select(col("w").alias("w2"), col("p"), col("s")), ["p","s"])
        .filter(col("w1") != col("w2"))
    

    【讨论】:

    • 我尝试了自我加入,我不知道 pyspark 的语法,但你的查询似乎等同于 sqlContext.sql("select distinct t1.w,t1.p,t1.r FROM t AS t1 JOIN t AS t2 ON t1.p = t2.p and t1.r = t2.r where t1.w != t2.w") 这并不能消除在几行中部分出现的记录...更新我的问题以反映场景
    • 也许您的"and" 不起作用?尝试像我这样的另一种语法。你使用 Pyspark 还是 scala API?
    猜你喜欢
    • 1970-01-01
    • 2021-07-29
    • 2022-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-09-11
    • 1970-01-01
    • 2023-01-16
    相关资源
    最近更新 更多