【发布时间】:2020-04-04 14:03:48
【问题描述】:
我正在 EMR 中运行 Spark 结构化流式传输作业(每天反弹)。执行几个小时后,我的应用程序出现 OOM 错误并被杀死。以下是我的配置和 Spark SQL 代码。 我是 Spark 新手,需要您的宝贵意见。
EMR 有 10 个实例,具有 16 个内核和 64GB 内存。
Spark-提交参数:
num_of_executors: 17
executor_cores: 5
executor_memory: 19G
driver_memory: 30G
Job 正在以 30 秒的间隔从 Kafka 中以微批次的形式读取输入。每批读取的平均行数为 90k。
spark.streaming.kafka.maxRatePerPartition: 4500
spark.streaming.stopGracefullyOnShutdown: true
spark.streaming.unpersist: true
spark.streaming.kafka.consumer.cache.enabled: true
spark.hadoop.fs.s3.maxRetries: 30
spark.sql.shuffle.partitions: 2001
Spark SQL 聚合代码:
dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
.agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
.select(NAME,DEPS)
.map((row) -> {
Map<String, Object> map = Maps.newHashMap();
map.put(NAME, row.getString(0));
map.put(DEPS, row.getString(1));
return new KryoMapSerializationService().serialize(map);
}, Encoders.BINARY());
来自驱动程序的一些日志:
20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host: <host> (state: COMPLETE, exit status: 143)
顺便说一句,我在我的 forEachBatch 代码中使用了 collectasList
List<Event> list = dataset.select("value")
.selectExpr("deserialize(value) as rows")
.select("rows.*")
.selectExpr(NAME, DEPS)
.as(Encoders.bean(Event.class))
.collectAsList();
【问题讨论】:
-
谁能告诉我上述方法有什么问题?
-
首先,如果你有 17executors*5 个核心,请确保你的源 kafka 主题中有相同数量的 kafka 分区。其次,如果可以的话,使用 NAME 键写入源 kafka,这意味着 groupby 不会有 shuffle,因为每个键都会进入同一个分区。第三,collect_list 总是非常危险的操作,如果你有数据倾斜(这是因为一个键有最多结果),它很容易导致 OOM。编辑:请从驱动程序和失败的执行程序执行更多日志。
-
我必须考虑一下,但您的问题源于使用编码器。数据集是一个陷阱。如果您可以对其进行返工以使用数据框操作,那么您的状态会好得多。你能提供一些示例输入和输出吗?
标签: java apache-spark apache-spark-sql hadoop-yarn spark-structured-streaming