【问题标题】:Pyspark: Intersection of multiple arraysPyspark:多个数组的交集
【发布时间】:2020-04-13 04:54:56
【问题描述】:

我有以下测试数据,必须借助pyspark检查以下语句(数据实际上非常大:700000笔交易,每笔交易有10+个产品):

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [1, 2, 2, 3, 4, 3], 'productids': ['A;B', 'D;E', 'H;X', 'P;Q;G', 'S;T;U', 'C;G']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

“某个客户 ID 在 x 天内存在的交易的特点是购物车中至少有一件相同的产品。”

到目前为止,我有以下方法(示例 x = 2):

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")

df=spark.createDataFrame(data)

x = 2

win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)
test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
    .withColumn("flat_col", F.array_distinct(F.flatten((F.collect_list("productids").over(win))))).orderBy(F.col("date"))

test = test.toPandas()

因此,从我们回顾过去 2 天的每笔交易中,按 customerid 分组,相应的产品汇总在“flat_col”列中。

但我真正需要的是具有相同 ID 的购物篮的交集。只有这样我才能判断是否有共同的产品。

所以“flat_col”的第五行应该是['G']而不是['P','Q','G','C']。同样,[] 应该出现在“flat_col”的所有其他行中。

非常感谢!

【问题讨论】:

    标签: python pandas apache-spark pyspark


    【解决方案1】:

    您可以在没有 self-join 的情况下实现此目的(因为在大数据中连接是昂贵的 shuffle 操作)使用 higher order functions in spark 2.4。使用的函数filter,transform,aggregate

    df=spark.createDataFrame(data)
    
    x = 2
    
    win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)
    test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
        .withColumn("flat_col", F.flatten(F.collect_list("productids").over(win)))\
        .withColumn("occurances", F.expr("""filter(transform(productids, x->\
         IF(aggregate(flat_col, 0,(acc,t)->acc+IF(t=x,1,0))>1,x,null)),y->y!='null')"""))\
        .drop("flat_col").orderBy("date").show()
    
    +-------------------+----------+----------+----------+
    |               date|customerid|productids|occurances|
    +-------------------+----------+----------+----------+
    |2014-01-01 00:00:00|         1|    [A, B]|        []|
    |2014-01-02 00:00:00|         2|    [D, E]|        []|
    |2014-01-03 00:00:00|         2|    [H, X]|        []|
    |2014-01-04 00:00:00|         3| [P, Q, G]|        []|
    |2014-01-05 00:00:00|         4| [S, T, U]|        []|
    |2014-01-06 00:00:00|         3|    [C, G]|       [G]|
    +-------------------+----------+----------+----------+
    

    【讨论】:

    • 如果你按照customerid进行广播或分区,什么都不会洗牌,你把几个话题混为一谈了。
    • Broadcast 最多只能占用 8GB 空间,即使正确分区,高阶函数也总是优于 join 操作。我建议你花一些时间学习高阶函数,而不是让我感到困惑。
    • 另外,通过customer_id重新分区,重新分区本身就是一个昂贵的shuffle操作。
    【解决方案2】:

    自加入是有史以来最好的技巧

    from pyspark.sql.functions import concat_ws, collect_list
    spark.createDataFrame(data).registerTempTable("df")
    sql("SELECT date, customerid, explode(split(productids, ';')) productid FROM df").registerTempTable("altered")
    df = sql("SELECT al.date, al.customerid, al.productid productids, altr.productid flat_col FROM altered al left join altered altr on altr.customerid = al.customerid and al.productid = altr.productid and al.date != altr.date and datediff(al.date,altr.date) <=2 and datediff(al.date,altr.date) >=-2")
    df.groupBy("date", "customerid").agg(concat_ws(",", collect_list("productids")).alias('productids'), concat_ws(",", collect_list("flat_col")).alias('flat_col')).show()
    

    【讨论】:

    • 非常感谢您的回答。目前,我仍然无法重现您的结果。我正在尝试这样的事情: from pyspark.sql import SQLContext SQLContext.sql("SELECT explode(split(productids, ';')) FROM df").registerTempTable("exploded") .... 我收到错误消息: sql() 缺少 1 个必需的位置参数:'sqlQuery'
    • 那是因为你是从笔记本启动的。您需要以某种方式导入 sql 上下文。我在命令行中运行脚本,并在我开始执行 bin/pyspark 时为我完成。我更正了一下脚本,请重新运行
    猜你喜欢
    • 2014-10-19
    • 2011-10-24
    • 1970-01-01
    • 2021-03-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-16
    • 2021-12-06
    相关资源
    最近更新 更多