【问题标题】:How to efficiently update Impala tables whose files are modified very frequently如何有效地更新文件被频繁修改的 Impala 表
【发布时间】:2020-02-06 08:24:55
【问题描述】:

我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在这些目录的顶部,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)

现在,我们在将文件写入 HDFS 后立即运行一些 DDL 查询:

  • ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。

目前,此 DDL 花费的时间有点过长,并且它们正在我们的系统中排队,从而损害了系统的数据可用性。

所以,我的问题是:有没有办法更有效地进行数据整合?

我们考虑过:

  • 使用ALTER TABLE .. RECOVER PARTITONS,但按照the documentation,它只刷新新分区。

  • 尝试一次将REFRESH .. PARTITON ... 与多个分区一起使用,但语句语法不允许这样做。

  • 尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。

  • 鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?

  • 您还知道其他方式吗?

谢谢!

维克多

注意:我们知道哪些分区需要刷新的方法是使用 HDFS 事件,就像使用 Spark Structured Streaming 一样,我们不知道文件写入的确切时间。

注意 #2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件,那就太好了。

【问题讨论】:

  • 对您的问题没有答案,抱歉,只是想提一下,新的 Impala 版本添加了“不干涉”元数据管理功能。见impala.apache.org/docs/build/html/topics/impala_metadata.html
  • 谢谢@mazaneicha!这似乎很有希望!我们正计划升级我们的堆栈,所以也许这是另一个原因。
  • 请检查来自 hive 的 msck repair 命令是否有用。

标签: hadoop impala spark-structured-streaming cloudera-cdh


【解决方案1】:

由于似乎没有人知道我的问题的答案,我想分享我们为提高处理效率而采取的方法,非常欢迎 cmets。

我们发现(文档对此不是很清楚)存储在 HDFS 中的 Spark“检查点”中的一些信息是一些元数据文件,描述了每个 Parquet 文件的写入时间和大小:

$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...

所以,我们所做的是:

  • 构建一个轮询 _spark_metadata 文件夹的 Spark Streaming 作业。
    • 我们使用fileStream,因为它允许我们定义要使用的文件过滤器。
    • 该流中的每个条目都是这些 JSON 行之一,对其进行解析以提取文件路径和大小。
  • 按文件所属的父文件夹(映射到每个 Impala 分区)对文件进行分组。
  • 对于每个文件夹:
    • 读取加载目标 Parquet 文件的数据框(以避免与其他作业写入文件的竞争条件)
    • 计算要写入的块数(使用 JSON 中的 size 字段和目标块大小)
    • 将数据帧合并到所需数量的分区并将其写回 HDFS
    • 执行 DDL REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
  • 最后,删除源文件

我们取得的成就是:

  • 通过对每个分区和批次执行一次刷新来限制 DDL。

  • 通过可配置批处理时间和块大小,我们能够使我们的产品适应具有更大或更小数据集的不同部署场景。

  • 解决方案非常灵活,因为我们可以为 Spark Streaming 作业分配更多或更少的资源(执行器、内核、内存等),并且我们可以启动/停止它(使用它自己的检查点系统) .

  • 我们还在研究在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。

【讨论】:

  • 你认为使用 KafkaConnect 代替 Spark 也能达到同样的效果吗?在最后一步“最后,删除源文件”中,如何避免删除进来的新文件?
  • 元数据文件流在每个批次中都包含要处理的文件,因此您不必担心可能同时写入的新文件
  • 根据KafkaConnect,我没用过,我去看看。谢谢!
猜你喜欢
  • 1970-01-01
  • 2011-05-09
  • 2011-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-03-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多