【问题标题】:PySpark Windows function (lead,lag) in Synapse WorkspaceSynapse Workspace 中的 PySpark Windows 函数(超前、滞后)
【发布时间】:2022-01-23 04:28:07
【问题描述】:

场景:

  • 票证有 StartDateEndDate ,如果存在 StartDateEndDate,则创建一个新的数据框,如下面的所需输出所示。

Pyspark 数据集如下所示

#base Schema for Testing purpose
#Dataset

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

#Create User defined Custom Schema using StructType
schema = StructType([ StructField('CaseNumber', StringType(), True)\
                       ,StructField('StartTime', StringType(), True)\
                       ,StructField('EndTime', StringType(), True)])

data = [
        {"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
        {"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
        {"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
        ]

from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Creation of a dummy dataframe:
df1 = spark.createDataFrame(data,schema=schema)

df1.show()

已创建数据集:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|          NaN|
|   Ticket1|          NaN|1/23/19 11:00|
|   Ticket1| 1/25/19 7:00|          NaN|
|   Ticket1| 1/27/19 3:00|          NaN|
|   Ticket2|1/29/19 10:00|          NaN|
|   Ticket2|          NaN| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00|          NaN|
|   Ticket2|          NaN| 3/27/19 8:00|
|   Ticket2|          NaN|3/27/19 10:00|
|   Ticket3| 4/25/19 1:00|          NaN|
+----------+-------------+-------------+

所需的输出应该是:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|1/23/19 11:00|
|   Ticket2|1/29/19 10:00| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+

应用Lead函数查看,票证是否存在endtime

from pyspark.sql.window import Window
import pyspark.sql.functions as psf

windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
df = df1.withColumn("lead",lead("EndTime",1).over(windowSpec))
df.show()

pysparkdf = df.toPandas()

import pandas as pd 
tickets = pysparkdf.groupby('CaseNumber')

def isLeadnull(e): 
    return e['lead'] != None

my_list = []
for i,ticket in tickets:
    for j,e in ticket.iterrows() :
        if  isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'] ,'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'],'Do nothing as condition not met')

这个函数之后的输出是:

[{'CaseNumber': 'Ticket1',
  'Start': '1/22/19 10:00',
  'EndTime': '1/23/19 11:00'},
 {'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2',
  'Start': '1/29/19 10:00',
  'EndTime': '2/23/19 2:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]

【问题讨论】:

    标签: python dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    这是一种差距和孤岛问题。您可以通过创建group 列使用条件累积总和来识别“”,然后您可以按CaseNumber + group 分组并聚合最大StartTime 和最小EndTime每组:

    from pyspark.sql import functions as F, Window
    
    # first, convert strings to timestamps and replacing empty strings with nulls
    df1 = df1.withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
        .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm")) \
        .replace("", None)
    
    w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))
    
    df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
        .groupBy("CaseNumber", "group") \
        .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
        .filter(F.col("EndTime").isNotNull()) \
        .drop("group")
    
    df2.show()
    #+----------+-------------------+-------------------+
    #|CaseNumber|          StartTime|            EndTime|
    #+----------+-------------------+-------------------+
    #|   Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
    #|   Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
    #|   Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
    #+----------+-------------------+-------------------+
    

    为了理解逻辑,您可以在分组之前显示中间列:

    df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()
    
    #+----------+-------------------+-------------------+-----+
    #|CaseNumber|          StartTime|            EndTime|group|
    #+----------+-------------------+-------------------+-----+
    #|   Ticket1|2019-01-22 10:00:00|               null|    1|
    #|   Ticket1|               null|2019-01-23 11:00:00|    1|
    #|   Ticket1|2019-01-25 07:00:00|               null|    2|
    #|   Ticket1|2019-01-27 03:00:00|               null|    3|
    #|   Ticket2|2019-01-29 10:00:00|               null|    1|
    #|   Ticket2|               null|2019-02-23 02:00:00|    1|
    #|   Ticket2|2019-03-25 07:00:00|               null|    2|
    #|   Ticket2|               null|2019-03-27 08:00:00|    2|
    #|   Ticket2|               null|2019-03-27 10:00:00|    2|
    #|   Ticket3|2019-04-25 01:00:00|               null|    1|
    #+----------+-------------------+-------------------+-----+
    

    【讨论】:

    • 谢谢你,我的朋友,它就像魅力一样。这太棒了
    猜你喜欢
    • 1970-01-01
    • 2017-06-07
    • 2013-06-20
    • 2018-02-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-07
    • 2015-10-13
    相关资源
    最近更新 更多