【问题标题】:Spark randomSplit - inconsistent results for every runSpark randomSplit - 每次运行的结果不一致
【发布时间】:2020-04-03 04:04:38
【问题描述】:

我正在尝试将数据集拆分为训练和非训练使用

inDataSet.randomSplit(weights.toArray, 0)

每次运行,我都会得到不同的结果。这是预期的吗?如果是这样,我怎样才能每次都获得相同百分比的行?

例如:Training Offer 随机拆分的权重是:ArrayBuffer(0.3, 0.7) - 为此我总共有 72 行,权重为 0.3,我期待大约 21 行。有时我会得到 23、29、19、4。请指导。

注意:我给出的总权重为 1.0 (0.3 + 0.7) 不归一化。

--另一个问题很有用,但那是在单次执行中。我正在运行我的测试 N 次,每次我得到不同的结果集。

【问题讨论】:

  • 这是由于拆分在 Spark 中的执行方式,如上述评论中问题的答案所示。不保证每个部分的行数。这里的答案可以让您了解如何始终获得相同的行数:stackoverflow.com/questions/44135610/…
  • 我查看了其他答案,但我的问题是,对于每次运行,它应该在每个数据集中获得相同的加权行数。不一样..
  • 问题在于 Spark 如何划分行。它为每一行计算一个介于 0 和 1 之间的随机数,在这种情况下,如果该数字低于 0.3,则它在第一组中,否则在第二组中。这当然会给每次运行的组提供不同的大小(因为使用了随机数)。我链接到的第二个问题包含有关如何每次获得相同行数的信息。
  • 谢谢,我明白你的意思了!我在我的代码中实现了类似的逻辑,但是当我后面有“过滤器”和“联合”时,效率不高。我必须将数据集组合起来,以便在整个数据集中进行其他一些 % 操作。请参考我的实现..

标签: apache-spark


【解决方案1】:

我输入的一种可能的实现(类似于第二条评论中的链接):

    def doTrainingOffer(inDataSet: Dataset[Row],
                      fieldName: String,
                      training_offer_list: List[(Long, Long, Int, String, String)]):
  (Dataset[Row], Option[Dataset[Row]]) = {
    println("Doing Training Offer!")

    val randomDs = inDataSet
              .withColumn("row_id", rank().over(Window.partitionBy().orderBy(fieldName)))
              .orderBy(rand)

    randomDs.cache()
    val count = randomDs.count()
    println(s"The total no of rows for this use case is: ${count}")

    val trainedDatasets = new mutable.ArrayBuffer[Dataset[Row]]()
    var startPos = 0l
    var endPos = 0l
    for (trainingOffer <- training_offer_list) {
      val noOfRows = scala.math.round(count * trainingOffer._3 / 100.0)
      endPos += noOfRows
      println(s"for training offer id: ${trainingOffer._1} and percent of ${trainingOffer._3}, the start and end are ${startPos}, ${endPos}")
      trainedDatasets += addTrainingData(randomDs.where(col("row_id") > startPos && col("row_id") <= endPos), trainingOffer)
      startPos = endPos
    }

    val combinedDs = trainedDatasets.reduce(_ union _)
    // (left over for other offer, trainedOffer)
    (randomDs.join(combinedDs, Seq(field_name), "left_anti"), Option(combinedDs))
  }

还有另一种可能的实现方式::

val randomDs = inDataSet.orderBy(rand)
    randomDs.cache()
    val count = randomDs.count()
    println(s"The total no of rows for this use case is: ${count}")
    val trainedDatasets = new mutable.ArrayBuffer[Dataset[Row]]()

    for (trainingOffer <- training_offer_list) {
      if (trainedDatasets.length > 1) {
        val combinedDs = trainedDatasets.reduce(_ union _)
        val remainderDs = randomDs.join(combinedDs, Seq(field_name), "left_anti")
        trainedDatasets += addTrainingData(remainderDs.limit(scala.math.round(count * trainingOffer._3 / 100)), trainingOffer)
      }
      else if (trainedDatasets.length == 1) {
        val remainderDs = randomDs.join(trainedDatasets(0), Seq(field_name), "left_anti")
        trainedDatasets += addTrainingData(remainderDs.limit(scala.math.round(count * trainingOffer._3 / 100)), trainingOffer)
      }
      else {
        val tDs = randomDs.limit(scala.math.round(count * trainingOffer._3 / 100))
        trainedDatasets += addTrainingData(tDs, trainingOffer)
      }
    }

    val combinedDs = trainedDatasets.reduce(_ union _)
    // (left over for other offer, trainedOffer)
    (randomDs.join(combinedDs, Seq(field_name), "left_anti"), Option(combinedDs))

【讨论】:

    【解决方案2】:

    如果使用参数seed=1234,可以产生一致的结果。使用 dataframe.cache() 函数也可以。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-08-12
      • 2017-06-10
      • 2017-04-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多