【问题标题】:Spark Streaming - Same processing time for 4 cores and 16 cores. Why?Spark Streaming - 4 核和 16 核的处理时间相同。为什么?
【发布时间】:2015-12-09 23:22:23
【问题描述】:

场景:我正在使用火花流进行一些测试。大约 100 条记录的文件每 25 秒进入一次。

问题:在程序中使用 local[*] 处理 4 核 pc 平均需要 23 秒。当我将相同的应用程序部署到具有 16 个内核的服务器时,我期望处理时间有所改善。但是,我看到它在 16 个内核中仍然需要相同的时间(还检查了 ubuntu 中的 cpu 使用情况,并且 cpu 正在被充分利用)。所有配置均由 spark 默认提供。

问题: 处理时间不应该随着流作业可用内核数量的增加而减少吗?

代码:

  val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
val sqc = new SQLContext(sc)

val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)
jsonBuilder.append("[")
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
  if (!rdd.partitions.isEmpty) {

    val header = rdd.first().split(",")
    val rowsWithoutHeader = Utils.dropHeader(rdd)
rowsWithoutHeader.foreach { row =>
      jsonBuilder.append("{")
      val singleRowArray = row.split(",")
      (header, singleRowArray).zipped
        .foreach { (x, y) =>
        jsonBuilder.append(convertToStringBasedOnDataType(x, y))
        // GEO Hash logic here
        if (x.equals("GPSLat") || x.equals("Lat")) {
          lattitude = y.toDouble
        }
        else if (x.equals("GPSLon") || x.equals("Lon")) {
          longitude = y.toDouble
          if (x.equals("Lon")) {
            // This section is used to convert GPS Look Up to GPS LookUP with Hash
            jsonBuilder.append(convertToStringBasedOnDataType("geoCode", GeoHash.encode(lattitude, longitude)))
          }
          else {
            val selectedRow = broadcastTable.value
              .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
              .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
              .orderBy("Distance")
              .select("TrackKM", "TrackName").take(1)
            if (selectedRow.length != 0) {
              jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", selectedRow(0).get(0)))
              jsonBuilder.append(convertToStringBasedOnDataType("TrackName", selectedRow(0).get(1)))
            }
            else {
              jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", "NULL"))
              jsonBuilder.append(convertToStringBasedOnDataType("TrackName", "NULL"))
            }
          }
        }
      }
      jsonBuilder.setLength(jsonBuilder.length - 1)
      jsonBuilder.append("},")
    }
  sc.parallelize(Seq(jsonBuilder.toString)).repartition(1).saveAsTextFile("hdfs://localhost:9000/outputDirectory")

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    听起来您只使用一个线程,应用程序是在 4 核还是 16 核的机器上运行并不重要。

    听起来好像有 1 个文件进来了,1 个文件是 1 个 100 行的 RDD 分区。您遍历该 RDD 中的行并附加 jsonBuilder。最后调用repartition(1),这将使文件的写入成为单线程。

    您可以在获取文件后将数据集修复为 12 个 RDD 分区,以确保其他线程在行上工作。但除非我错过了什么,否则你很幸运这不会发生。如果两个线程同时调用jsonBuilder.append("{") 会发生什么?他们不会创建无效的 JSON。我可能在这里遗漏了一些东西。

    您可以通过添加如下日志来测试我对您的应用程序的单线程性是否正确:

    scala> val rdd1 = sc.parallelize(1 to 10).repartition(1)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:21
    
    scala> rdd1.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
    Executor task launch worker-40 => 1
    Executor task launch worker-40 => 2
    Executor task launch worker-40 => 3
    Executor task launch worker-40 => 4
    Executor task launch worker-40 => 5
    Executor task launch worker-40 => 6
    Executor task launch worker-40 => 7
    Executor task launch worker-40 => 8
    Executor task launch worker-40 => 9
    Executor task launch worker-40 => 10
    
    scala> val rdd3 = sc.parallelize(1 to 10).repartition(3)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at repartition at <console>:21
    
    scala> rdd3.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
    Executor task launch worker-109 => 1
    Executor task launch worker-108 => 2
    Executor task launch worker-95 => 3
    Executor task launch worker-95 => 4
    Executor task launch worker-109 => 5
    Executor task launch worker-108 => 6
    Executor task launch worker-108 => 7
    Executor task launch worker-95 => 8
    Executor task launch worker-109 => 9
    Executor task launch worker-108 => 10
    

    【讨论】:

    • 你的假设是正确的。我尝试重新分区数据并且输出都搞砸了。大约 4 秒后获得输出(它没有用,因为 JSON 输出不正确)。我相信我的用例不符合 spark 的用法。感谢您的评论。
    • 您仍然可以使用 Spark,您只需为并行处理构建稍微不同的程序。例如,将每个项目映射到一个 JSON 字符串,并在最后将它们合并到一个文件中。 map 部分可以并行发生,合并和文件写入可以是单线程的。询问有关如何使用 Spark 执行此操作的新问题,并链接到此问题以获取背景信息。然后我或其他人可以提出解决方案,
    • 我尝试按照您提到的方式提出问题,但是答案并没有像我预期的那样产生结果。请找到讨论 (stackoverflow.com/questions/32753010/…) @Patrick McGloin
    • 我尝试在 textfilestreams 中重新分区,但我遇到了另一个问题。我无法访问分区内的广播变量。问题详情在link
    猜你喜欢
    • 2016-10-17
    • 2019-06-25
    • 2017-06-24
    • 2017-05-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-31
    • 2023-03-28
    相关资源
    最近更新 更多