【问题标题】:Create new dataframe from selected information from another datama从另一个数据集的选定信息创建新数据框
【发布时间】:2021-12-03 22:33:32
【问题描述】:

我有一个具有以下架构的数据框:

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- lat: Long (nullable = true)
 |-- lon: Long (nullable = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)

我想创建一个新的数据框res,在其中从tags 列中选择特定数据。我需要来自key=placekey=populationvalues。新数据框应具有以下架构:

val schema = StructType(
               Array(
                 StructField("place", StringType),
                 StructField("population", LongType)
               )
             )

我完全不知道该怎么做。我尝试复制第一个数据框,然后选择列,但这没有用。

谁有解决办法?

【问题讨论】:

    标签: java scala dataframe apache-spark apache-spark-sql


    【解决方案1】:

    您可以直接在 map 类型的列上应用所需的键来提取值,然后按照您的意愿强制转换和重命名列,如下所示:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.LongType
    
    val result = dataframe.select(
      col("tags")("place").as("place"),
      col("tags")("population").cast(LongType).as("population")
    )
    

    使用以下tags 列:

    +------------------------------------------------+
    |tags                                            |
    +------------------------------------------------+
    |{place -> A, population -> 32, another_key -> X}|
    |{place -> B, population -> 64, another_key -> Y}|
    +------------------------------------------------+
    

    你会得到以下结果:

    +-----+----------+
    |place|population|
    +-----+----------+
    |A    |32        |
    |B    |64        |
    +-----+----------+
    

    具有以下架构:

    root
     |-- place: string (nullable = true)
     |-- population: long (nullable = true)
    

    【讨论】:

      【解决方案2】:

      给定以下简化输入:

      val df = Seq(
        (1L, Map("place" -> "home", "population" -> "1", "name" -> "foo")),
        (2L, Map("place" -> "home", "population" -> "4", "name" -> "foo")),
        (3L, Map("population" -> "3")),
        (4L, Map.empty[String, String])
      ).toDF("id", "tags")
      

      您想使用方法map_filter 选择值以过滤映射以仅包含您想要的键,然后调用map_values 以获取这些条目。 map_values 返回一个数组,所以需要使用explode_outer 来展平数据。我们在这里使用explode_outer,因为您可能有既没有地点也没有人口的条目,或者只有两者之一。一旦数据以我们可以轻松使用的形式出现,我们只需在所需结构中选择我们想要的字段。

      我保留了id 列,因此当您运行该示例时,您可以看到我们不会删除缺少数据的条目。

      
      val r = df.select(
          col("id"),
          explode_outer(map_values(map_filter(col("tags"), (k,_) => k === "place"))) as "place",
          map_values(map_filter(col("tags"), (k,_) => k === "population")) as "population"
        ).withColumn("population", explode_outer(col("population")))
        .select(
          col("id"),
          array(
            struct(
              col("place"),
              col("population") cast LongType as "population"
            ) as "place_and_population"
          ) as "data"
        )
      

      给予:

      root
       |-- id: long (nullable = false)
       |-- data: array (nullable = false)
       |    |-- element: struct (containsNull = false)
       |    |    |-- place: string (nullable = true)
       |    |    |-- population: long (nullable = true)
      
      +---+--------------+
      | id|          data|
      +---+--------------+
      |  1|   [{home, 1}]|
      |  2|   [{home, 4}]|
      |  3|   [{null, 3}]|
      |  4|[{null, null}]|
      +---+--------------+
      

      【讨论】:

        【解决方案3】:

        让我们调用您的原始数据框df。你可以像这样提取你想要的信息

        import org.apache.spark.sql.functions.sql.col
        
        val data = df
          .select("tags")
          .where(
            df("tags")("key") isin (List("place", "population"): _*)
          )
          .select(
            col("tags")("value")
          )
          .collect()
          .toList
        

        这将为您提供一个 List[Row],它可以使用您的架构转换为另一个数据框

        import scala.collection.JavaConversions.seqAsJavaList
        
        sparkSession.createDataFrame(seqAsJavaList[Row](data), schema)
        

        【讨论】:

        • 我试过这个方法。使用所需架构创建新数据框。但是,新的数据框是空的。我认为我做错了转换。我做了什么: val output = spark.createDataFrame(seqAsJavaList[Row](data), schema)
        猜你喜欢
        • 2021-12-04
        • 2011-02-16
        • 2018-12-30
        • 1970-01-01
        • 1970-01-01
        • 2019-02-02
        • 1970-01-01
        • 1970-01-01
        • 2022-01-09
        相关资源
        最近更新 更多