【问题标题】:pyspark count distinct on each column每列的pyspark计数不同
【发布时间】:2018-10-24 15:41:04
【问题描述】:

我是全新的 pyspark(实际上也是 python)。我试图在每列上计算不同的(不是不同的列组合)。我想要这条 SQL 语句的答案:

sqlStatement = "Select Count(Distinct C1) AS C1, Count(Distinct C2) AS C2, ..., Count(Distinct CN) AS CN From myTable"

distinct_count = spark.sql(sqlStatement).collect()

在 8 节点集群上,这需要很长时间(16 小时)(请参阅下面的配置)。我正在尝试优化具有 400 列的 100GB 数据集。我没有看到使用数据框 sql 原语的方法,例如:

df.agg(countDistinct('C1', 'C2', ..., 'CN'))

因为这将再次给我独特的组合。必须有一种方法可以快速完成。


主节点 标准(1个主人,N个工人) 机器类型
n1-highmem-8(8 个 vCPU,52.0 GB 内存) 主磁盘大小
500 GB 工作节点
8 机器类型
n1-highmem-4(4 个 vCPU,26.0 GB 内存) 主磁盘大小
500 GB 本地 SSD
1

【问题讨论】:

  • approx_count_distincthere? Count(Distinct x)真的很糟糕。
  • 谢谢。这是一个有用的帖子。我做了很多谷歌搜索,由于某种原因,那个帖子没有出现!
  • 我尝试做的另一件事是运行 describe() 以获取所有基本统计信息。那也很慢。这是我 16 小时中的 2 小时。任何有关加速 describe() 的帮助也将不胜感激。
  • 也许您可以从使用cache() 中受益,如果您对数据框执行多项操作,则会重新计算转换。

标签: python apache-spark pyspark pyspark-sql google-cloud-dataproc


【解决方案1】:

请注意,您使用的是.collect() 方法,该方法会将数据集的所有元素返回给驱动程序,这可能会导致驱动程序内存不足。解释见this link

您可以通过在查询上运行 .explain() 来查看传递的内容:

myquery = spark.sql(sqlStatement)
myquery.explain()

您可以通过将查询拆分为多个查询来缓解此问题,这样您就不会同时在每一列上计算 distinct()。这将减少一次传递的数据量。

【讨论】:

    猜你喜欢
    • 2021-08-28
    • 1970-01-01
    • 2017-07-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-05
    • 1970-01-01
    相关资源
    最近更新 更多