【问题标题】:Spark specify multiple column conditions for dataframe joinSpark为数据框连接指定多列条件
【发布时间】:2015-09-23 06:19:23
【问题描述】:

连接两个数据框时如何提供更多列条件。例如我想运行以下内容:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

我只想在这些列匹配时加入。但上述语法无效,因为 cols 只接受一个字符串。那么如何才能得到我想要的呢。

【问题讨论】:

    标签: apache-spark apache-spark-sql rdd


    【解决方案1】:

    Pyspark 中,在每个条件周围使用括号是在连接条件中使用多个列名的关键。

    joined_df = df1.join(df2, 
        (df1['name'] == df2['name']) &
        (df1['phone'] == df2['phone'])
    )
    

    【讨论】:

      【解决方案2】:

      试试这个:

      val rccJoin=dfRccDeuda.as("dfdeuda")
      .join(dfRccCliente.as("dfcliente")
      ,col("dfdeuda.etarcid")===col("dfcliente.etarcid") 
      && col("dfdeuda.etarcid")===col("dfcliente.etarcid"),"inner")
      

      【讨论】:

        【解决方案3】:

        === 选项给了我重复的列。所以我改用Seq

        val Lead_all = Leads.join(Utm_Master,
            Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")
        

        当然,这只有在连接列的名称相同时才有效。

        【讨论】:

          【解决方案4】:

          Spark SQL 支持在括号中加入列的元组,如

          ... WHERE (list_of_columns1) = (list_of_columns2)
          

          这比为由一组“AND”组合的每对列指定相等表达式 (=) 更短。

          例如:

          SELECT a,b,c
          FROM    tab1 t1
          WHERE 
             NOT EXISTS
             (    SELECT 1
                  FROM    t1_except_t2_df e
                  WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
             )
          

          而不是

          SELECT a,b,c
          FROM    tab1 t1
          WHERE 
             NOT EXISTS
             (    SELECT 1
                  FROM    t1_except_t2_df e
                  WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
             )
          

          这也不太可读,尤其是当列列表很大并且您想轻松处理 NULL 时。

          【讨论】:

          • 它真的有效吗? 1.6版本支持这个吗?
          【解决方案5】:

          Pyspark 中,您可以简单地分别指定每个条件:

          val Lead_all = Leads.join(Utm_Master,  
              (Leaddetails.LeadSource == Utm_Master.LeadSource) &
              (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
              (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
              (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
          

          请务必正确使用运算符和括号。

          【讨论】:

            【解决方案6】:

            斯卡拉:

            Leaddetails.join(
                Utm_Master, 
                Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
                    && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
                    && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
                    && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
                "left"
            )
            

            使其不区分大小写

            import org.apache.spark.sql.functions.{lower, upper}
            

            那么就在join方法的条件下使用lower(value)

            例如:dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))

            【讨论】:

              【解决方案7】:

              您可以做的一件事是使用原始 SQL:

              case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
              case class Foo(x2: Int, y2: Int, z2: Int, v2: String)
              
              val bar = sqlContext.createDataFrame(sc.parallelize(
                  Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
                  Bar(3, 1, 2, "bar") :: Nil))
              
              val foo = sqlContext.createDataFrame(sc.parallelize(
                  Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
                  Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))
              
              foo.registerTempTable("foo")
              bar.registerTempTable("bar")
              
              sqlContext.sql(
                  "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
              

              【讨论】:

              • 这是我现在使用的方法。我希望我可以在不注册为临时表的情况下做到这一点。如果无法使用数据框 API 执行此操作,我将接受答案。
              • 如果是这样,@rchukh 的回答会好很多。
              【解决方案8】:

              对于这种情况,有一个 Spark column/expression API join

              Leaddetails.join(
                  Utm_Master, 
                  Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
                      && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
                      && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
                      && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
                  "left"
              )
              

              示例中的&lt;=&gt; 运算符表示“Equality test that is safe for null values”。

              与简单的Equality test (===) 的主要区别在于,第一个可以安全使用,以防其中一列可能具有空值。

              【讨论】:

              • 您能解释一下===&lt;=&gt; 之间的区别吗?
              • 更新了有关这些相等测试之间差异的更多信息。
              • 啊哈,在文档中找不到这个。你是怎么知道的?
              • @user568109 我正在使用 Java API,在某些情况下 Column/Expression API 是唯一的选择。此外,Column/Expression API 主要作为 Builder 实现,因此更容易在每个版本的 Spark 上发现新方法。
              • 这给了我重复的列,所以我使用了我在另一个答案中添加的 Seq 方法。
              【解决方案9】:

              从 Spark 版本 1.5.0(当前未发布)开始,您可以加入多个 DataFrame 列。参考SPARK-7990: Add methods to facilitate equi-join on multiple join keys

              Python

              Leads.join(
                  Utm_Master, 
                  ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
                  "left_outer"
              )
              

              斯卡拉

              该问题要求 Scala 答案,但我不使用 Scala。这是我最好的猜测......

              Leads.join(
                  Utm_Master,
                  Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
                  "left_outer"
              )
              

              【讨论】:

              • 我们如何让连接忽略值的大小写(即不区分大小写)?我在下面尝试过,但没有奏效。 sqlContext.sql("设置 spark.sql.caseSensitive=false")
              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2021-09-02
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多