【问题标题】:PySpark Groupby Filtering with ConditionPySpark Groupby 条件过滤
【发布时间】:2020-10-09 03:12:57
【问题描述】:

我正在处理 pyspark 中的结构化数据框。从 s3 读取 parquet 格式的数据。然后,我想过滤掉一些符合条件的数据。 例如:

Data:
key_1  value rec_date
  A     1    2020-01-01
  A     2    2020-01-02
  A    10    2020-01-03
  B    10    2020-10-10
  B    10    2020-10-11
  B    10    2020-10-12
Filter Condition:
{"A":{'abnormal_daterange':[('2020-01-01', '2020-01-02'), ('2020-02-01', '2020-02-04')]},
"B": {'abnormal_daterange':[('2020-10-10', '2020-10-11')]}
}
Expected result:
key_1  value rec_date
  A    10    2020-01-03
  B    10    2020-10-12

我知道我可以使用@pandas-udf 来创建group by 的函数并使用条件进行过滤,但我想简化代码并且不要过度设计。

任何结构化数据框filter/内置函数都可以有效地过滤掉带有条件的数据吗?

【问题讨论】:

  • 过滤条件有多大?如果它足够小,你可以把它变成一个数据框并使用左反连接。
  • @etherealyn 我已经编辑了过滤条件,可以是多个条件。
  • @Hong 如果dict不是很大,你可以把它转换成一个SQL表达式,包括(AND,OR等)来保存过滤逻辑,然后用这个SQL表达式做df.filter() .

标签: python apache-spark pyspark etl user-defined-functions


【解决方案1】:

假设你有dffilterDf

df = spark.createDataFrame(
    [
        ("A", "1", "2020-01-01"),
        ("A", "2", "2020-01-02"),
        ("A", "10", "2020-01-03"),
        ("B", "10", "2020-10-10"),
        ("B", "10", "2020-10-11"),
        ("B", "10", "2020-10-12")
    ],
    ['key_1', 'value', 'rec_date']
)

filterDf = spark.createDataFrame(
    [
        ("A", "2020-01-01", "2020-01-02"),
        ("B", "2020-10-01", "2020-10-11")
    ],
    ["key", "start_date", "end_date"]
)

然后您可以使用左反连接来过滤您的dffilterDf 的内容:

res = df.join(
    filterDf.hint("broadcast"),
    (col("key_1") == col("key")) & (col("rec_date").between(col("start_date"), col("end_date"))),
    "leftanti"
)

结果将是:

+-----+-----+----------+
|key_1|value|  rec_date|
+-----+-----+----------+
|    A|   10|2020-01-03|
|    B|   10|2020-10-12|
+-----+-----+----------+

之后,您可以应用您的 groupBy 聚合。

【讨论】:

    猜你喜欢
    • 2018-07-22
    • 2020-06-24
    • 1970-01-01
    • 1970-01-01
    • 2019-06-22
    • 2019-09-02
    • 2023-04-02
    • 2022-01-10
    • 2018-08-24
    相关资源
    最近更新 更多