【发布时间】:2016-08-21 01:41:57
【问题描述】:
我是 spark 新手,有一个使用 Spark SQL/hiveContext 的简单 spark 应用程序:
- 从 hive 表中选择数据(10 亿行)
- 做一些过滤、聚合,包括row_number over window函数来选择第一行、group by、count()和max()等。
- 将结果写入 HBase(亿行)
我提交作业以在纱线集群(100 个执行程序)上运行它,它很慢,当我查看 Spark UI 中的 DAG 可视化时,似乎只有 hive 表扫描任务并行运行,其余步骤 #2 ,而上面的#3 只在一个实例中运行,它可能应该能够优化为并行化?
应用程序看起来像:
第 1 步:
val input = hiveContext
.sql(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' and server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL AND
) all_rows
) all_rows_with_row_number
WHERE rn = 1)
val input_tbl = input.registerTempTable(input_tbl)
第 2 步:
val result = hiveContext.sql(
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
( SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl ) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address)
第 3 步:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
如您所见,阶段 0 中的“过滤器”、“项目”和“交换”仅在一个实例中运行,阶段 1 和阶段 2 也是如此,所以如果问题很愚蠢,请提出几个问题和道歉:
- 从每个执行程序进行数据洗牌后,Driver 中是否会发生“Filter”、“Project”和“Exchange”?
- 什么代码映射到“过滤器”、“项目”和“交换”?
- 如何并行运行“Filter”、“Project”和“Exchange”来优化性能?
- 是否可以同时运行 stage1 和 stage2?
【问题讨论】:
-
你检查过 hbase 连接器允许下推谓词吗?如果是这样,您可以让 HBase 帮助您至少过滤一些数据,而不是从 HBase 中提取所有数据。主要瓶颈通常是 I/O 和网络。您的代码思想中有些事情不清楚。你的桌子代表什么?它是使用来自 HBase 的数据创建的 DataFrame 吗?你的输入数据呢?恐怕描述有点笼统。您愿意复习一下您的问题吗?
-
@eliasah,感谢 cmets。数据从 Hive 中提取并存储到 Hbase。同意瓶颈是 I/O 和网络,特别是有很多 shuffle - 2TB 输入数据和 40GB shuffle 写入。我了解到,洗牌越少越好,但是,洗牌也与输入数据的大小有关?如果是这样,我想知道什么样的比例(洗牌/输入)是一个好的比例?
标签: sql apache-spark parallel-processing apache-spark-sql hadoop-yarn