【问题标题】:Apache spark custom log unfiltered data (LazyLogging)Apache spark 自定义日志未过滤数据 (LazyLogging)
【发布时间】:2021-02-24 21:40:50
【问题描述】:

我正在过滤一列以符合某些验证,我可以使用 Spark 内置函数进行过滤, 但是我需要用正确的消息记录无效数据(我正在使用 LazyLogging),有什么方法可以在不使用自定义 UDF 的情况下做到这一点,这样我就可以保持 Spark 优化?

例如过滤少于 20 个字符的名称:

df.filter(length($"name") <= lit(20))

在这种情况下,如何在没有自定义 UDF 的情况下记录超过 20 个字符的名称?

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    如果过滤操作的结果不是太大以至于它不适合您的驱动程序,您可以收集结果并将其打印到您的默认 Logger。

    val logCollection = df.filter(length($"name") > lit(20)).collectAsList
    logCollection.foreach(logger.info(_))
    

    作为替代方案,您可以通过应用另一种 writeStream 格式将名称写入数据库、控制台等来创建单独的流。请记住,当您这样做时,您实际上会在 SparkSession 中创建多个流查询,它们是独立使用数据:

    val originalDf = df.[...]
    val logDf = df.filter(length($"name") > lit(20))
    
    val originalQuery = originalDf.writeStream.[...].start() // keep logic as is
    val logQuery = logDf.writeStream.format("console").[...].start()
    
    spark.streams.awaitAnyTermination()
    

    【讨论】:

    • 好吧,过滤操作给我的名字少于 20 个字符,我正在尝试记录更长的名字。
    • 那只使用互补过滤条件?
    • 是的,但是我需要缓存数据帧并运行单独的过滤器来检索无效结果。它会产生不必要的开销。
    • 只要你有记录的需求,我不会称之为“不必要的”。
    • 我也这么认为。根据预期的无效名称的数量,它可能是应用 UDF 的最简单方法。
    猜你喜欢
    • 2018-02-09
    • 2016-08-18
    • 1970-01-01
    • 1970-01-01
    • 2016-09-24
    • 2012-09-11
    • 2019-11-15
    • 1970-01-01
    相关资源
    最近更新 更多