【问题标题】:collect sparkr into dataframe将 sparkr 收集到数据框中
【发布时间】:2015-09-06 13:06:29
【问题描述】:

我正在将一些数据加载到 sparkR(Spark 版本 1.4.0,在 fedora21 上运行)中,我在其中运行了一些产生三个不同数字的算法。我的算法需要一堆参数,我想在同一数据上运行不同的参数设置。输出格式应该是一个数据框(或 csv 列表),其列是算法参数和我的算法计算的三个数字,即

  mypar1, mypar2, mypar3, myres1, myres2, myres3
  1       1.5     1.2     5.6      8.212  5.9
  2       1.8     1.7     5.1      7.78   8.34

将是两种不同参数设置的输出。 我在下面编写了脚本,该脚本在不同的参数设置上并行运行:它接受一个带有参数值作为参数的输入文件,对于上面的示例,它看起来像这样:

 1,1.5,1.2
 2,1.8,1.7

所以每行一个参数组合。

这是我的问题:不是每个参数设置都有一个,而是将所有数字组合成一个长列表。函数 cv_spark 返回一个 data.frame(基本上是一行)。如何告诉 spark 将 cv_spark 的输出组合到数据帧(即执行 rbind 之类的操作?)或列表列表?

#!/home/myname/Spark/spark-1.4.0/bin/sparkR

library(SparkR)

sparkcontext <- sparkR.init("local[3]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))

cv_spark <- function(indata) {
   cv_params <- strsplit(indata, split=",")[[1]]
   param.par1 = as.integer(cv_params[1])
   param.par2 = as.numeric(cv_params[2])
   param.par3 = as.numeric(cv_params[3])
   predictions <- rep(NA, 1)
   ## here I run some calculation on some data that I load to my SparkR session, 
   ## but for illustration purpose I'm just filling up with some random numbers
   mypred = base:::sample(seq(5,10,by=0.01),3)
   predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
   return(as.data.frame(predictions))
}

args <- commandArgs(trailingOnly=TRUE)
print(paste("args ", args))
cvpar = readLines(args[[1]])

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=4)
myerr <- SparkR:::flatMap(rdd,cv_spark)
output <- SparkR:::collect(myerr)
print("final output")
print(output)

outfile = "spark_output.csv"
write.csv(output,outfile,quote=FALSE,row.names=FALSE)

【问题讨论】:

    标签: r apache-spark sparkr


    【解决方案1】:

    我设法通过使用flatMapValues 而不是flatMap 并通过创建(key, value) 对我的各种参数设置(基本上key 是我的参数输入文件中的行号和value 是参数在那条线上)。然后我打电话给reduceByKey,它基本上每个键都有一行。修改后的脚本如下所示:

    #!/home/myname/Spark/spark-1.4.0/bin/sparkR
    
    library(SparkR)
    
    sparkcontext <- sparkR.init("local[4]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))
    
    cv_spark <- function(indata) {
       cv_params <- unlist(strsplit(indata[[1]], split=","))
       param.par1 = as.integer(cv_params[1])
       param.par2 = as.numeric(cv_params[2])
       param.par3 = as.integer(cv_params[3])
       predictions <- rep(NA, 1)
       ## here I run some calculation on some data that I load to my SparkR session, 
       ## but for illustration purpose I'm just filling up with some random numbers
       mypred = base:::sample(seq(5,10,by=0.01),3)
       predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
       return(as.data.frame(predictions))
    }
    
    args <- commandArgs(trailingOnly=TRUE)
    print(paste("args ", args))
    cvpar = readLines(args[[1]])
    ## Creates (key, value) pairs
    cvpar <- Map(list,seq(1,length(cvpar)),cvpar)
    
    rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=1)
    myerr <- SparkR:::flatMapValues(rdd,cv_spark)
    myerr <- SparkR:::reduceByKey(myerr,"c", 2L)
    output <- SparkR:::collect(myerr)
    
    myres <- sapply(output,`[`,2)
    df_res <- do.call("rbind",myres)
    colnames(df_res) <- c("Element","sigdf","sigq","err","err.sse","err.mse")
    
    outfile = "spark_output.csv"
    write.csv(df_res,outfile,quote=FALSE,row.names=FALSE)
    

    这按预期工作,即输出是一个数据帧(或 csv 文件),其行数与上述脚本的输入文件中的行数相同(即不同参数值配置的数量),但也许还有更多有效的方法来做到这一点。

    【讨论】:

    • @Vijay_Shinde ./myexample.R myparameterfile.txt 其中 myexample.R 是上面的脚本。确保在脚本中修复 shebang。 myparameterfile.txt 每行包含 3 个逗号分隔的数字。
    猜你喜欢
    • 2016-04-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多