【问题标题】:How to optimize spark sql to run it in parallel如何优化 spark sql 以并行运行
【发布时间】:2016-08-21 01:41:57
【问题描述】:

我是 spark 新手,有一个使用 Spark SQL/hiveContext 的简单 spark 应用程序:

  1. 从 hive 表中选择数据(10 亿行)
  2. 做一些过滤、聚合,包括row_number over window函数来选择第一行、group by、count()和max()等。
  3. 将结果写入 HBase(亿行)

我提交作业以在纱线集群(100 个执行程序)上运行它,它很慢,当我查看 Spark UI 中的 DAG 可视化时,似乎只有 hive 表扫描任务并行运行,其余步骤 #2 ,而上面的#3 只在一个实例中运行,它可能应该能够优化为并行化?

应用程序看起来像:

第 1 步:

val input = hiveContext
  .sql(
     SELECT   
           user_id  
           , address  
           , age  
           , phone_number  
           , first_name  
           , last_name  
           , server_ts   
       FROM  
       (     
           SELECT  
               user_id  
               , address  
               , age  
               , phone_number  
               , first_name  
               , last_name  
               , server_ts   
               , row_number() over 
                (partition by user_id, address,  phone_number, first_name, last_name  order by user_id, address, phone_number, first_name, last_name,  server_ts desc, age) AS rn  
           FROM  
           (  
               SELECT  
                   user_id  
                   , address  
                   , age  
                   , phone_number  
                   , first_name  
                   , last_name  
                   , server_ts  
               FROM  
                   table   
               WHERE  
                   phone_number <> '911' AND   
                   server_date >= '2015-12-01' and server_date < '2016-01-01' AND  
                   user_id IS NOT NULL AND  
                   first_name IS NOT NULL AND  
                   last_name IS NOT NULL AND  
                   address IS NOT NULL AND  
                   phone_number IS NOT NULL AND  
           ) all_rows  
       ) all_rows_with_row_number  
       WHERE rn = 1)

val input_tbl = input.registerTempTable(input_tbl)

第 2 步:

val result = hiveContext.sql(
  SELECT state, 
         phone_number, 
         address, 
         COUNT(*) as hash_count, 
         MAX(server_ts) as latest_ts 
     FROM  
    ( SELECT  
         udf_getState(address) as state  
         , user_id  
         , address  
         , age  
         , phone_number  
         , first_name  
         , last_name  
         , server_ts  
     FROM  
         input_tbl ) input  
     WHERE state IS NOT NULL AND state != ''  
     GROUP BY state, phone_number, address)

第 3 步:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)

DAG 可视化如下所示:

如您所见,阶段 0 中的“过滤器”、“项目”和“交换”仅在一个实例中运行,阶段 1 和阶段 2 也是如此,所以如果问题很愚蠢,请提出几个问题和道歉:

  1. 从每个执行程序进行数据洗牌后,Driver 中是否会发生“Filter”、“Project”和“Exchange”?
  2. 什么代码映射到“过滤器”、“项目”和“交换”?
  3. 如何并行运行“Filter”、“Project”和“Exchange”来优化性能?
  4. 是否可以同时运行 stage1 和 stage2?

【问题讨论】:

  • 你检查过 hbase 连接器允许下推谓词吗?如果是这样,您可以让 HBase 帮助您至少过滤一些数据,而不是从 HBase 中提取所有数据。主要瓶颈通常是 I/O 和网络。您的代码思想中有些事情不清楚。你的桌子代表什么?它是使用来自 HBase 的数据创建的 DataFrame 吗?你的输入数据呢?恐怕描述有点笼统。您愿意复习一下您的问题吗?
  • @eliasah,感谢 cmets。数据从 Hive 中提取并存储到 Hbase。同意瓶颈是 I/O 和网络,特别是有很多 shuffle - 2TB 输入数据和 40GB shuffle 写入。我了解到,洗牌越少越好,但是,洗牌也与输入数据的大小有关?如果是这样,我想知道什么样的比例(洗牌/输入)是一个好的比例?

标签: sql apache-spark parallel-processing apache-spark-sql hadoop-yarn


【解决方案1】:

您没有正确阅读 DAG 图 - 每个步骤都使用单个框进行可视化这一事实并不意味着它没有使用多个任务(因此 cores) 来计算该步骤。

您可以通过深入查看显示此阶段的所有任务的阶段视图来查看每个步骤使用了多少任务。

例如,下面是一个与您的类似的 DAG 可视化示例:

您可以看到每个阶段都由“单一”的步骤列描述。

但是如果我们看下表,我们可以看到每个阶段的任务数:

其中一个只使用 2 个任务,但另一个使用 220 个,这意味着数据被分成 220 个分区,并且在有足够可用资源的情况下并行处理分区。

如果您深入到该阶段,您可以再次看到它使用了 220 个任务以及所有任务的详细信息。

只有从磁盘读取数据的任务在图中显示为具有这些“多个点”,以帮助您了解读取了多少文件。

所以 - 正如 Rashid 的回答所建议的那样,检查每个阶段的任务数量。

【讨论】:

  • 只是添加到上面;最好有更少的阶段,因为这表明更少的没有。 “广泛的依赖关系”,因此整个集群的数据移动较少。
  • 谢谢@Tzach Zohar。绝对是在这里学到的新东西!还有更多相关的文章,我可以阅读/学习的幻灯片吗?您的 cmets 已经提供了很多信息,只是想了解更多信息:)
  • 谢谢@sourabh。在将数据写入 HBase 之前,我还尝试使用 result.cache() 和 result.repartition() 来查看它们是否对性能优化有用。我没有看到 cache() 反映在 DAG 图中,但 Repartition 确实在 DAG 中显示为第 2 阶段的框。由于这是一个额外的执行步骤,我想知道是否会对整体性能有好处?
  • 当你缓存缓存的 rdd 应该在 UI 中显示为一个绿点。如果你的数据有一个倾斜的表示或非常少的分区;那么一个执行器必须处理的数据量比其他执行者要多得多;那么重新分区肯定会有所帮助。此外,在某些情况下,您可能会尝试根据某些键重新分区,以便在共同分区的 rdd 之间发生连接。
【解决方案2】:

这并不明显,所以我会在问题中将事情归零。

  1. 计算每个步骤的执行时间。
  2. 如果您的表格是文本格式,第一步可能会很慢,如果数据以 parquet 格式存储在 Hive 中,spark 通常会更好。
  3. 查看您的表是否按 where 子句中使用的列进行分区。
  4. 如果将数据保存到 Hbase 很慢,那么您可能需要预先拆分 hbase 表,因为默认情况下数据存储在单个区域中。
  5. 查看 spark ui 中的阶段选项卡,了解每个阶段启动了多少任务,并查找本地级别的数据,如描述 here

希望您能够将问题归零。

【讨论】:

  • 感谢@Rashid Ali。 1. spark UI中显示每个步骤的执行时间。想必是准确的参考:)
  • 2.它是 orc 文件 3. 它由 where 子句中使用的一些列进行分区,而不是全部:( 4. 我已经按字符预先分割了数据,例如 {SPLITS=> ['a', 'b', 'c' ...]} 因为行键是字符组合,不知道我这样做是否正确。 5. 从 hive 读取数据的任务超过 20k,其他阶段的任务数是 2001 和 64,大概都是这些任务是并行运行的 :) 我想知道如何才能使其更加并行。
  • 哪个步骤花费的时间更长并且需要一些优化,是读取 hive 还是将数据存储到 hbase 中。关于 hbase 拆分,如果表未正确拆分,数据可能会写入一个或更少的区域,这可能是一个瓶颈。拆分是基于键分布数据的键范围。记住大量的任务总是不是好兆头。例如,如果您的数据是分区的,并且查询正在从一个或两个分区读取数据,那么大量任务意味着读取完整的表,这不是预期的。我希望,这会有所帮助。
猜你喜欢
  • 2016-02-27
  • 2023-03-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多