【问题标题】:Spark - Statistical calculations sorted by timestamps rangesSpark - 按时间戳范围排序的统计计算
【发布时间】:2017-01-20 09:26:45
【问题描述】:

我正在尝试根据一系列小时和\或天数计算统计量度。

意思是,我有一个类似这样的 CSV 文件:

TRANSACTION_URL    START_TIME        END_TIME           SIZE    FLAG
www.google.com     20170113093210    20170113093210     150      1
www.cnet.com       20170113114510    20170113093210     150      2

START_TIME 和 END_TIME 采用 yyyyMMddhhmmss 格式。

我首先使用以下代码将其转换为yyyy-MM-dd hh:mm:ss 格式:

from_pattern = 'yyyyMMddhhmmss'
to_pattern = 'yyyy-MM-dd hh:mm:ss'

log_df = log_df.withColumn('START_TIME', from_unixtime(unix_timestamp(
    log_df['START_TIME'].cast(StringType()), from_pattern), to_pattern).cast(TimestampType()))

然后,我想使用groupBy() 来计算,例如,基于事务时间框架的 SIZE 列的平均值。

例如,我想做这样的事情:

for all transactions that are between 09:00 to 11:00
    calculate SIZE mean

for all transactions that are between 14:00 to 16:00
    calculate SIZE mean

还有:

for all transactions that are in a WEEKEND date
    calculate SIZE mean

for all transactions that are NOT in a WEEKEND date
    calculate SIZE mean

我知道如何将 groupBy 用于“默认”配置,例如根据 FLAG 列值计算 SIZE 列的统计度量。我正在使用类似的东西:

log_df.cache().groupBy('FLAG').agg(mean('SIZE').alias("Mean"), stddev('SIZE').alias("Stddev")).\
    withColumn("Variance", pow(col("Stddev"), 2)).show(3, False)

所以,我的问题是:

  1. 如何在一个小时范围内实现这样的分组和计算? (第一个伪代码示例)

  2. 如何按日期实现这样的分组和计算? (第二个伪代码示例)

如果是周末约会,有没有可以接收yy-MM-dd并返回true的python包?

谢谢

【问题讨论】:

    标签: python apache-spark group-by pyspark aggregate


    【解决方案1】:

    假设您有一个函数 encode_dates 接收日期并返回您感兴趣的所有时间段的编码序列。例如,对于 9-12 星期二,它将返回 Seq("9-11","10 -12","11-13","工作日")。这将是一个常规的 scala 函数(与 spark 无关)。

    现在您可以将其设为 UDF 并将其添加为列并展开列,这样您将拥有多个副本。现在您需要做的就是为 groupby 添加此列。

    所以它看起来像这样:

    val encodeUDF = udf(encode_dates _)
    log_df.cache().withColumn("timePeriod", explode(encodeUDF($"start_date", $"end_date").groupBy('FLAG', 'timePeriod').agg(mean('SIZE').alias("Mean"), stddev('SIZE').alias("Stddev")).
    withColumn("Variance", pow(col("Stddev"), 2)).show(3, False)
    

    【讨论】:

    • 像你建议的那样编写函数,对于我的场景来说似乎有点过头了——我 CSV 中的所有事务通常最多 1 秒到 10 秒长。也许我可以编写一个函数来接收START_TIMEEND_TIME,并返回类似Morning, Weekend 的内容,但我不知道如何:1。将其应用于“即时”的每笔交易- 2。将其应用于每笔交易后,计算所有记录的均值/STDDEV,例如 Afternoon, Weekend(FLAG 与这些计算无关)
    • 甚至更好 - 将我的函数应用于每个事务,以及不在 MorningAfternoon 时间的每个事务 - 从数据帧中删除 if,例如在 13 中的事务:00-13:01 将被删除(这将使后面的计算在减少的 datafrmae 上工作) - 但是我不知道一旦我将编写这样的函数如何实现这种行为
    • 这是你的业务逻辑。您需要定义一个函数,它说:“给定一个时间段,我想在统计中包含哪些时间段”。一旦你有了这个,你基本上将你的数据相乘以出现在所有时间段中并按它分组。您可能可以通过编写自定义 UDAF 来做一些更高效的事情,但我不确定它最终是否会同样高效。
    • 只是为了确保清楚这里的逻辑是做什么的: 1. 你制作一个函数 f(time)->time period 列表。 2.答案中的代码基本上会将其应用于每笔交易并计算统计信息。是否“即时应用”并不重要。在实践中,您将执行 2 次传递:一次用于计算数据,第二次用于聚合。
    • 好的,但是正如我所提到的,创建一个时间段列表会产生巨大的列表,因为所有事务最多只有 10 秒长,而且我需要几个小时的时间范围。如果我会按照您建议的方向进行,但创建一个函数f(START_TIME, END_TIME),它会输出一个包含 2 个对象的列表,例如Morning, WeekdayNoon, WeekendEvening, Weekday 等并使用建议的explode 方法,它仍然可以解决问题吗?
    猜你喜欢
    • 2021-11-08
    • 1970-01-01
    • 1970-01-01
    • 2015-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-28
    • 2015-04-26
    相关资源
    最近更新 更多