[Question title]: Get the distinct elements of a column grouped by another column on a PySpark Dataframe
[Posted]: 2019-11-13 21:58:42
[Question description]:

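I have a PySpark DataFrame of ids and purchases that I am trying to transform for use with FP-growth. Currently there are multiple rows for a given id, and each row relates to a single purchase.

I would like to transform this DataFrame into a form with two columns: one for the id (a single row per id) and a second containing the list of distinct purchases for that id.

I tried using a user-defined function (UDF) to map the distinct purchases onto the distinct ids, but I get "py4j.Py4JException: Method __getstate__([]) does not exist". Thanks to @Mithril, I see that "You can't use sparkSession object, spark.DataFrame object or other Spark distributed objects in udf and pandas_udf, because they are unpickled."

So I implemented the TERRIBLE approach below (it works, but it does not scale):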

import pandas as pd
import pyspark.sql.functions as f

# Let's create some fake transactions
# (assumes an existing SparkSession named `spark`, as in the pyspark shell)
customers  = [1,2,3,1,1]
purschases = ['cake','tea','beer','fruit','cake']

# Let's create a Spark DF to capture the transactions
transactions = zip(customers,purschases)
spk_df_1 = spark.createDataFrame(list(transactions) , ["id", "item"])

# Register the DataFrame as a temp view so the SQL query below can find it
spk_df_1.createOrReplaceTempView("TBLdf")

# Let's have a look at the resulting Spark DataFrame
spk_df_1.show()

# Let's capture the ids and the list of their distinct purchases in a
# list of tuples
nums1 = []

# For each distinct id, get the list of its distinct purchases.
# This runs one Spark job per id and pulls everything to the driver.
for cust_id in spark.sql("SELECT DISTINCT id FROM TBLdf").rdd.map(lambda row: row[0]).collect():
    purschase = spk_df_1.filter(f.col("id") == cust_id).select("item").distinct().rdd.map(lambda row: row[0]).collect()
    nums1.append((cust_id, purschase))


# Let's see what our list of transaction tuples looks like
print(nums1)
print("\n")

# Let's turn the list of transaction tuples into a pandas DataFrame
df_pd = pd.DataFrame(nums1)

# Finally, let's turn our pandas DataFrame into a PySpark DataFrame
df2 = spark.createDataFrame(df_pd)
df2.show()

Output:

+---+-----+
| id| item|
+---+-----+
|  1| cake|
|  2|  tea|
|  3| beer|
|  1|fruit|
|  1| cake|
+---+-----+

[(1, ['fruit', 'cake']), (3, ['beer']), (2, ['tea'])]


+---+-------------+
|  0|            1|
+---+-------------+
|  1|[fruit, cake]|
|  3|       [beer]|
|  2|        [tea]|
+---+-------------+

If anyone has any suggestions, I would greatly appreciate it.

[Question comments]:

    Tags: python-3.x pyspark pyspark-dataframes


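    [Solution 1]:

    This is a job for collect_set, an aggregate function that gathers the values in each group into an array with duplicates removed: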

    import pyspark.sql.functions as F
    
    # Let's create some fake transactions
    # (assumes an existing SparkSession named `spark`, as in the pyspark shell)
    customers  = [1,2,3,1,1]
    purschases = ['cake','tea','beer','fruit','cake']
    
    # Let's create a Spark DF to capture the transactions
    transactions = zip(customers,purschases)
    spk_df_1 = spark.createDataFrame(list(transactions) , ["id", "item"])
    spk_df_1.show()
    
    # One row per id, with the distinct items collected into an array
    spk_df_1.groupby('id').agg(F.collect_set('item')).show()
    

    Output:

    +---+-----+
    | id| item|
    +---+-----+
    |  1| cake|
    |  2|  tea|
    |  3| beer|
    |  1|fruit|
    |  1| cake|
    +---+-----+
    
    +---+-----------------+
    | id|collect_set(item)|
    +---+-----------------+
    |  1|    [fruit, cake]|
    |  3|           [beer]|
    |  2|            [tea]|
    +---+-----------------+
    

    [Discussion]:
