【问题标题】:Way to send half logs to RabbitMQ before sending the complete log to logstash/elasticsearch在将完整日志发送到 logstash/elasticsearch 之前将一半日志发送到 RabbitMQ 的方法
【发布时间】:2015-09-27 17:24:39
【问题描述】:

我有几个函数,每个函数都创建特定于一个事务的日志;它是一个多线程应用程序,因此 func1 的函数入口可以是随机进行的事务,但对于单个事务,它将仅通过 func1、func2 和 func3 顺序进行。

func1(transactionId) {
     log("%d Now in func1", transactionId);
}

func2(transactionId) {
     log("%d Now in func2", transactionId);
}

func3(transactionId) {
     log("%d Now in func3", transactionId);
}

现在,我想一次只为每笔交易写入logstash;那就是

 1 Now in func1 Now in func2 Now in fun3

然后这需要最后去elasticsearch;

我正在考虑将一半事务日志写入 RabbitMQ 临时队列,然后在完成事务后,将其提交到 RabbitMQ 生产者队列以将消息发送到 logstash;

喜欢

func1(transactionId) {
     add2RMQ(transactionId, "Now in func1");
}

func2(transactionId) {
     add2RMQ("transactionId, "Now in func2");
}

func3(transactionId) {
      add2RMQ("transactionId, "Now in func3");
      /* Last point of transaction */
      commit2RMQ(transactionId);
}

commit2RMQ 执行 logstash 的时间应该会收到特定于事务的完整消息以写入 elasticsearch。

问题:

  1. 什么是解决此问题的正确解决方案,以便将特定于事务的数据立即发送到 elasticsearch?
  2. 我们可以用 RabbitMQ 解决这个问题吗?如果是这样,我需要为此使用哪些正确的 API?
  3. 有没有什么方法可以在不使用 RabbitMQ 而仅使用 logstash 和 elasticsearch 的情况下实现相同的目标?
  4. 我不想使用 elasticsearch 更新 API,因为它可能会为每个特定于事务的日志消息消耗大量搜索操作。

【问题讨论】:

    标签: elasticsearch rabbitmq logstash apache-kafka logstash-forwarder


    【解决方案1】:

    尝试聚合与单个事务有关的不同日志行并不是一个容易解决的问题,特别是如果您将消息队列系统添加到混合中作为要聚合的日志的中间存储。我会采用不涉及其他子系统(如 RabbitMQ)的不同方式。

    此外,如果您尝试将多个日志行连接成一个日志行,则会丢失每个日志行可以提供的详细信息,例如每个函数执行所花费的时间。如果func2func3 分别抛出异常会怎样?您是否应该存储仅由func1 组成的部分日志,分别仅由func1func2 组成?

    我将要写的内容可能可以转换为任何语言和任何日志记录解决方案,但为了便于说明,我假设您的程序是用 Java 编写的并且您使用的是 Log4J。

    因此,我将利用 Log4J's Mapped Diagnostic Context (MDC) 将您的事务 ID(以及可能的其他数据,例如用户名等)存储在您的每个日志行中。这样,您可以轻松检索与单个事务有关的所有日志行。这样做的好处是您不必聚合任何内容,只需提供足够的上下文信息,以便 Kibana 稍后可以为您完成。

    在您的伪代码中,您将交易 ID 直接添加到您的消息中。为此使用 MDC 而不是将 ID 记录到您的消息中的优点是,它使您无需在 Logstash 中解析所有消息,以重新发现您在创建日志行时已经知道的事务 ID。

    所以我们的想法是,在您的代码中,一旦您有了事务 ID,就将其添加到当前的每线程日志记录上下文中,如下所示:

    import org.apache.log4j.MDC;
    
    ...
    func1(transactionId) {
         // add the transaction ID to the logging context
         MDC.put("transactionID", transactionId);
         log("Now in func1");
    }
    
    func2(transactionId) {
         log("Now in func2");
    }
    
    func3(transactionId) {
         log("Now in func3");
    }
    

    然后在您的 Log4J 配置文件中,您可以使用 %X{transactionID} 模式指定附加程序以存储它,在这种情况下,我将它添加到线程名称之后,但您可以将它放在任何您喜欢的位置:

    log4j.appender.consoleAppender.layout.ConversionPattern = %d [%t] [%X{transactionID}] %5p %c - %m%n
    

    您的日志将如下所示:

    2015-09-28T05:07:28.425Z [http-8084-2] [625562271762]  INFO YourClass - Now in func1
    2015-09-28T05:07:29.776Z [http-8084-2] [625562271762]  INFO YourClass - Now in func2
    2015-09-28T05:07:30.652Z [http-8084-2] [625562271762]  INFO YourClass - Now in func3
                                                  ^
                                                  |
                                      the transaction ID is here
    

    当您有这样的日志行时,通过 Logstash grok 过滤器检索事务 ID 并将其存储在您的 logstash 索引中自己的 transactionID 字段中是小菜一碟。在 Kibana 中,您可以搜索交易 ID 并按时间戳降序排序,您将显示该交易的所有上下文。

    试一试!

    【讨论】:

    • 非常感谢;我非常感谢您在为我提供特定于故障案例的解决方案方面解决问题的方式。我会做那个实验。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-26
    • 1970-01-01
    • 1970-01-01
    • 2022-07-07
    • 2022-01-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多