【发布时间】:2017-01-10 02:06:47
【问题描述】:
我正在尝试将存储在 S3 中的数据作为每行 JSON 的文本文件转换为结构化的列格式,例如 S3 上的 ORC 或 Parquet。
源文件包含多种方案的数据(例如,HTTP 请求、HTTP 响应等),需要将其解析为不同类型的正确 Spark Dataframe。
示例架构:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
我让那部分工作正常,但是尝试尽可能高效地将数据写回 S3 似乎是个问题。
我尝试了 3 种方法:
- muxPartitions 来自 silex 项目
- 缓存已解析的 S3 输入并对其进行多次循环
- 使每个方案类型成为 RDD 的单独分区
在第一种情况下,JVM 内存不足,而在第二种情况下,机器磁盘空间不足。
第三个我还没有彻底测试过,但这似乎并没有有效地利用处理能力(因为集群中只有一个节点(这个特定分区所在的节点)实际上会将数据写回到 S3)。
相关代码:
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.flatMuxPartitions(allSchemes.length, data => {
val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
data.foreach {
logItem => {
val schemeIndex = allSchemes.indexOf(logItem.logType)
if (schemeIndex > -1) {
buffers(schemeIndex).append(logItem.row)
}
}
}
buffers
})
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input(index)
writeColumnarToS3(rdd, schemeName)
}
} else if (false) {
// Naive approach
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.foreach {
schemeName =>
val rdd = input
.filter(x => x.logType == schemeName)
.map(x => x.row)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
} else {
class CustomPartitioner extends Partitioner {
override def numPartitions: Int = allSchemes.length
override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
}
val input = readRawFromS3(inputPrefix)
.map(x => (x.logType, x.row))
.partitionBy(new CustomPartitioner())
.map { case (logType, row) => row }
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input
.mapPartitionsWithIndex(
(i, iter) => if (i == index) iter else Iterator.empty,
preservesPartitioning = true
)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
}
从概念上讲,我认为每个方案类型的代码应该有 1 个输出 DStream,输入 RDD 应该选择“n”将每个已处理的项目放置到正确的 DStream 上(通过批处理以获得更好的吞吐量)。
是否有人对如何实现这一点有任何指示?和/或有没有更好的方法来解决这个问题?
【问题讨论】:
标签: apache-spark apache-spark-sql