【问题标题】:How to apply a function to each row in SparkR?如何将函数应用于 SparkR 中的每一行?
【发布时间】:2017-07-01 11:44:34
【问题描述】:

我有一个 CSV 格式的文件,其中包含一个包含“id”、“timestamp”、“action”、“value”和“location”列的表格。 我想对表格的每一行应用一个函数,并且我已经在 R 中编写了如下代码:

user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
    row <- user[i,]
    if(user$action != "power")
        curLocation <- row$value
    user[i,"location"] <- curLocation
}

R 脚本工作正常,现在我想应用它 SparkR。但是,我无法直接在 SparkR 中访问第 i 行,也找不到任何函数来操作 SparkR documentation 中的每一行。

我应该使用哪种方法来达到与 R 脚本中相同的效果?

另外,根据@chateaur 的建议,我尝试使用 dapply 函数进行如下编码:

curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
    if(row$Action != "power|battery|level"){
        curLoc <- row$Value
    }
    row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)

然后我得到一个错误:

我查看了警告消息条件的长度 > 1,并且只会使用第一个元素,我发现了 https://stackoverflow.com/a/29969702/4942713。这让我想知道 dapply 函数中的 row 参数是否代表我的数据框的整个分区而不是单行?也许 dapply 函数不是一个理想的解决方案?

后来,我尝试按照@chateaur 的建议修改该功能。我没有使用 dapply,而是使用了 dapplyCollect,这样可以节省我指定架构的工作量。有效!

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- "1"
    for(i in 1:nrows){
        row <- partitionnedDf[i,]
        if(row$action != "power") {
            curLocation <- row$value
        }
    partitionnedDf[i,"location"] <- curLocation
    }
    partitionnedDf
}

bw <- dapplyCollect(user, changeLocation)

【问题讨论】:

  • 您可以使用 sparklyr(与 dplyr 语法相同)
  • @DimitriPetrenko 如果我需要使用 SparkR 怎么办? SparkR能达到效果吗?

标签: r apache-spark sparkr bigdata


【解决方案1】:

蝎子775,

您应该分享您的 sparkR 代码。不要忘记在 R 和 sparkR 中处理数据的方式不同。

发件人:http://spark.apache.org/docs/latest/sparkr.html

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

那么你可以在这里查看 dapply 函数:https://spark.apache.org/docs/2.1.0/api/R/dapply.html

这是一个工作示例:

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- as.integer(1)

    # Loop over each row of the partitionned data frame
    for(i in 1:nrows){
        row <- partitionnedDf[i,]

        if(row[1] != "power") {
            curLocation <- row[2]
        }
        partitionnedDf[i,3] <- curLocation
    }

    # Return modified data frame
    partitionnedDf
}

# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")

head(collect(df))

# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
                     structField("location", "integer"))

# Change location of each row                    
df2 <- dapply(df, changeLocation, schema)

head(df2)

【讨论】:

  • 我查看了 dapply 函数,发现它用于“将 函数 应用于 SparkDataFrame 的每个分区”。据我了解,partition 的概念与row 无关。我担心的是,我不知道如何编写要应用于 SparkDataFrame 的 函数。目前我只知道如何在 R 中实现我想要的 function 而在 SparkR 中不知道。你能给我一些建议吗?
  • 我不是火花专家,但我认为分区是数据拆分后分布在集群中。你能试试上面的例子,告诉我它是否适合你的需要吗?
  • 感谢您的建议。我尝试按照您的指示进行操作,但遇到了问题中所示的错误。
  • 我编辑了我的帖子,尝试并反馈 :) 我之前的错误是认为在 dapply 函数中我们有行。事实上,我们有一个数据框。我相信 spark 会切割数据帧,将每个部分发送到不同的节点并应用函数(此处为 changeLocation)。如果有人可以确认?
  • 只要我使用 dapplyCollect 函数,它就可以工作。在这种情况下,我不需要指定架构。
猜你喜欢
  • 2023-03-06
  • 1970-01-01
  • 1970-01-01
  • 2013-03-18
  • 2014-04-30
  • 2011-01-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多