【问题标题】:Splitting / Grouping DataFrame based on a date / time difference根据日期/时间差异拆分/分组DataFrame
【发布时间】:2022-01-15 18:44:30
【问题描述】:

我在数据框中有以下行:

sender receiver bytes timestamp
A B 50 2147483647
C D 100 2147483648
A B 150 2147483657
C D 200 2147483658
A B 550 2147487657

该数据帧中的每条记录/行都包含在 10 秒时间窗口内发送方和接收方之间发送的数据量。时间戳标记了各个时间窗口的开始时间。

现在,我想计算“流”中每对发送方和接收方之间的数据量。 对于流,我的意思是数据在发送者和接收者之间不断传输。 如果较长时间(比如 1 小时)没有数据传输,我希望拆分流。在上面的例子中,我想得到:

  • flow_AB_1 = 200 字节
  • flow_CD_1 = 300 字节
  • flow_AB_2 = 550 字节

flow_AB_2 将是一个单独的流,因为2147487657 - 2147483657 = 4000 大于3600

有没有办法通过 pyspark/Apache Spark 实现这一点?

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    要解决您的问题,您可以:

    • 使用基于this blog post中描述的Spark窗口的会话算法创建一个新列flow
    • flowsenderreceiver 组合bytes
    • (可选)通过连接 senderreceiverflow 列来构建流的名称

    完整的代码如下:

    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    window = Window.partitionBy('sender', 'receiver').orderBy('timestamp')
    
    result = dataframe \
        .withColumn('flow_split', F.when(F.col('timestamp') - F.lag('timestamp').over(window) > 3600, F.lit(1)).otherwise(F.lit(0))) \
        .withColumn('flow', F.sum('flow_split').over(window)) \
        .groupby('sender', 'receiver', 'flow') \
        .agg(F.sum('bytes').alias('bytes')) \
        .select(
          F.concat(F.lit('flow_'), F.col('sender'), F.col('receiver'), F.lit('_'), F.col('flow') + 1).alias('flow'),
          F.col('bytes')
        )
    

    使用您问题中的输入数据框,您将得到以下结果:

    +---------+-----+
    |flow     |bytes|
    +---------+-----+
    |flow_AB_1|200  |
    |flow_AB_2|550  |
    |flow_CD_1|300  |
    +---------+-----+
    

    【讨论】:

      猜你喜欢
      • 2021-08-16
      • 2020-11-02
      • 2015-09-03
      • 2022-01-20
      • 1970-01-01
      • 2020-02-23
      • 1970-01-01
      • 2019-07-19
      • 2020-10-02
      相关资源
      最近更新 更多