【发布时间】:2019-01-05 19:50:43
【问题描述】:
我正在开发 Kafka Spark 流媒体项目。 Spark 流从 Kafka 获取数据。数据为 json 格式。样本输入
{ “表”:“表A”, "Product_ID": "AGSVGF.upf", "file_timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", “hdfs_file_name”:“null_1532631600050”, "Date_Time": "2018-07-26T13:45:01.0000000Z", “用户名”:“UBAHTSD” }
{ “表”:“表B”, "Test_ID": "FAGS.upf", “时间戳”:“2018-07-26T18:58:08.4485558Z000000000000000”, “名称”:“flink”, "时间": "2018-07-26T13:45:01.0000000Z", “ID”:“UBAHTGADSGSCVDGHASD” }
一个 JSON 字符串就是一个消息。 JSON 字符串有 15 种类型,使用表列进行区分。现在我想在 Apache Hive 中保存这 15 种不同的 JSON。所以我创建了一个 dstream 并在表列的基础上过滤了 rdd 并保存到 Hive 中。代码工作正常。但是有些时候它会花很多时间然后激发批处理。我使用spark.streaming.kafka.maxRatePerPartition=10 控制了输入。我已将 rdd 重新分区为 9 个分区,但在 Spark UI 上,它显示未知阶段。
这是我的代码。
val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)
dStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val sparkContext = rdd.sparkContext
rdd.persist(StorageLevel.MEMORY_AND_DISK)
val hiveContext = getInstance(sparkContext)
val tableA = rdd.filter(_.contains("tableA"))
if (!tableA.isEmpty()) {
HiveUtil.tableA(hiveContext.read.json(tableA))
tableA.unpersist(true)
}
val tableB = rdd.filter(_.contains("tableB"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableB))
tableB.unpersist(true)
}
.....
.... upto 15 tables
....
val tableK = rdd.filter(_.contains("tableK"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableK))
tableB.unpersist(true)
}
}
}
如何优化代码?
谢谢。
【问题讨论】:
-
你想优化什么?使代码更具可扩展性(因为现在看起来您几乎重复了 15 次相同的事情)?
标签: apache-spark optimization apache-kafka spark-streaming