【问题标题】:How do I apply a function on each value of a column in a SPARKR DataFrame?如何对 SPARKR DataFrame 中列的每个值应用函数?
【发布时间】:2015-08-12 10:06:41
【问题描述】:

我对 SPARKR 比较陌生。我下载了 SPARK 1.4 并设置 RStudio 以使用 SPARKR 库。但是我想知道如何将函数应用于分布式 DataFrame 的列中的每个值,有人可以帮忙吗? 例如,

这很好用

myFunc <- function(x) { paste(x , "_hello")}
c <- c("a", "b", "c")
d <- lapply(c, myFunc)

如何使其适用于分布式 DataFrame。 目的是将“_hello”附加到DF的列名称的每个值

DF <- read.df(sqlContext, "TV_Flattened_2.csv", source = "com.databricks.spark.csv", header="true")
SparkR:::lapply(DF$Name, myFunc)

在 SPARK 1.4 发布之前的 alpha 版本的 SPARKR 中似乎有这个能力,为什么现在在 SPARK 1.4 正式版本中没有这个能力?

【问题讨论】:

  • 我对 sparkr 一无所知,但您可能更需要 name(DF) 而不是 DF$Name 吗?
  • 在底层,lapply 函数仍然是 SparkR 1.4 的一部分,但目前它不是全局函数,我不知道为什么。你也应该看看地图功能。

标签: r sparkr


【解决方案1】:

使用 flatMap,您可以从 DataFrame 创建一个 RDD,并将该函数应用于所有项目。

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

然而,缺点是仅在 DataFrame 的第一列上执行您期望的操作,它会跳过所有其他列。这在以下示例中可见:

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c,u=c(1,2,3)))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

它给出了与第一个示例完全相同的输出,但是 df 以一个额外的列开始。

【讨论】:

  • 看来flatMap在2.1中没有导出,我只好:SparkR:::flatMap(...)
【解决方案2】:

我玩了很多次,但没有一个干净的解决方案可以将该函数直接应用于列元素,坦率地说,我不确定这目前是否可行。尽管如此,使用 COLLECT 方法我们可以执行以下操作:

注意我使用的是 Windows 并输入 powershell

cd D:\Spark\spark-1.4.1-bin-hadoop2.6
./bin/sparkR
c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
c1 <- collect(df)
myFunc <- function(x) { paste(x , "_hello")}
d <- lapply(c1, myFunc)
df2 <- createDataFrame(sqlContext, as.data.frame(d))
head(df2)

生成您在 R 中打印的内容: 1个_你好 2 b _你好 3 c _你好

这里是有用的资源:

https://spark.apache.org/docs/latest/api/R/index.html

https://spark.apache.org/docs/latest/sparkr.html

https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

【讨论】:

  • 这种方法是串行的。在数据帧上调用 collect 意味着这不会在执行者之间分配工作。
【解决方案3】:

Spark 2.x 现在有一个名为 dapply 的函数,它允许您在 SparkR 数据帧的每个分区上运行 R 函数。

来自文档的代码示例:

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300

请参阅此处了解更多信息: http://spark.apache.org/docs/latest/sparkr.html#run-a-given-function-on-a-large-dataset-using-dapply-or-dapplycollect

请注意,如果您使用任何外部 R 库,则需要在工作节点上安装这些库

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-16
    • 1970-01-01
    • 2016-09-21
    相关资源
    最近更新 更多