【问题标题】:Optimizing reading data to spark from Azure blob优化读取数据以从 Azure blob 触发
【发布时间】:2020-06-09 02:41:43
【问题描述】:

我们在 Azure Blob 存储中的一个表中有数据,该表充当数据湖。每 30 分钟摄取一次数据,因此在 UTC 中形成如下时间分区

<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=00
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=30
<Container>/<TableName>/y=2020/m=02/d=20/h=02/min=00 and so on. 

用于捕获数据的文件格式是 orc,并且时间分区内的数据分区大小相同。

我们的用例是使用 Spark (V 2.3) 在 IST 中捕获日级别的数据进行处理。鉴于数据驻留在 UTC 并且用例是在 IST(+5.30 UTC)中处理数据,从 /h=18/min=30(前一天)到 /h=18/min= 总共 48 个时间分区是必不可少的00(第二天)。我们有两种选择

选项 1 为每个时间分区创建数据帧并合并它

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=18/min=30)
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=19/min=00) 
..
df48 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/h=18/min=00) ..
df = df.union(df1) 
df = df.union(df2) 
..
df = df.union(df48)

对 48 个分区执行此操作将在 df 中生成一整天的数据。

选项 2 在日级别捕获数据并应用一个小时的过滤条件。

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/).filter(h>=19 or (h=18 and min=30))
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/).filter(h<=17 or (h=18 and min=00))
df = df1.union(df2)

一旦将数据加载到内存中,处理时间是相同的,即约 5 分钟。加载数据所需的时间是瓶颈。选项 1 需要 30 分钟,选项 2 需要 2 分钟才能加载到内存。

在一些博客中,我们看到 Analyzer 每次调用 union 时都会扫描整个先前的数据帧。因此对于 48 个联合,它扫描 1+2+3+47=1128 次。这是指数性能下降的原因吗? Analyzer是做什么的,可以关闭吗?要使读取函数对文件存储上的时间分区数据通用,有什么建议或最佳实践可以采用吗?

【问题讨论】:

    标签: apache-spark apache-spark-sql azure-blob-storage apache-spark-2.3


    【解决方案1】:

    数据帧的联合导致分析器遍历所有前面的数据帧。这主要是从 orc 文件中推断模式,如果不匹配则抛出错误。我们观察到的是每个联合期间的大量文件操作。

    选项 1 由于每个时间分区中有> 200个文件分区,因此分析器的总传递次数为1+2+..+47=1128。乘以 200 是文件打开分析模式关闭操作的数量 = 225,600。这是选项 1 需要 30 分钟的主要原因。

    选项 2 选项 2 执行相同的操作,但在两个大数据帧上。一个前一天(从 18.30 到 23.30)和另一个第二天(从 00.00 到 18.00)。这导致了 22+26=48x200=9,600 文件打开-分析模式-关闭操作。

    为了缓解这种情况,我们指定了模式,而不是依赖于 Spark 的模式推断机制。选项 1 和选项 2 都在指定架构后的 2 分钟内完成。

    学习:如果涉及大量数据集的联合/合并,则依赖 spark 的架构推理机制成本高昂。主要是因为大量的文件操作。如果在之前的操作中已经推断出架构,这可以是 Spark 中的优化以避免再次遍历数据帧。请指定架构以减轻这种情况。

    【讨论】:

      【解决方案2】:

      等一下……这些文件不是有某种命名约定吗?我的意思是,如果文件的名称基本相同,除了小时和分钟。

      像这样:filter(h>=19 or (h=18 and min=30))

      只需使用通配符遍历文件并将所有文件合并到一个数据帧中。

      val df = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "false")
          .option("sep", "|")
          .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
          .withColumn("file_name", input_file_name())
      

      如果架构不在文件本身中,或者由于某种原因不完整,您可以创建它并覆盖文件中的内容。

      val customSchema = StructType(Array(
          StructField("field1", StringType, true),
          StructField("field2", StringType, true),
          StructField("field3", StringType, true),
          etc.
      
      val df = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "false")
          .option("sep", "|")
          .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
          .withColumn("file_name", input_file_name())
      

      试试看,看看你会如何相处。

      【讨论】:

        猜你喜欢
        • 2019-01-02
        • 2016-10-12
        • 2018-07-06
        • 2020-11-20
        • 2021-12-26
        • 2021-10-08
        • 1970-01-01
        • 2020-09-27
        • 2021-02-26
        相关资源
        最近更新 更多