【发布时间】:2020-06-09 02:41:43
【问题描述】:
我们在 Azure Blob 存储中的一个表中有数据,该表充当数据湖。每 30 分钟摄取一次数据,因此在 UTC 中形成如下时间分区
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=00
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=30
<Container>/<TableName>/y=2020/m=02/d=20/h=02/min=00 and so on.
用于捕获数据的文件格式是 orc,并且时间分区内的数据分区大小相同。
我们的用例是使用 Spark (V 2.3) 在 IST 中捕获日级别的数据进行处理。鉴于数据驻留在 UTC 并且用例是在 IST(+5.30 UTC)中处理数据,从 /h=18/min=30(前一天)到 /h=18/min= 总共 48 个时间分区是必不可少的00(第二天)。我们有两种选择
选项 1 为每个时间分区创建数据帧并合并它
df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=18/min=30)
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=19/min=00)
..
df48 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/h=18/min=00) ..
df = df.union(df1)
df = df.union(df2)
..
df = df.union(df48)
对 48 个分区执行此操作将在 df 中生成一整天的数据。
选项 2 在日级别捕获数据并应用一个小时的过滤条件。
df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/).filter(h>=19 or (h=18 and min=30))
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/).filter(h<=17 or (h=18 and min=00))
df = df1.union(df2)
一旦将数据加载到内存中,处理时间是相同的,即约 5 分钟。加载数据所需的时间是瓶颈。选项 1 需要 30 分钟,选项 2 需要 2 分钟才能加载到内存。
在一些博客中,我们看到 Analyzer 每次调用 union 时都会扫描整个先前的数据帧。因此对于 48 个联合,它扫描 1+2+3+47=1128 次。这是指数性能下降的原因吗? Analyzer是做什么的,可以关闭吗?要使读取函数对文件存储上的时间分区数据通用,有什么建议或最佳实践可以采用吗?
【问题讨论】:
标签: apache-spark apache-spark-sql azure-blob-storage apache-spark-2.3