【问题标题】:Demultiplexing RDD onto multiple ORC tables将 RDD 解复用到多个 ORC 表上
【发布时间】:2017-01-10 02:06:47
【问题描述】:

我正在尝试将存储在 S3 中的数据作为每行 JSON 的文本文件转换为结构化的列格式,例如 S3 上的 ORC 或 Parquet。

源文件包含多种方案的数据(例如,HTTP 请求、HTTP 响应等),需要将其解析为不同类型的正确 Spark Dataframe。

示例架构:

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))

  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))

我让那部分工作正常,但是尝试尽可能高效地将数据写回 S3 似乎是个问题。

我尝试了 3 种方法:

  1. muxPartitions 来自 silex 项目
  2. 缓存已解析的 S3 输入并对其进行多次循环
  3. 使每个方案类型成为 RDD 的单独分区

在第一种情况下,JVM 内存不足,而在第二种情况下,机器磁盘空间不足。

第三个我还没有彻底测试过,但这似乎并没有有效地利用处理能力(因为集群中只有一个节点(这个特定分区所在的节点)实际上会将数据写回到 S3)。

相关代码:

val allSchemes = Schemes.all().keys.toArray

if (false) {
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach {
        logItem => {
          val schemeIndex = allSchemes.indexOf(logItem.logType)
          if (schemeIndex > -1) {
            buffers(schemeIndex).append(logItem.row)
          }
        }
      }
      buffers
    })

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input(index)

      writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Naive approach
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.foreach {
    schemeName =>
      val rdd = input
        .filter(x => x.logType == schemeName)
        .map(x => x.row)

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
} else {
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }

    val input = readRawFromS3(inputPrefix)
      .map(x => (x.logType, x.row))
      .partitionBy(new CustomPartitioner())
      .map { case (logType, row) => row }
      .persist(StorageLevel.MEMORY_AND_DISK)

    allSchemes.zipWithIndex.foreach {
      case (schemeName, index) =>
        val rdd = input
          .mapPartitionsWithIndex(
            (i, iter) => if (i == index) iter else Iterator.empty,
            preservesPartitioning = true
          )

        writeColumnarToS3(rdd, schemeName)
    }

    input.unpersist()
}

从概念上讲,我认为每个方案类型的代码应该有 1 个输出 DStream,输入 RDD 应该选择“n”将每个已处理的项目放置到正确的 DStream 上(通过批处理以获得更好的吞吐量)。

是否有人对如何实现这一点有任何指示?和/或有没有更好的方法来解决这个问题?

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    鉴于输入是 json,您可以将其读入字符串数据帧(每行都是一个字符串)。然后,您可以从每个 json 中提取类型(使用 UDF 或使用 get_json_object 或 json_tuple 等函数)。

    现在您有两列:类型和原始 json。您现在可以在写入数据帧时使用 partitionBy 数据帧选项。这将为每种类型生成一个目录,并且目录的内容将包括原始 jsons。

    现在您可以使用自己的架构来读取每种类型。

    您也可以使用 RDD 执行类似的操作,使用映射将输入 rdd 转换为一对 rdd,其中键是类型,值是转换为目标模式的 json。然后您可以使用 partitionBy 和 map partition 将每个分区保存到一个文件中,或者您可以使用 reduce by key 写入不同的文件(例如通过使用 key 设置文件名)。

    你也可以看看Write to multiple outputs by key Spark - one Spark job

    请注意,我在这里假设目标是拆分为文件。根据您的具体用例,其他选项可能是可行的。例如,如果您的不同模式足够接近,您可以创建一个包含所有模式的超级模式,并直接从中创建数据框。然后您可以直接处理数据帧,也可以使用数据帧 partitionBy 将不同的子类型写入不同的目录(但这次已经保存到 parquet)。

    【讨论】:

    • 我也想到了使用某种统一的“超级方案”。但最后,这似乎是一个 hack。此外,SparkSQL 本身似乎并不支持 UnionType 或类似的东西。将数据保存为 JSON 不是解决方案,这就是源数据的用途。目标是将源文件拆分为多个单独的列文件,并确保数据是强类型的(允许使用 Presto 等轻松查询)。
    • 然后做映射转换成一个键对,键是类型,值是对象是你最好的选择。然后,您可以按分区和映射分区或使用 reduce by key 直接写入目标文件
    【解决方案2】:

    这是我最终想到的:

    我使用自定义分区器根据他们的方案加上行的哈希码对数据进行分区。

    这里的原因是我们希望能够只处理某些分区,但仍然允许所有节点参与(出于性能原因)。因此,我们不会将数据仅分布在 1 个分区上,而是分布在 X 个分区上(在本例中,X 是节点数乘以 2)。

    然后对于每个方案,我们会修剪不需要的分区,因此我们只会处理我们需要的分区。

    代码示例:

    def process(date : ReadableInstant, schemesToProcess : Array[String]) = {
      // Tweak this based on your use case
      val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2
    
      class CustomPartitioner extends Partitioner {
        override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions
        override def getPartition(key: Any): Int = {
          // This is tightly coupled with how `input` gets transformed below
          val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
          (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
        }
    
        /**
          * Internal helper function to retrieve all partition indices for the given key
          * @param key input key
          * @return
          */
        private def getPartitions(key: String): Seq[Int] = {
          val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
          index until (index + DefaultNumberOfStoragePartitions)
        }
    
        /**
          * Returns an RDD which only traverses the partitions for the given key
          * @param rdd base RDD
          * @param key input key
          * @return
          */
        def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
          val partitions = getPartitions(key).toSet
          PartitionPruningRDD.create(rdd, x => partitions.contains(x))
        }
      }
    
      val partitioner = new CustomPartitioner()
      val input = readRawFromS3(date)
        .map(x => ((x.logType, x.row.hashCode), x.row))
        .partitionBy(partitioner)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
    
      // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
      val schemesInRdd = input
        .map(_._1._1)
        .distinct()
        .collect()
    
      // Remaining stages: for each scheme, write it out to S3 as ORC
      schemesInRdd.zipWithIndex.foreach {
        case (schemeName, index) =>
          val rdd = partitioner.filterRDDForKey(input, schemeName)
            .map(_._2)
            .coalesce(DefaultNumberOfStoragePartitions)
    
          writeColumnarToS3(rdd, schemeName)
      }
    
      input.unpersist()
    }
    

    【讨论】:

      猜你喜欢
      • 2015-03-17
      • 2016-03-01
      • 1970-01-01
      • 2018-06-10
      • 1970-01-01
      • 2021-03-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多