【问题标题】:Use of pyzmq's logging handler in pythonpython中pyzmq的日志处理程序的使用
【发布时间】:2017-03-06 05:24:25
【问题描述】:

我想在 Python 程序中引入基于 zmq 的登录。当我面临ZMQError: Address in use 错误时,我决定将其归结为一个简单的概念证明。我能够运行精简版,但没有收到任何日志条目。这是我使用的代码:

日志发布者:

import time
import logging
from zmq.log import handlers as zmqHandler


logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
    logger.error('error test...')
    print "Send error #%s" % (str(i))
    time.sleep(1)

结果

Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...

日志订阅者:

import time
import zmq

def sub_client():
    port = "12344"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://127.0.0.1:%s" % port)
    # Generate 30 entries
    for i in range (30):
        print "Listening to publishers..."
        message = socket.recv()
        print "Received error #%s: %s" % (str(i), message)
        time.sleep(1)

sub_client()

结果

Listening to publishers...

所以订阅者在socket.recv() 的调用中被锁定。我在不同的控制台中开始发布者和订阅者。当我使用 netstat 时,这两个进程都会出现:

C:\>netstat -a -n -o | findstr 12344
  TCP    127.0.0.1:12344        0.0.0.0:0              LISTEN          1336
  TCP    127.0.0.1:12344        127.0.0.1:51937        ESTABLISHED     1336
  TCP    127.0.0.1:51937        127.0.0.1:12344        ESTABLISHED     8624

我没有在这里看到我的错误,有什么想法吗?

除了手头的问题,我一般如何使用这个zmq监听器。 我是否必须为每个进程创建一个PUBHandler 实例,然后将其添加到logger 的所有实例中(logging.getLogger('myapp') 创建一个自己的记录器实例,对吗?)还是我必须为每个进程创建一个自己的PUBHandler我使用的所有不同的类?由于PUBHandlerclass 有一个createLock(),我认为它不是线程保存...

为了完整起见,我想提一下doc of the PUBHandler class

我在 Win7 上使用 python(x,y) 发行版,带有 python 2.7.10 和 pyzmq 14.7.0-14

[更新] 我排除了 windows 防火墙作为丢失包的来源

【问题讨论】:

    标签: python python-2.7 logging zeromq pyzmq


    【解决方案1】:

    我猜你错过了在服务器中设置 PUB 套接字。

    应该这样做,

    publisher = context.socket(zmq.PUB)
    publisher.bind('tcp://127.0.0.1:12344')
    zmqh = PUBHandler(publisher)
    logger = logging.getLogger('myapp')
    logger.setLevel(logging.ERROR)
    logger.addHandler(zmqh)
    

    我希望这会有所帮助。

    【讨论】:

    • @Akhay 根据文档,handler = PUBHandler('inproc://loc') 应该相当于单独创建套接字并将其绑定到接口,但我会尝试一下
    • 我尝试了你的建议,但这并没有改变任何东西。我仍然没有收到订阅方的日志。
    【解决方案2】:

    问题出在订阅者方面。最初,订阅者过滤掉所有消息,直到设置过滤器。使用socket.setsockopt(opt, value) 函数将其存档。 pyZMQ的描述对这个功能的使用不是很清楚:

    getsockopt(opt) 获取由创建的新套接字的默认套接字选项 这个上下文

    但是zmq_setsockopt函数的文档很清楚(see here):

    int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)

    ...

    ZMQ_SUBSCRIBE:建立消息过滤器ZMQ_SUBSCRIBE 选项应在ZMQ_SUB 套接字上建立一个新的消息过滤器。 新创建的ZMQ_SUB 套接字将过滤掉所有传入的 消息,因此您应该调用此选项来建立一个 初始消息过滤器。

    所以解决方案是使用socket.setsockopt(zmq.SUBSCRIBE,filter) 设置一个过滤器,其中filter 是您要过滤的字符串。使用filter='' 显示所有消息。像filter='ERROR' 这样的过滤器将只显示错误消息并抑制所有其他类型,如WARNINGINFODEBUG

    sub_client() 函数看起来像这样:

    import time
    import zmq
    
    def sub_client():
        port = "12344"       
        context = zmq.Context()
        socket = context.socket(zmq.SUB)    
        socket.connect("tcp://127.0.0.1:%s" % port)
        socket.setsockopt(zmq.SUBSCRIBE,'')
    
    
        # Process 30 updates
        print "Listening to publishers..."
        for i in range (30):       
            print "Listening to publishers..."
            message = socket.recv()
            print "Received error #%s: %s" % (str(i), message)
            time.sleep(1)
    
    sub_client()
    

    【讨论】:

      【解决方案3】:

      我知道这是一个较旧的帖子,但如果有人登陆这里,这就是订阅者的样子

      def sub_client():
          port = "12345"       
          context = zmq.Context()
          socket = context.socket(zmq.SUB)    
          socket.connect("tcp://localhost:%s" % port)
          socket.subscribe("")
          
          # Process 30 updates
          for i in range (30):
              print("Listening to publishers...")
              message = socket.recv()
              print("Received error #%s: %s",str(i), message)
              time.sleep(1)
      
          sub_client()
      

      Publisher 看起来像

      import zmq
      import logging
      import time
      from zmq.log.handlers import PUBHandler
      
      port = "12345"       
      context = zmq.Context()
      pub = context.socket(zmq.PUB)
      pub.bind("tcp://*:%s" % port)
      
      handler = PUBHandler(pub)
      logger = logging.getLogger()
      logger.setLevel(logging.ERROR)
      logger.addHandler(handler)
      for i in range(50):
          logger.error('error test...')
          print("publish error",str(i))
          time.sleep(1)
      

      【讨论】:

      • 好的,socket.subscribe("") 是对 socket.setsockopt(zmq.SUBSCRIBE,""') 的高级调用。很高兴知道,谢谢
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-07-14
      • 1970-01-01
      • 2020-03-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多