【Posted】: 2020-08-28 12:57:01
【Question】:
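I am using pyspark to process log files from S3 and filter them by date.
The logs are gzip-compressed JSON files partitioned by year/month/day, like this: s3://bucket/logs/YYYY/MM/DD/<hash>.json.gz
Because the layout does not follow the HDFS partition syntax (year=YYYY/month=MM/day=DD), I read the whole folder and derive a date column using input_file_name and a regular expression: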
from pyspark.sql.functions import col, input_file_name, regexp_extract, regexp_replace

# from_date is assumed to be a datetime defined elsewhere (earliest day to keep)
df = spark.read.option("compression", "gzip").text("s3a://bucket/logs/*/*/*/*.json.gz")
df = df.withColumn("path_file", input_file_name())
# pull YYYY/MM/DD out of the file path, then convert it to a date
df = df.withColumn("logstash_date", regexp_extract(col('path_file'), r"(?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02})", 1))
df = df.withColumn("logstash_date", regexp_replace(col('logstash_date'), '/', '-').cast('date'))
df = df.filter(col("logstash_date") >= from_date.date())
# later on uses from_json to parse schema, apply more filters and do a join (to deduplicate logs)
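To make the path handling concrete, here is a minimal plain-Python sketch of what the regexp_extract / regexp_replace / cast chain is meant to compute for a single path. It is only an illustration, not the Spark code itself: the function name `logstash_date` and the file name `abc123` are made up, and the quantifiers are written as `{4}` and `{2}`.

```python
import re
from datetime import date, datetime

# Same idea as the regexp_extract pattern above: capture YYYY/MM/DD after the logs prefix.
PATH_DATE = re.compile(r"s3a://bucket/logs/(\d{4}/\d{2}/\d{2})")

def logstash_date(path_file):
    """Return the date encoded in the path, or None when the path does not match."""
    match = PATH_DATE.search(path_file)
    if match is None:
        return None
    return datetime.strptime(match.group(1), "%Y/%m/%d").date()

# "abc123" is a hypothetical file name standing in for <hash>.
print(logstash_date("s3a://bucket/logs/2020/05/12/abc123.json.gz"))  # 2020-05-12
```

The point of the sketch is that the date depends only on the file path, never on the file contents.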
If the logs were partitioned using the HDFS syntax, Spark would be able to filter them without reading the actual data.
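For reference, the difference between the two layouts can be sketched in plain Python. The helper `to_hive_style` and the file name `abc123` are hypothetical; the sketch only shows how a key in the current layout would look in the key=value layout that Spark's partition discovery understands.

```python
import re

# Matches the /YYYY/MM/DD/ part of a key such as logs/2020/05/12/<hash>.json.gz
DATE_DIRS = re.compile(r"/(\d{4})/(\d{2})/(\d{2})/")

def to_hive_style(key):
    """Rewrite .../YYYY/MM/DD/... as .../year=YYYY/month=MM/day=DD/..."""
    return DATE_DIRS.sub(r"/year=\1/month=\2/day=\3/", key, count=1)

print(to_hive_style("logs/2020/05/12/abc123.json.gz"))
# logs/year=2020/month=05/day=12/abc123.json.gz
```

With directories named this way, year, month and day would be exposed as partition columns, so a filter on them could be resolved from the directory listing alone.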
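But Spark still seems to read the data, even though the date filter does not use the file contents at all.
Here is the information from the UI:
The logical plan looks right, with the date filter applied before Project [value#0 AS raw_data#15, path_file#2]: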
== Parsed Logical Plan ==
'InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, ['event_day], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
+- Repartition 10, false
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
+- Filter isnull(id#98)
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, id#98]
+- Join LeftOuter, (id#22 = id#98)
:- Filter (((((facility#35 = rails-production) && NOT (controller#33 = PingController)) && isnotnull(path#30)) && (isnotnull(user_id#25) && isnotnull(timestamp#65))) && (((isnotnull(method#28) && NOT (method#28 = HEAD)) && NOT client_id#23 LIKE converge_%) && (NOT controller#33 LIKE Hotsite::% && NOT message#32 LIKE somos_id%)))
: +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, to_date('timestamp, None) AS event_day#81]
: +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, to_timestamp('timestamp, None) AS timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15]
: +- Project [data#18.id AS id#22, data#18.client_id AS client_id#23, data#18.somos_id AS somos_id#24, data#18.user_id AS user_id#25, data#18.session_id AS session_id#26, data#18.user_agent AS user_agent#27, data#18.method AS method#28, data#18.status AS status#29, data#18.path AS path#30, data#18.timestamp AS timestamp#31, data#18.message AS message#32, data#18.controller AS controller#33, data#18.action AS action#34, data#18.facility AS facility#35, raw_data#15]
: +- Project [raw_data#15, path_file#2, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), raw_data#15, Some(GMT)) AS data#18]
: +- Project [value#0 AS raw_data#15, path_file#2]
: +- Project [value#0, path_file#2]
: +- Filter (logstash_date#9 >= 18392)
: +- Project [value#0, path_file#2, cast(regexp_replace(logstash_date#5, /, -) as date) AS logstash_date#9]
: +- Project [value#0, path_file#2, regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1) AS logstash_date#5]
: +- Project [value#0, input_file_name() AS path_file#2]
: +- Relation[value#0] text
+- Project [id#98]
+- Filter (event_day#114 >= 18392)
+- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114] parquet
But in the optimized logical plan, the two steps are merged together:
== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, [event_day#81], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
+- Repartition 10, false
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
+- Filter isnull(id#98)
+- Join LeftOuter, (id#22 = id#98)
:- Project [jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).id AS id#22, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id AS client_id#23, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).somos_id AS somos_id#24, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), 
StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id AS user_id#25, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).session_id AS session_id#26, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_agent AS user_agent#27, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), 
StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method AS method#28, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).status AS status#29, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path AS path#30, cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp) AS timestamp#65, 
jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message AS message#32, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller AS controller#33, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).action AS action#34, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), 
StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility AS facility#35, value#0 AS raw_data#15, cast(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp) as date) AS event_day#81]
: +- Filter (((((((((((cast(regexp_replace(regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1), /, -) as date) >= 18392) && (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility = rails-production)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller = PingController)) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path)) && 
isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id)) && isnotnull(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp))) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), 
StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method = HEAD)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id LIKE converge_%) && NOT StartsWith(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller, Hotsite::)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), 
StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message LIKE somos_id%)
: +- Project [value#0, input_file_name() AS path_file#2]
: +- Relation[value#0] text
+- Project [id#98]
+- Filter ((isnotnull(event_day#114) && (event_day#114 >= 18392)) && isnotnull(id#98))
+- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114]
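A side note for reading both plans: the literal 18392 in the date filters appears to be a date printed as a day count. A quick check in plain Python, assuming Spark shows DateType literals as the number of days since 1970-01-01 (the helper name `days_to_date` is made up for this sketch):

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)

def days_to_date(days):
    """Convert a day count since the Unix epoch into a calendar date."""
    return EPOCH + timedelta(days=days)

print(days_to_date(18392))  # 2020-05-10
```

Under that assumption, both filters keep rows from 2020-05-10 onward, two days before the generated_at value shown in the plans.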
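The physical plan follows the same steps, reading raw_data together with the filter.
Is there any way to avoid this?
I have previously tried filtering the file names in Python and passing a huge list/generator of file paths to spark.read(), and that was also very slow.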
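A possible variation on this idea, sketched here and not benchmarked: instead of one path per file, build one glob pattern per day, which keeps the list short (a few patterns for a few days). The helper `daily_globs` and its default `base` value are assumptions made for illustration.

```python
from datetime import date, timedelta

def daily_globs(from_date, to_date, base="s3a://bucket/logs"):
    """Yield one glob pattern per day in the inclusive range [from_date, to_date]."""
    day = from_date
    while day <= to_date:
        yield f"{base}/{day:%Y/%m/%d}/*.json.gz"
        day += timedelta(days=1)

paths = list(daily_globs(date(2020, 5, 10), date(2020, 5, 12)))
for p in paths:
    print(p)

# Assumption: the list could then be handed to the reader, for example
# spark.read.option("compression", "gzip").text(paths)
```

Days without any files may need to be dropped from the list first, since a pattern that matches nothing may cause the read to fail.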
Thanks in advance.
PS: some information that may be relevant:
- I run this job daily
- The last week has 40GB of log files
- There are 410GB since January 1st
- It runs on 2 executors with 7 CPUs each
【Comments】:
Tags: apache-spark pyspark