【问题标题】:selecting a range of elements in an array spark sql在数组spark sql中选择一系列元素
【发布时间】:2019-05-01 21:44:37
【问题描述】:

我使用spark-shell进行以下操作。

最近在 spark-sql 中加载了一个带有数组列的表。

这是相同的 DDL:

create table test_emp_arr{
    dept_id string,
    dept_nm string,
    emp_details Array<string>
}

数据看起来像这样

+-------+-------+-------------------------------+
|dept_id|dept_nm|                     emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+

我可以像这样查询 emp_details 列:

sqlContext.sql("select emp_details[0] from emp_details").show

问题

我想查询集合中的一系列元素:

预期的查询工作

sqlContext.sql("select emp_details[0-2] from emp_details").show

sqlContext.sql("select emp_details[0:2] from emp_details").show

预期输出

+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

在纯 Scala 中,如果我有一个数组:

val emp_details = Array("Jon","Snow","Castle","Black")

我可以使用

获取 0 到 2 范围内的元素
emp_details.slice(0,3)

还给我

Array(Jon, Snow,Castle)

我无法在 spark-sql 中应用数组的上述操作。

谢谢

【问题讨论】:

    标签: arrays scala apache-spark hive apache-spark-sql


    【解决方案1】:

    从 Spark 2.4 开始,您可以使用 slice 函数。 In Python):

    pyspark.sql.functions.slice(x, start, length)
    

    集合函数:返回一个数组,其中包含从索引开始(或者如果开始为负数,则从结尾开始)以指定长度包含x中的所有元素。

    ...

    2.4 版中的新功能。

    from pyspark.sql.functions import slice
    
    df = spark.createDataFrame([
        (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
        (20, "IT", ["Ned", "is", "no", "more"])
    ], ("dept_id", "dept_nm", "emp_details"))
    
    df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
    
    +-------------------+
    |       empt_details|
    +-------------------+
    |[Jon, Snow, Castle]|
    |      [Ned, is, no]|
    +-------------------+
    

    In Scala

    def slice(x: Column, start: Int, length: Int): Column
    

    返回一个数组,其中包含 x 中从索引 start 开始(或者如果 start 为负数则从 end 开始)指定长度的所有元素。

    import org.apache.spark.sql.functions.slice
    
    val df = Seq(
        (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
        (20, "IT", Seq("Ned", "is", "no", "more"))
    ).toDF("dept_id", "dept_nm", "emp_details")
    
    df.select(slice($"emp_details", 1, 3) as "empt_details").show
    
    +-------------------+
    |       empt_details|
    +-------------------+
    |[Jon, Snow, Castle]|
    |      [Ned, is, no]|
    +-------------------+
    

    同样的事情当然可以in SQL

    SELECT slice(emp_details, 1, 3) AS emp_details FROM df
    

    重要

    请注意,与Seq.slice 不同,值从零开始索引,第二个参数是长度,而不是结束位置。

    【讨论】:

    • 对于大型数据库,如果你想要一个值数组怎么办,即。列中任何位置的值?
    【解决方案2】:

    这是一个使用User Defined Function 的解决方案,它的优点是适用于您想要的任何切片大小。它只是围绕 scala 内置的 slice 方法构建了一个 UDF 函数:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))
    

    数据样本示例:

    val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
    df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show
    

    产生预期的输出

    +--------------------+-------------------+
    |         emp_details|              slice|
    +--------------------+-------------------+
    |[Jon, Snow, Castl...|[Jon, Snow, Castle]|
    +--------------------+-------------------+
    

    您也可以在sqlContext 中注册 UDF 并像这样使用它

    sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
    sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon‌​','Snow','Castle','Black','Ned'),0,3)")
    

    使用此解决方案,您将不再需要 lit

    【讨论】:

    • 谢谢,它有效,但不完全适合我的情况。我一直试图在一条 SQL 语句中获取所有内容。像这样的东西:sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned'),slice((array('Jon', 'Snow', 'Castle', 'Black', 'Ned')),0,3)").show。我必须同意我可以用你提供的解决方案来解决这个问题,尽管它会让我走一个两步的过程。另外我一直想知道为什么需要lit(抱歉刚开始使用scala)。在我上面的代码中包含lit 会抛出未找到的异常,否则会为切片抛出未找到的异常
    • 您必须按照答案中的说明导入[...]functions.litlit 创建一个在所有行中包含相同值的列(它代表“文字”)。类型检查需要它(尝试不看)。您可以使用更高阶的函数来避免它:def slice(from : Int, to : Int) = udf((array : Seq[String]) =&gt; array.slice(from,to)),这将启用以下合成器:df.withColumn("slice", slice(0, 3)($"emp_details") ).show,但它不一定更简单。
    • @Wilmerton:很抱歉,但我认为这不是import 问题,因为我可以@cheseaux 指出的val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details") df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).showlit 在这种情况下有效,我试图一步完成这两个步骤,可能类似于:sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice((array('Jon','Snow','Castle','Black','Ned')),lit(0),lit(3))")
    • 哈哈,我误会了。您可以将 udf 注册到 spark 上下文,以便在 sql stackoverflow.com/questions/31278110/… 中可用
    • @cheseaux:非常感谢,它有效,最后一件事,lit 究竟做了什么[一个简单的例子就足够了]?
    【解决方案3】:

    Edit2:谁想要以牺牲可读性为代价来避免 udf ;-)

    如果您真的想一步到位,您将不得不使用 Scala 创建一个返回 Column 序列的 lambda 函数并将其包装在一个数组中。这有点牵扯,但这是一步:

    val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")
    
    df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)    
    
    
    +-------------------------------+-------------------+
    |emp_details                    |slice              |
    +-------------------------------+-------------------+
    |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
    +-------------------------------+-------------------+
    

    _:* 有点神奇地将列表传递给所谓的可变参数函数(在本例中为array,它构造了 sql 数组)。但我建议不要按原样使用此解决方案。将 lambda 函数放在命名函数中

    def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*))
    

    为了代码的可读性。请注意,一般情况下,坚持Column 表达式(不使用`udf)具有更好的性能。

    编辑:为了在 sql 语句中执行此操作(正如您在问题中所问的......),遵循相同的逻辑,您将使用 scala 逻辑生成 sql 查询(并不是说它是最易读的)

    def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => "emp_details["+i.toString+"]").mkString(",") + ")"
    val sqlQuery = "select emp_details,"+ sliceSql("emp_details",0,3) + "as slice from emp_details"
    
    sqlContext.sql(sqlQuery).show
    
    +-------------------------------+-------------------+
    |emp_details                    |slice              |
    +-------------------------------+-------------------+
    |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
    +-------------------------------+-------------------+
    

    请注意,您可以将until 替换为to,以提供最后一个被采用的元素,而不是迭代停止的元素。

    【讨论】:

      【解决方案4】:

      您可以使用函数array 从三个值中构建一个新数组:

      import org.apache.spark.sql.functions._
      
      val input = sqlContext.sql("select emp_details from emp_details")
      
      val arr: Column = col("emp_details")
      val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")
      
      val result.show()
      // +-------------------+
      // |        emp_details|
      // +-------------------+
      // |[Jon, Snow, Castle]|
      // |      [Ned, is, no]|
      // +-------------------+
      

      【讨论】:

      • downvoters:想解释一下吗?这是一个可行的解决方案,也是此处建议的最短解决方案。 ?
      • 我没有投反对票,但我的猜测是它不像splice-like 行为,就像问题中所问的那样。如果你想获取[4:400] 类型的一部分,你的解决方案(原样)会很麻烦。
      【解决方案5】:

      在 apache spark 中使用 selecrExpr()split() 函数。

      例如:

      fs.selectExpr("((split(emp_details, ','))[0]) as e1,((split(emp_details, ','))[1]) as e2,((split(emp_details, ','))[2]) as e3);
      

      【讨论】:

      • 这甚至不能编译(似乎最后缺少")?)并且即使修复也不起作用 - emp_details 列是一个数组,而不是逗号分隔字符串列。
      【解决方案6】:

      这是我的通用切片 UDF,支持任何类型的数组。有点难看,因为你需要提前知道元素类型。

      import org.apache.spark.sql.types._
      import org.apache.spark.sql.functions._
      
      def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
        if (arr == null) null else arr.slice(from, until)
      
      def slice(elemType: DataType): UserDefinedFunction = 
        udf(arraySlice _, ArrayType(elemType)
      
      fs.select(slice(StringType)($"emp_details", 1, 2))
      

      【讨论】:

        【解决方案7】:

        对于那些坚持使用 Spark slice 函数的人,这里有一个不使用 udfs 的 pySpark 解决方案(Scala 会非常相似)。相反,它使用 spark sql 函数 concat_wssubstring_indexsplit

        这仅适用于字符串数组。要使其与其他类型的数组一起使用,您必须先将它们转换为字符串,然后在“切片”数组后转换回原始类型。

        from pyspark.sql import SparkSession
        from pyspark.sql import functions as F
        
        spark = (SparkSession.builder
            .master('yarn')
            .appName("array_slice")
            .getOrCreate()
        )
        
        emp_details = [
            ["Jon", "Snow", "Castle", "Black", "Ned"],
            ["Ned", "is", "no", "more"]
        ]
        
        df1 = spark.createDataFrame(
            [tuple([emp]) for emp in emp_details],
            ["emp_details"]
        )
        
        df1.show(truncate=False)
        
        +-------------------------------+
        |emp_details                    |
        +-------------------------------+
        |[Jon, Snow, Castle, Black, Ned]|
        |[Ned, is, no, more]            |
        +-------------------------------+
        
        last_string = 2
        
        df2 = (
            df1
            .withColumn('last_string', (F.lit(last_string)))
            .withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
            .withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)" ))
            .withColumn('slice', F.split(F.col('slice'), ' '))
            .select('emp_details', 'slice')
        )
        
        df2.show(truncate=False)
        
        +-------------------------------+-------------------+
        |emp_details                    |slice              |
        +-------------------------------+-------------------+
        |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
        |[Ned, is, no, more]            |[Ned, is, no]      |
        +-------------------------------+-------------------+
        

        【讨论】:

          【解决方案8】:

          使用嵌套拆分:

          split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')

          scala> import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.SparkSession
          
          scala> val spark=SparkSession.builder().getOrCreate()
          spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673
          
          scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
          18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
          df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
          
          scala> df.createOrReplaceTempView("raw_data")
          
          scala> df.show()
          +-------+-------+--------------------+
          |dept_id|dept_nm|         emp_details|
          +-------+-------+--------------------+
          |     10|Finance|[Jon, Snow, Castl...|
          |     20|     IT| [Ned, is, no, more]|
          +-------+-------+--------------------+
          
          
          scala> val df2 = spark.sql(
               | s"""
               | |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
               | """)
          df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
          
          scala> df2.show()
          +-------+-------+-------------------+
          |dept_id|dept_nm|        emp_details|
          +-------+-------+-------------------+
          |     10|Finance|[Jon, Snow, Castle]|
          |     20|     IT|      [Ned, is, no]|
          +-------+-------+-------------------+
          

          【讨论】:

            猜你喜欢
            • 2023-03-22
            • 1970-01-01
            • 2010-09-16
            • 2021-11-07
            • 1970-01-01
            • 2017-03-10
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多