【问题标题】:Spark StringIndexer.fit is very slow on large recordsSpark StringIndexer.fit 在大型记录上非常慢
【发布时间】:2018-12-31 07:20:10
【问题描述】:

我的大数据记录格式如下:

// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc|   123|  true|
// |abc|   345|  true|
// |abc|   567|  true|
// |def|   123|  true|
// |def|   345|  true|
// |def|   567|  true|
// |def|   789| false|
// +---+------+------+

ciditemId 是字符串。

有 965,964,223 条记录。

我正在尝试使用StringIndexercid 转换为整数,如下所示:

dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)

但是这些代码行非常慢(大约需要 30 分钟)。问题是它太大了,在那之后我什么也做不了。

我正在使用具有 2 个节点(61 GB 内存)的 R4 2XLarge 集群的 amazon EMR 集群。

我可以进一步提高性能吗?任何帮助将不胜感激。

【问题讨论】:

  • 您使用的是哪个版本的 Spark?
  • 我也有同样的问题。就我而言,它只有 100,000,000 行 x 150 列。在总共 256 个内核和 1,024GB 内存的 4 个节点上,它甚至在 15 分钟后都无法完成一项任务。背景:我的字符串列并不完全是唯一的,但和它一样好。此外,它相当长,平均约 40 个字符。有没有更好的方法来添加索引列?我尝试使用zipWithIndex 创建一个查找表,但也不能很好地扩展。
  • 我已经尝试过使用此处描述的 zipWithIndex blogs.msdn.microsoft.com/azuredatalake/2016/06/09/… 的替代解决方案。 IE。创建 ID 查找表并加入。这也不能很好地扩展,因为查找表对于广播连接来说太大了。
  • 看起来是一个很老的问题,它可能与这个错误报告有关issues.apache.org/jira/browse/SPARK-20392你当前的 spark 版本是什么?
  • 我使用的是 Spark 2.4.3,所以这应该不是问题

标签: apache-spark apache-spark-ml apache-spark-dataset


【解决方案1】:

假设:

  • 您计划使用 cid 作为功能(在 StringIndexer + OneHotEncoderEstimator 之后)
  • 您的数据位于 S3 中

先问几个问题:

不知道更多,我的第一个猜测是你现在不应该担心内存,先检查你的并行度。您只有 2 个 R4 2XLarge 实例可以为您提供:

  • 8 个 CPU
  • 61GB 内存

就个人而言,我会尝试:

  • 获取更多实例
  • R4 2XLarge 实例与拥有更多 CPU 的其他实例交换

不幸的是,对于当前的 EMR 产品,这只能通过在问题上投入资金来实现:

最后,repartition(50) 有什么用?这可能只会引入进一步的延迟......

【讨论】:

    【解决方案2】:

    如果列的基数很高,这是一种预期行为。作为训练过程的一部分,StringIndexer 收集所有标签,并创建标签-索引映射(使用 Spark 的o.a.s.util.collection.OpenHashMap)。

    这个过程在最坏的情况下需要 O(N) 内存,并且是计算和内存密集型的。

    如果列的基数很高,并且其内容将用作特征,则最好应用FeatureHasher(Spark 2.3 或更高版本)。

    import org.apache.spark.ml.feature.FeatureHasher
    
    val hasher = new FeatureHasher()
      .setInputCols("cid")
      .setOutputCols("cid_hash_vec")
    hasher.transform(dataset)
    

    它不保证唯一性且不可逆,但对于许多应用来说已经足够了,并且不需要拟合过程。

    对于不会用作特征的列,您也可以使用hash 函数:

    import org.apache.spark.sql.functions.hash
    
    dataset.withColumn("cid_hash", hash($"cid"))
    

    【讨论】:

    • 你说得对,我的专栏的基数非常高。我已经阅读了这两个函数的文档,并且对它们与 StringIndexer 有何不同有疑问。如果我可以将哈希列创建为附加列,我不介意哈希列是否不可逆。但是,如果它们不是唯一的,是否意味着可能存在哈希冲突?另外,您为什么不使用hash 来获取功能?我已经看到hash 返回一个 Int 列的问题,但将来我的列中的唯一值将超过 Integer 限制所允许的数量。
    • 如果它们是非唯一的,是否意味着可能发生哈希冲突 - 一般来说,对于固定数量的桶,哈希冲突与唯一的数量成正比价值观。因此,如果唯一值的数量接近Integer.MAX_VALUE,则几乎可以保证发生冲突。 为什么不将哈希用于特征 - 因为单独的哈希不适合作为特征。至少(用作树模型的分类)它需要元数据(另一个瓶颈),并且不能直接用于线性模型(这意味着扩展是必要的)。
    • 因此,如果唯一值的数量接近 Integer.MAX_VALUE 冲突,则几乎可以保证 - 没错,这就是我所害怕的。由于我的唯一 ID 数量会随着时间的推移接近Integer.MAX_VALUE,这两种解决方案都不起作用。即使有超过 10,000,000 个唯一 ID,我猜由于生日问题,冲突会很多发生。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-20
    • 2015-11-27
    相关资源
    最近更新 更多