如何从Python日志记录模块写入Kafka?

肯法

我有一个大型的复杂应用程序,该应用程序大量使用Python日志记录模块。

我需要开始将这些日志添加到Kafka集群中,并需要确保在此过程中不要更改数据。

对我来说,理想的解决方案是为Kafka创建一个新的处理程序-并允许日志同时并行访问旧的日志记录解决方案和kafka。然后最终关闭旧的日志记录处理程序,然后将其发送给Kafka。

但是,我看不到任何kafka记录处理程序-只有kafka客户端。添加kafka客户端意味着跟踪当前的每个日志记录调用,并向新的kafka客户端添加单独的调用。获得相同的结果将是困难的。

野火

处理程序的实现非常简单。实际上,设置环境比实现处理程序要花费更多的时间。

处理程序构造函数接受可选参数key如果提供,则写入的消息将发送到此键指定的单个分区。如果未提供,则消息将以循环方式在服务器之间分发。

我没有做太多测试,但是它是如此简单,以至于我看不到这里可能出什么问题。希望它会有用。

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer,KeyedProducer
import logging,sys

class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        #drop kafka logging to avoid infinite recursion
        if record.name == 'kafka':
            return
        try:
            #use default formatting
            msg = self.format(record)
            #produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except:
            import traceback
            ei = sys.exc_info()
            traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
            del ei

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)

kh = KafkaLoggingHandler("localhost", 9092, "test_log")
#OR
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox",  "dog")
try:
    import math
    math.exp(1000)
except:
    logger.exception("Problem with %s", "math.exp")

PS处理程序使用以下Kafka客户端:https : //github.com/mumrah/kafka-python

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章