【问题标题】:How to split a dataframe into dataframes with same column values?如何将数据框拆分为具有相同列值的数据框?
【发布时间】:2015-10-18 14:17:59
【问题描述】:

使用 Scala,我如何将 dataFrame 拆分为具有相同列值的多个 dataFrame(无论是数组还是集合)。 例如我想拆分以下DataFrame:

ID  Rate    State
1   24  AL
2   35  MN
3   46  FL
4   34  AL
5   78  MN
6   99  FL

到:

数据集 1

ID  Rate    State
1   24  AL  
4   34  AL

数据集 2

ID  Rate    State
2   35  MN
5   78  MN

数据集 3

ID  Rate    State
3   46  FL
6   99  FL

【问题讨论】:

  • 为什么需要将数据帧拆分为多个数据帧?可能你知道你可以过滤和转换你的数据帧到: [(AL,Seq(24 AL, 4 34 AL)), (MN, Seq(35 MN, 5 78 MN)), (FL, Seq(46 FL 6 99 FL))] 使用 groupBy。
  • groupBy 给出 GroupDate 类型,如何将其转换为 Array?
  • 你能解释一下你想用那个数组做什么吗?
  • 每次新数据框都不需要使用过滤器

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:
you can use .. 
var stateDF = df.select("state").distinct()  // to get states in a df
val states = stateDF.rdd.map(x=>x(0)).collect.toList //to get states in a list

for (i <- states)  //loop to get each state
{
var finalDF = sqlContext.sql("select * from table1 where state = '" + state
+"' ")
}

【讨论】:

    【解决方案2】:

    您可以收集唯一的状态值并简单地映射到结果数组:

    val states = df.select("State").distinct.collect.flatMap(_.toSeq)
    val byStateArray = states.map(state => df.where($"State" <=> state))
    

    或映射:

    val byStateMap = states
        .map(state => (state -> df.where($"State" <=> state)))
        .toMap
    

    在 Python 中也是这样:

    from itertools import chain
    from pyspark.sql.functions import col
    
    states = chain(*df.select("state").distinct().collect())
    
    # PySpark 2.3 and later
    # In 2.2 and before col("state") == state) 
    # should give the same outcome, ignoring NULLs 
    # if NULLs are important 
    # (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
    df_by_state = {state: 
      df.where(col("state").eqNullSafe(state)) for state in states}
    

    这里明显的问题是它需要对每个级别进行完整的数据扫描,因此这是一项昂贵的操作。如果您正在寻找一种仅拆分输出的方法,另请参阅 How do I split an RDD into two or more RDDs?

    特别是你可以写Dataset 按感兴趣的列分区:

    val path: String = ???
    df.write.partitionBy("State").parquet(path)
    

    并在需要时回读:

    // Depend on partition prunning
    for { state <- states } yield spark.read.parquet(path).where($"State" === state)
    
    // or explicitly read the partition
    for { state <- states } yield spark.read.parquet(s"$path/State=$state")
    

    根据数据的大小、拆分的级别数、输入的存储和持久性级别,它可能比多个过滤器更快或更慢。

    【讨论】:

    • 也许有点迟到的问题。但是当我在 Spark 2.2.0 中尝试 python 代码时,我总是得到一个“列不可调用”错误。我尝试了几种方法,但仍然遇到相同的错误。有什么解决方法吗?
    • 你需要导入colfrom pyspark.sql.functions import col
    【解决方案3】:

    如果你把dataframe做成一个临时表就很简单了(如果spark版本是2的话)。

    df1.createOrReplaceTempView("df1")
    

    现在您可以进行查询了,

    var df2 = spark.sql("select * from df1 where state = 'FL'")
    var df3 = spark.sql("select * from df1 where state = 'MN'")
    var df4 = spark.sql("select * from df1 where state = 'AL'")
    

    现在你得到了 df2、df3、df4。如果你想将它们作为列表,你可以使用,

    df2.collect()
    df3.collect()
    

    甚至是地图/过滤功能。请参考https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

    【讨论】:

    • 是否有可能在 spark 中循环 SQL 查询?之前收集所有不同的值,然后用“where state = 'i'”或类似的东西替换“where state = 'FL'”?
    • 这将是开销,但您仍然可以使用 Spark Dataframes 和 SCALA 编码来处理它
    • 我用同样的方法将一个 DF 拆分为 5 个子 DF 进行左连接,结果 DF 是一个视图,而不是一个独立的 DF,它与左连接弄乱了,我可以拆分吗进入独立 DF ??
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多