【问题标题】:How to write to Kafka from Python logging module?如何从 Python 日志记录模块写入 Kafka?
【发布时间】:2014-02-01 20:21:42
【问题描述】:

我有一个大型复杂应用程序,它大量使用 Python 日志记录模块。

我需要开始将这些日志放入 Kafka 集群,并且需要确保我不会在此过程中更改数据。

对我来说,理想的解决方案是为 Kafka 创建一个新的处理程序 - 并允许日志在一段时间内同时进入旧的日志记录解决方案和 kafka。然后最终关闭旧的日志处理程序并发送到 Kafka。

但是,我没有看到任何 kafka 日志记录处理程序 - 只有 kafka 客户端。添加一个 kafka 客户端意味着跟踪每个当前的日志记录调用并向新的 kafka 客户端添加一个单独的调用。获得相同的结果会很困难。

【问题讨论】:

  • 为什么不自己实现一个处理程序呢?它只需要覆盖几个方法。
  • 我对日志记录或 kafka 的内容都不是很熟悉 - 所以希望有一个完全成熟的解决方案可以适合我当前的 sprint。鉴于两者的未知数和这个应用程序的重要性,如果我自己编写它,我将不得不推出它。

标签: python apache-kafka


【解决方案1】:

处理程序的实现非常简单。实际上,设置环境比实现处理程序花费更多时间。

处理程序构造函数接受可选参数key。如果提供,写入的消息将被发送到此键指定的单个分区。如果未提供,消息将在服务器之间循环分发。

我没有对其进行太多测试,但它是如此简单,以至于我看不出这里可能会出现什么问题。希望对你有用。

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer,KeyedProducer
import logging,sys

class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        #drop kafka logging to avoid infinite recursion
        if record.name == 'kafka':
            return
        try:
            #use default formatting
            msg = self.format(record)
            #produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except:
            import traceback
            ei = sys.exc_info()
            traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
            del ei

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)

kh = KafkaLoggingHandler("localhost", 9092, "test_log")
#OR
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox",  "dog")
try:
    import math
    math.exp(1000)
except:
    logger.exception("Problem with %s", "math.exp")

附:处理程序使用这个 Kafka 客户端:https://github.com/mumrah/kafka-python

【讨论】:

    【解决方案2】:

    这是一个很棒的修复,谢谢!该代码在过去几年中有所更新,现在不推荐使用某些功能。不过,此修复的整体设计非常非常有帮助,再次感谢您。

    SimpleProducer (deprecated) --> KafkaProducer
    SimpleConsumer (deprecated) --> KafkaConsumer
    

    这是我使用 Kafka 1.0.0 和 kafka-python 1.4.2 修改的 sn-p 以及只是生产者,因为我在另一端通过 logstash 消费。

    希望这对你有用!

    tester.py(主程序)

    # -*- coding: utf-8 -*-
    """Module to test out logging to kafka."""
    
    import json
    import logging
    
    from utils.kafka_handler import KafkaHandler
    from kafka import KafkaProducer
    
    
    def run_it(logger=None):
        """Run the actual connections."""
    
        logger = logging.getLogger(__name__)
        # enable the debug logger if you want to see ALL of the lines
        #logging.basicConfig(level=logging.DEBUG)
        logger.setLevel(logging.DEBUG)
    
        kh = KafkaHandler(['localhost:9092'], 'sebtest')
        logger.addHandler(kh)
    
        logger.info("I'm a little logger, short and stout")
        logger.debug("Don't tase me bro!")
    
    
    if __name__ == "__main__":
        run_it()
    

    utils/kafka_handler.py(日志工具)

    # -*- coding: utf-8 -*-
    """Module to provide kafka handlers for internal logging facility."""
    
    import json
    import logging
    import sys
    
    from kafka import KafkaProducer
    
    
    class KafkaHandler(logging.Handler):
        """Class to instantiate the kafka logging facility."""
    
        def __init__(self, hostlist, topic='corp_it_testing', tls=None):
            """Initialize an instance of the kafka handler."""
            logging.Handler.__init__(self)
            self.producer = KafkaProducer(bootstrap_servers=hostlist,
                                          value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                          linger_ms=10)
            self.topic = topic
    
        def emit(self, record):
            """Emit the provided record to the kafka_client producer."""
            # drop kafka logging to avoid infinite recursion
            if 'kafka.' in record.name:
                return
    
            try:
                # apply the logger formatter
                msg = self.format(record)
                self.producer.send(self.topic, {'message': msg})
                self.flush(timeout=1.0)
            except Exception:
                logging.Handler.handleError(self, record)
    
        def flush(self, timeout=None):
            """Flush the objects."""
            self.producer.flush(timeout=timeout)
    
        def close(self):
            """Close the producer and clean up."""
            self.acquire()
            try:
                if self.producer:
                    self.producer.close()
    
                logging.Handler.close(self)
            finally:
                self.release()
    

    【讨论】:

      猜你喜欢
      • 2011-09-17
      • 1970-01-01
      • 1970-01-01
      • 2019-12-08
      • 2022-07-06
      • 2015-02-25
      • 2020-11-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多