【Question title】: PySpark mean days calculation matching pandas code
【Posted】: 2021-01-11 10:13:42
【Question description】:

#Pandas code

    # Deduplicate on the four relevant columns
    temp = df_merge[['subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate']].drop_duplicates()
    # Each mean is one scalar (in years), assigned to every row of df_merge
    df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
    df_merge['mean_sub_duration'] = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
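
To pin down the target semantics, here is a plain-Python sketch (standard library only, with made-up sample rows) of what the two pandas lines compute: each duration is averaged over its own non-null values, independently of the other duration, and the result is a single number in years that is then assigned to every row of `df_merge`. The helper `mean_years` is hypothetical.

```python
from datetime import date
from statistics import mean

# Hypothetical deduplicated rows: (subscription_id, cancelleddate, subscriptionstartdate, termenddate)
rows = [
    (1, date(2020, 3, 1), date(2020, 1, 1), date(2021, 1, 1)),
    (2, None, date(2020, 1, 1), date(2020, 7, 1)),
    (3, None, date(2019, 1, 1), date(2020, 1, 1)),
]


def mean_years(pairs):
    """Average (end - start).days over pairs where both dates exist, divided by 365.

    Skipping incomplete pairs mirrors pandas' .dt.days.dropna().mean() / 365;
    None stands in for the NaN pandas would return when nothing is left.
    """
    days = [(end - start).days for end, start in pairs if end is not None and start is not None]
    return mean(days) / 365 if days else None


mean_cancelled_sub_duration = mean_years((r[1], r[2]) for r in rows)  # only row 1 counts: 60 days
mean_sub_duration = mean_years((r[3], r[2]) for r in rows)  # all rows count: 366, 182, 365 days
```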

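How can I implement the same logic as the pandas code in PySpark? I tried the PySpark version below, but it does not work: rows get dropped and the calculated values are wrong.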

The columns with "date" in their names are of DateType.

#Failed PySpark conversion

    from pyspark.sql import Window
    from pyspark.sql.functions import broadcast, col, datediff, mean

    temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
    temp = temp.withColumn("cancelled_sub_duration", datediff(temp.cancelleddate, temp.subscriptionstartdate)).withColumn("sub_duration", datediff(temp.termenddate, temp.subscriptionstartdate))
    # Drop rows where either duration is null
    temp = temp.na.drop(subset=['cancelled_sub_duration', 'sub_duration'])
    # Window over the rows of each subscription_id
    spec3 = Window.partitionBy("subscription_id")
    temp = temp.withColumn('mean_cancelled_sub_duration', (mean("cancelled_sub_duration").over(spec3)) / 365).withColumn('mean_sub_duration', (mean("sub_duration").over(spec3)) / 365)
    temp = temp.select(col('subscription_id').alias('subsid'), col('mean_cancelled_sub_duration'), col('mean_sub_duration'))
    df_merge = df_merge.join(broadcast(temp), df_merge.subscription_id == temp.subsid, "left").drop(col('subsid'))

【Discussion】:

    Tags: python pandas apache-spark pyspark apache-spark-sql


    【Solution 1】:

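    Hi, please post the expected output of the pandas code and the result you get from the PySpark code, so we can compare the two datasets. Without that, it is hard to tell exactly what is and isn't working.

    In the meantime, I looked only at the pandas code and tried to translate it like-for-like into PySpark; this is what I came up with: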

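    from pyspark.sql.functions import avg, datediff, lit
    # spec3 is the Window.partitionBy("subscription_id") spec from the question
    temp = temp \
        .withColumn('mean_cancelled_sub_duration', avg(datediff('cancelleddate', 'subscriptionstartdate')).over(spec3) / lit(365)) \
        .withColumn('mean_sub_duration', avg(datediff('termenddate', 'subscriptionstartdate')).over(spec3) / lit(365))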
    

    【Discussion】:

      【Solution 2】:

      First, I wrote a helper that converts a pandas DataFrame to a Spark DataFrame, deriving the Spark schema from the pandas dtypes.

      from pyspark.sql.types import (DateType, DoubleType, IntegerType, LongType,
                                     StringType, StructField, StructType)

      def equivalent_type(f):
          # Map a pandas dtype to the matching Spark SQL type; unknown dtypes fall back to strings
          if f == 'datetime64[ns]': return DateType()
          elif f == 'int64': return LongType()
          elif f == 'int32': return IntegerType()
          elif f == 'uint8': return IntegerType()
          elif f == 'float64': return DoubleType()  # float64 is double precision
          else: return StringType()

      def define_structure(string, format_type):
          try: typo = equivalent_type(format_type)
          except Exception: typo = StringType()
          return StructField(string, typo)

      def pandas_to_spark(pandas_df):
          # Build an explicit schema from the pandas dtypes (assumes an active SparkSession named `spark`)
          columns = list(pandas_df.columns)
          types = list(pandas_df.dtypes)
          struct_list = []
          for column, typo in zip(columns, types):
              struct_list.append(define_structure(column, typo))
          p_schema = StructType(struct_list)
          return spark.createDataFrame(pandas_df, p_schema)
      

      Then I used the toPandas() method to convert the Spark DataFrame to a pandas DataFrame and ran the original pandas logic:

      import pandas as pd

      temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
      temp = temp.toPandas()

      # Convert all three date columns to datetime64 so the subtractions yield timedeltas
      temp['cancelleddate'] = pd.to_datetime(temp['cancelleddate'])
      temp['subscriptionstartdate'] = pd.to_datetime(temp['subscriptionstartdate'])
      temp['termenddate'] = pd.to_datetime(temp['termenddate'])

      df_merge = df_merge.toPandas()
      df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
      df_merge['mean_sub_duration'] = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365

      df_merge = pandas_to_spark(df_merge)
      

      I'm on Spark 2.3.0, so I had to make sure the date fields I converted to a pandas DataFrame were timestamps; otherwise it raised an error.

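      This finally solved my problem: I got the mean I needed, which is neither a grouped aggregate nor a row-wise mean but a single mean over the whole column.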
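
To see how this column-wide mean differs from what the question's Spark attempt produced, the following standard-library sketch uses made-up durations in days and computes three readings of `mean_sub_duration`: the column mean with nulls skipped per column (pandas), the mean after dropping any row with a null duration (`na.drop` with its default `how='any'`), and one mean per subscription (`Window.partitionBy('subscription_id')`).

```python
from statistics import mean

# Made-up durations in days per deduplicated row: (subscription_id, cancelled_days, sub_days);
# cancelled_days is None for subscriptions that were never cancelled.
rows = [(1, 60, 366), (2, None, 182), (3, None, 365)]

# 1. Column mean, as in pandas: nulls are skipped in each column independently
col_mean_sub = mean(r[2] for r in rows if r[2] is not None) / 365

# 2. na.drop(subset=[...]) with how='any' first: one null removes the whole row
kept = [r for r in rows if r[1] is not None and r[2] is not None]
dropped_mean_sub = mean(r[2] for r in kept) / 365

# 3. Window.partitionBy('subscription_id'): one mean per subscription, not per column
by_sub = {}
for sid, _, sub_days in rows:
    by_sub.setdefault(sid, []).append(sub_days)
per_sub_mean = {sid: mean(days) / 365 for sid, days in by_sub.items()}
```

Only the first value matches pandas; the second is based on subscription 1 alone because the never-cancelled subscriptions 2 and 3 were discarded, and the third gives a different value for each subscription.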

      【Discussion】:
