处理程序的实现非常简单。实际上,设置环境比实现处理程序花费更多时间。
处理程序构造函数接受可选参数key。如果提供,写入的消息将被发送到此键指定的单个分区。如果未提供,消息将在服务器之间循环分发。
我没有对其进行太多测试,但它是如此简单,以至于我看不出这里可能会出现什么问题。希望对你有用。
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer,KeyedProducer
import logging,sys
class KafkaLoggingHandler(logging.Handler):
def __init__(self, host, port, topic, key=None):
logging.Handler.__init__(self)
self.kafka_client = KafkaClient(host, port)
self.key = key
if key is None:
self.producer = SimpleProducer(self.kafka_client, topic)
else:
self.producer = KeyedProducer(self.kafka_client, topic)
def emit(self, record):
#drop kafka logging to avoid infinite recursion
if record.name == 'kafka':
return
try:
#use default formatting
msg = self.format(record)
#produce message
if self.key is None:
self.producer.send_messages(msg)
else:
self.producer.send(self.key, msg)
except:
import traceback
ei = sys.exc_info()
traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
del ei
def close(self):
self.producer.stop()
logging.Handler.close(self)
kh = KafkaLoggingHandler("localhost", 9092, "test_log")
#OR
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox", "dog")
try:
import math
math.exp(1000)
except:
logger.exception("Problem with %s", "math.exp")
附:处理程序使用这个 Kafka 客户端:https://github.com/mumrah/kafka-python