Question title: Why is Spark loading unnecessary data while reading compressed JSON files?
Posted: 2020-08-28 12:57:01
Question:

I'm using pyspark to process log files from S3 and filter them by date. They consist of gzip-compressed JSON files partitioned by year/month/day, like this:
s3://bucket/logs/YYYY/MM/DD/<hash>.json.gz

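Since the partitions don't follow the HDFS partitioning syntax (year=YYYY/month=MM/day=DD), I'm reading the whole folder and creating the date column with input_file_name and a regular expression: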

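from pyspark.sql.functions import col, input_file_name, regexp_extract, regexp_replace
# spark (a SparkSession) and from_date (a datetime) are defined elsewhere in the job

# Spark picks the gzip codec from the .gz extension, so the compression option is not required for reading
df = spark.read.option("compression", "gzip").text("s3a://bucket/logs/*/*/*/*.json.gz")
# Record the source file of every line
df = df.withColumn("path_file", input_file_name())
# Extract YYYY/MM/DD from the path, then turn it into a date
df = df.withColumn("logstash_date", regexp_extract(col('path_file'), r"(?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02})", 1))
df = df.withColumn("logstash_date", regexp_replace(col('logstash_date'), '/', '-').cast('date'))

df = df.filter(col("logstash_date") >= from_date.date())
# later on uses from_json to parse schema, apply more filters and do a join (to deduplicate logs)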
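To sanity-check what the regexp_extract + regexp_replace + cast('date') chain produces for each row, the same logic can be mirrored roughly in plain Python on a few sample paths. This is only a sketch: the function name path_to_date and the sample paths are hypothetical, and it writes \d{4} where the Spark pattern uses the equivalent \d{04}.

```python
import re
from datetime import date
from typing import Optional

# Mirrors the Spark pattern: capture YYYY/MM/DD right after the logs prefix
DATE_IN_PATH = re.compile(r"s3a://bucket/logs/(\d{4})/(\d{2})/(\d{2})")


def path_to_date(path: str) -> Optional[date]:
    """Return the date encoded in a log file path, or None.

    Spark's regexp_extract returns '' when nothing matches, and casting
    that to a date gives null; None plays the same role here.
    """
    match = DATE_IN_PATH.search(path)
    if match is None:
        return None
    year, month, day = (int(part) for part in match.groups())
    try:
        return date(year, month, day)
    except ValueError:  # e.g. 2020/13/45; Spark's cast would also yield null
        return None


if __name__ == "__main__":
    print(path_to_date("s3a://bucket/logs/2020/08/28/abc123.json.gz"))  # 2020-08-28
    print(path_to_date("s3a://bucket/other/file.json.gz"))  # None
```

Note that in the Spark job this value is computed per row from input_file_name(), so it only exists after a line has been read.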

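If the logs were partitioned with the HDFS syntax, Spark would be able to filter them without reading the actual data.
But even though the date filter doesn't use the data itself, Spark still seems to read it.
Here is the information from the UI: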

The logical plan looks perfect, filtering by date before Project [value#0 AS raw_data#15, path_file#2]:

== Parsed Logical Plan ==
'InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, ['event_day], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
   +- Repartition 10, false
      +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
         +- Filter isnull(id#98)
            +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, id#98]
               +- Join LeftOuter, (id#22 = id#98)
                  :- Filter (((((facility#35 = rails-production) && NOT (controller#33 = PingController)) && isnotnull(path#30)) && (isnotnull(user_id#25) && isnotnull(timestamp#65))) && (((isnotnull(method#28) && NOT (method#28 = HEAD)) && NOT client_id#23 LIKE converge_%) && (NOT controller#33 LIKE Hotsite::% && NOT message#32 LIKE somos_id%)))
                  :  +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, to_date('timestamp, None) AS event_day#81]
                  :     +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, to_timestamp('timestamp, None) AS timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15]
                  :        +- Project [data#18.id AS id#22, data#18.client_id AS client_id#23, data#18.somos_id AS somos_id#24, data#18.user_id AS user_id#25, data#18.session_id AS session_id#26, data#18.user_agent AS user_agent#27, data#18.method AS method#28, data#18.status AS status#29, data#18.path AS path#30, data#18.timestamp AS timestamp#31, data#18.message AS message#32, data#18.controller AS controller#33, data#18.action AS action#34, data#18.facility AS facility#35, raw_data#15]
                  :           +- Project [raw_data#15, path_file#2, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), raw_data#15, Some(GMT)) AS data#18]
                  :              +- Project [value#0 AS raw_data#15, path_file#2]
                  :                 +- Project [value#0, path_file#2]
                  :                    +- Filter (logstash_date#9 >= 18392)
                  :                       +- Project [value#0, path_file#2, cast(regexp_replace(logstash_date#5, /, -) as date) AS logstash_date#9]
                  :                          +- Project [value#0, path_file#2, regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1) AS logstash_date#5]
                  :                             +- Project [value#0, input_file_name() AS path_file#2]
                  :                                +- Relation[value#0] text
                  +- Project [id#98]
                     +- Filter (event_day#114 >= 18392)
                        +- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114] parquet

But in the optimized logical plan, the date filter is merged with the other filters into a single Filter step:

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, [event_day#81], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
   +- Repartition 10, false
      +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
         +- Filter isnull(id#98)
            +- Join LeftOuter, (id#22 = id#98)
               :- Project [jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).id AS id#22, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id AS client_id#23, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).somos_id AS somos_id#24, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), 
StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id AS user_id#25, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).session_id AS session_id#26, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_agent AS user_agent#27, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), 
StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method AS method#28, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).status AS status#29, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path AS path#30, cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp 
as timestamp) AS timestamp#65, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message AS message#32, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller AS controller#33, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).action AS action#34, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), 
StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility AS facility#35, value#0 AS raw_data#15, cast(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp) as date) AS event_day#81]
               :  +- Filter (((((((((((cast(regexp_replace(regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1), /, -) as date) >= 18392) && (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility = rails-production)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller = PingController)) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path)) && 
isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id)) && isnotnull(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp))) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), 
StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method = HEAD)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id LIKE converge_%) && NOT StartsWith(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller, Hotsite::)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), 
StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message LIKE somos_id%)
               :     +- Project [value#0, input_file_name() AS path_file#2]
               :        +- Relation[value#0] text
               +- Project [id#98]
                  +- Filter ((isnotnull(event_day#114) && (event_day#114 >= 18392)) && isnotnull(id#98))
                     +- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114]

The physical plan does the same, reading raw_data in the same step that applies the filter.

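Is there any way to avoid this?
I have previously tried filtering the file names in Python and passing a huge list/generator of file paths to spark.read(), but that was also very slow.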

Thanks in advance.

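PS: some information that might be relevant:

  • I run this job daily
  • Last week's log files totaled 40 GB
  • Since January 1st they total 410 GB
  • The job runs on 2 executors with 7 CPUs each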

    Tags: apache-spark pyspark


    Answer 1:

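    Spark's predicate pushdown mainly benefits formats such as Parquet; for plain text files like these, Spark reads all the data and only then applies the filter. A date column derived from input_file_name() is also not a partition column, so Spark cannot use it to skip files either.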
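For comparison, when directories use Hive-style names (year=YYYY/month=MM/day=DD), Spark discovers year, month and day as partition columns, and a filter on them lets it skip whole directories before opening any file. Below is a minimal sketch of the key rewrite that a one-off copy into such a layout would need, assuming keys look like logs/YYYY/MM/DD/<hash>.json.gz. The helper to_hive_key is hypothetical, and the copy itself (for example with an S3 client) is not shown.

```python
import re
from typing import Optional

# Matches keys such as "logs/2020/08/28/abc123.json.gz" (any prefix before the date)
PLAIN_KEY = re.compile(
    r"^(?P<prefix>(?:.*/)?)(?P<y>\d{4})/(?P<m>\d{2})/(?P<d>\d{2})/(?P<name>[^/]+)$"
)


def to_hive_key(key: str) -> Optional[str]:
    """Rewrite .../YYYY/MM/DD/<file> as .../year=YYYY/month=MM/day=DD/<file>.

    Returns None for keys that do not follow the expected layout.
    """
    match = PLAIN_KEY.match(key)
    if match is None:
        return None
    return "{prefix}year={y}/month={m}/day={d}/{name}".format(**match.groupdict())


if __name__ == "__main__":
    print(to_hive_key("logs/2020/08/28/abc123.json.gz"))
    # logs/year=2020/month=08/day=28/abc123.json.gz
```

After such a copy, reading the new root should yield year, month and day columns, and a filter on those columns can be resolved by directory pruning instead of by reading the files.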

    Is there any way to avoid this?

    You can determine the list of files to read beforehand and then use the basePath option, e.g.

    spark_session.read.option("basePath",base_path).json(list_of_file_paths)
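A variation that keeps the path list short, assuming every day's files sit under their own YYYY/MM/DD prefix: instead of listing individual files, build one glob pattern per day in the requested range. Spark expands Hadoop globs such as * in input paths, so a year of data becomes roughly 365 patterns rather than tens of thousands of file names. The helper daily_globs is hypothetical, and whether this is faster in practice still depends on how long S3 listing takes for each prefix.

```python
from datetime import date, timedelta
from typing import List


def daily_globs(base: str, start: date, end: date) -> List[str]:
    """Return one glob per day from start to end (inclusive), e.g.
    s3a://bucket/logs/2020/08/28/*.json.gz
    """
    paths = []
    day = start
    while day <= end:
        # %m and %d are zero-padded, matching the YYYY/MM/DD layout
        paths.append(f"{base.rstrip('/')}/{day:%Y/%m/%d}/*.json.gz")
        day += timedelta(days=1)
    return paths


if __name__ == "__main__":
    for p in daily_globs("s3a://bucket/logs", date(2020, 8, 27), date(2020, 8, 28)):
        print(p)
```

The resulting list can be passed to spark.read.text(paths) or spark.read.json(paths), since both readers accept a list of paths. A glob that matches no files may make the read fail with a "Path does not exist" error, so days without logs may need to be skipped.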
    

    Comments:

    • This could be a solution, and I have used it before, but unfortunately passing a relatively large list of files (>50k) is also slow.