【问题标题】:Spark: How to transform a RDD to Seq to be used in pipelineSpark:如何将 RDD 转换为 Seq 以在管道中使用
【发布时间】:2015-06-19 10:57:37
【问题描述】:

我想在 MLlib 中使用管道的实现。之前,我有一个 RDD 文件并将其传递给模型创建,但现在要使用管道,应该有 LabeledDocument 的序列要传递给管道。

我的 RDD 创建如下:

val data = sc.textFile("/test.csv");
val parsedData = data.map { line =>
        val parts = line.split(',')
        LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail))
        }.cache()

在管道示例Spark Programming Guide中,管道需要以下数据:

// Prepare training documents, which are labeled.
val training = sparkContext.parallelize(Seq(
  LabeledDocument(0L, "a b c d e spark", 1.0),
  LabeledDocument(1L, "b d", 0.0),
  LabeledDocument(2L, "spark f g h", 1.0),
  LabeledDocument(3L, "hadoop mapreduce", 0.0),
  LabeledDocument(4L, "b spark who", 1.0),
  LabeledDocument(5L, "g d a y", 0.0),
  LabeledDocument(6L, "spark fly", 1.0),
  LabeledDocument(7L, "was mapreduce", 0.0),
  LabeledDocument(8L, "e spark program", 1.0),
  LabeledDocument(9L, "a e c l", 0.0),
  LabeledDocument(10L, "spark compile", 1.0),
  LabeledDocument(11L, "hadoop software", 0.0)))

我需要一种方法将我的 RDD (parsedData) 更改为 LabeledDocuments 序列(如示例中的训练)。

感谢您的帮助。

【问题讨论】:

    标签: scala apache-spark pipeline rdd seq


    【解决方案1】:

    我找到了这个问题的答案。

    我可以通过以下代码将我的 RDD(parsedData)转换为 SchemaRDD,它是 LabeledDocuments 的序列:

    val rddSchema = parsedData.toSchemaRDD;
    

    现在问题变了!我想将新的 rddSchema 拆分为训练 (80%) 和测试 (20%)。如果我使用 randomSplit,它会返回一个 Array[RDD[Row]] 而不是 SchemaRDD

    新问题:如何将Array[RDD[Row]]转换为SchemaRDD -- 或者 -- 如何拆分SchemaRDD,在结果是SchemaRDDs

    【讨论】:

    • 很高兴您找到了答案,您可能希望接受它作为最终答案。如果您有任何其他问题,您可能还想创建一个新线程,因为不鼓励同时提出多个问题。
    【解决方案2】:

    我尝试在 pyspark 中关注-

    def myFunc(s):
        # words = s.split(",")
        s = re.sub("\"", "", s)
        words = [s for s in s.split(",")]
        val = words[0]
        lbl = 0.0
        if val == 4 or val == "4":
            lbl = 0.0
        elif val == 0 or val == "0":
            lbl = 1.0
    
        cleanlbl = cleanLine(words[5], True, val)
        # print "cleanlblcleanlbl ",cleanlbl
        return LabeledPoint(lbl, htf.transform(cleanlbl.split(" ")))
    
    
    sparseList = sc.textFile("hdfs:///stats/training.1600000.processed.noemoticon.csv").map(myFunc)
    
    sparseList.cache()  # Cache data since Logistic Regression is an iterative algorithm.
    
    
    # for data in dataset:
    trainfeats, testfeats = sparseList.randomSplit([0.8, 0.2], 10)
    

    您可以在解析数据的同时进行拆分,您可以根据需要进行破解和更改

    【讨论】:

    • myFunc 返回与 parsedData 相同的数据(目前没有问题)。然后 randomSplit 返回 Array[RDD[Row]] 而不是 SchemaRDD。这是我的问题!
    猜你喜欢
    • 2015-02-27
    • 1970-01-01
    • 1970-01-01
    • 2015-10-28
    • 2016-06-29
    • 2015-11-20
    • 2018-01-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多