【问题标题】:Output Spark application id in the logs with Log4j使用 Log4j 在日志中输出 Spark 应用程序 ID
【发布时间】:2019-02-15 09:47:24
【问题描述】:

我有一个用于 Spark 应用程序的自定义 Log4j 文件。我想输出 Spark 应用程序 ID 以及消息和日期等其他属性,因此 JSON 字符串结构如下所示:

{"name":,"time":,"date":,"level":,"thread":,"message":,"app_id":}

现在,这个结构看起来像这样:

{"name":,"time":,"date":,"level":,"thread":,"message":}

如何为 Spark 驱动程序日志定义这样的布局?

我的 log4j 文件如下所示:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/'>

    <appender name="Json" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.hadoop.log.Log4Json">
            <param name="ConversionLayout" value=""/>
        </layout>
    </appender>

    <root>
        <level value="INFO"/>
        <appender-ref ref="Json"/>
    </root>
</log4j:configuration>

【问题讨论】:

    标签: json scala apache-spark log4j


    【解决方案1】:

    我怀疑org.apache.hadoop.log.Log4Json 可以为此目的进行调整。根据它的 javadoc 和源代码,它可能相当繁琐。

    虽然看起来您使用的是 Log4j 1x,但它的 API 非常灵活,我们可以通过扩展 org.apache.log4j.Layout 轻松定义自己的布局。

    我们需要一个将根据目标结构转换为 JSON 的案例类:

    case class LoggedMessage(name: String,
                             appId: String,
                             thread: String,
                             time: Long,
                             level: String,
                             message: String)
    

    Layout 可能会扩展如下。要访问“app_id”的值,我们将使用 Log4j 的映射诊断上下文

    import org.apache.log4j.Layout
    import org.apache.log4j.spi.LoggingEvent
    import org.json4s.DefaultFormats
    import org.json4s.native.Serialization.write
    
    class JsonLoggingLayout extends Layout {
      // required by the API
      override def ignoresThrowable(): Boolean = false
      // required by the API
      override def activateOptions(): Unit = { /* nothing */ }
    
      override def format(event: LoggingEvent): String = {
        // we are using json4s for JSON serialization
        implicit val formats = DefaultFormats
    
        // retrieve app_id from Mapped Diagnostic Context
        val appId = event.getMDC("app_id") match {
          case null => "[no_app]" // logged messages outside our app
          case defined: AnyRef => defined.toString
        }
        val message = LoggedMessage("TODO",
                                    appId,
                                    Thread.currentThread().getName,
                                    event.getTimeStamp,
                                    event.getLevel.toString,
                                    event.getMessage.toString)
        write(message) + "\n"
      }
    
    }
    

    最后,在创建 Spark 会话时,我们将 app_id 值放入 M​​DC:

    import org.apache.log4j.{Logger, MDC}
    
    // create Spark session
    
    MDC.put("app_id", session.sparkContext.applicationId)
    
    logger.info("-------- this is info --------")
    logger.warn("-------- THIS IS A WARNING --------")
    logger.error("-------- !!! ERROR !!! --------")
    

    这会产生以下日志:

    {"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708149,"level":"INFO","message":"-------- this is info --------"}
    {"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708150,"level":"WARN","message":"-------- THIS IS A WARNING --------"}
    {"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708150,"level":"ERROR","message":"-------- !!! ERROR !!! --------"}
    

    当然,不要忘记参考 log4j config xml 中的实现:

    <appender name="Json" class="org.apache.log4j.ConsoleAppender">
      <layout class="stackoverflow.q54706582.JsonLoggingLayout" />
    </appender>
    

    【讨论】:

    • 只有import org.json4s.native.Serialization.write 应更改为import org.json4s.jackson.Serialization.write。这对我有用
    • 是的,这取决于你在 json4s 中使用的是什么,无论是 native 还是 jackson。
    猜你喜欢
    • 1970-01-01
    • 2017-06-26
    • 1970-01-01
    • 1970-01-01
    • 2010-10-29
    • 2016-10-30
    • 1970-01-01
    • 2016-09-28
    • 1970-01-01
    相关资源
    最近更新 更多