【问题标题】:SparkR DataFrame partitioning issueSparkR DataFrame 分区问题
【发布时间】:2018-07-05 23:44:56
【问题描述】:

在我的 R 脚本中,我有一个 SparkDataFrame 两列(时间、值),其中包含四个不同月份的数据。因为我需要将我的函数分别应用到每个月,所以我想我会将 repartition 它分成四个分区,每个分区分别保存一个月的数据。

我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后通过该特定列调用 repartition 方法。

遗憾的是,正如本主题中所描述的那样: Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?,通过repartition 方法,我们只能确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。

在我的例子中,执行下面可见的代码会导致创建 4 个分区,但只用数据填充其中的 2 个。

我想我应该使用partitionBy 方法,但是对于 SparkR,我不知道该怎么做。 官方文档指出,此方法适用于名为 WindowSpec 而不是 DataFrame 的东西。

我非常感谢您对此事的帮助,因为我不知道如何将此方法合并到我的代码中。

sparkR.session(
   master="local[*]",  sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)

schema <- structType(
  structField("time", "timestamp"), 
  structField("value", "double"), 
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf, 
  function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
  schema)

【问题讨论】:

    标签: r apache-spark sparkr


    【解决方案1】:

    您使用了错误的方法。如果你

    需要将我的函数分别应用于每个月

    你应该使用gapply那个

    使用指定的列对 SparkDataFrame 进行分组,并将 R 函数应用于每个组。

    df %>% group_by("month") %>% gapply(fun, schema)
    

    df %>% gapply("month", fun, schema)
    

    在我的例子中,执行下面可见的代码会导致创建 4 个分区,但只在其中 2 个分区中填充数据。

    这表明哈希冲突。合理地增加分区数量超过唯一键的数量应该可以解决问题:

    spark.sql.shuffle.partitions 17
    

    我想我应该使用 partitionBy 方法,但是

    没有。 partitionBywindow functions (SparkR window function) 一起使用。


    致地址your comment

    我决定将 dapply 与单独的分区一起使用,以便能够轻松地将每个月保存到单独的 CSV 文件中

    哈希分区器不能像How does HashPartitioner work?那样工作

    您可以在作者中尝试使用partitionBy,但我不确定它是否在 SparkR 中直接支持。它在结构化流中受支持,对于批处理,您可能必须调用 Java 方法或使用带有 Metastore 的表:

    createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
    sql(
        "CREATE TABLE iris 
        USING csv PARTITIONED BY(species)
        LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
    )
    

    【讨论】:

    • 我确实启用了它。现在我实际上注意到,当我发出不带 LOCATION 子句的 CREATE 命令时,它会将数据框保存到默认的 location\spark-warehouse 文件夹中。可以通过将spark.sql.warehouse.dir 设置为所需值来更改此默认位置。
    • 您使用什么版本的 Spark?我发现它在 2.2 之前不起作用。
    • 是的,就是这样,我有 2.1.1。我会尝试将其更新到较新的版本。
    猜你喜欢
    • 1970-01-01
    • 2016-02-17
    • 2019-12-27
    • 2020-09-27
    • 1970-01-01
    • 2011-10-03
    • 1970-01-01
    • 2017-06-07
    • 2018-06-13
    相关资源
    最近更新 更多