【发布时间】:2016-05-29 19:21:08
【问题描述】:
我现在正在用 Python 实现一个数据订阅者,它订阅一个数据发布者(实际上是一个 ZeroMQ 发布者套接字),并且一旦提供任何新消息就会得到通知。在我的订阅者中,消息在收到后被转储到数据处理器。完成后,订阅者还将收到处理器的通知。由于数据处理器是用 C++ 编写的,因此我必须使用一个简单的 C++ 模块来扩展 Python 代码。
以下是我的数据订阅者的简化可运行代码示例。代码main.py,其中模块proc代表处理器,订阅localhost:10000上的ZeroMQ套接字,设置回调,并通过调用proc.onMsg将接收到的消息发送给处理器。
#!/bin/python
# main.py
import gevent
import logging
import zmq.green as zmq
import pub
import proc
logging.basicConfig( format='[%(levelname)s] %(message)s', level=logging.DEBUG )
SUB_ADDR = 'tcp://localhost:10000'
def setupMqAndReceive():
'''Setup the message queue and receive messages.
'''
ctx = zmq.Context()
sock = ctx.socket( zmq.SUB )
# add topics
sock.setsockopt_string( zmq.SUBSCRIBE, 'Hello' )
sock.connect( SUB_ADDR )
while True:
msg = sock.recv().decode( 'utf-8' )
proc.onMsg( msg )
def callback( a, b ):
print( '[callback]', a, b )
def main():
'''Entrance of the module.
'''
pub.start()
proc.setCallback( callback )
'''A simple on-liner
gevent.spawn( setupMqAndReceive ).join()
works. However, the received messages will not be
processed by the processor.
'''
gevent.spawn( setupMqAndReceive )
proc.start()
模块proc 被简化为导出三个函数:
-
setCallback设置回调函数,当消息处理完毕后,可以通知我的订阅者; -
onMsg被订阅者调用; -
start设置一个新的工作线程来处理来自订阅者的消息,并使主线程加入等待工作线程退出。
完整版本的源代码可以在githubhttps://github.com/more-more-tea/python_gil 上找到。然而,它并没有按我的预期运行。一旦添加了处理器线程,订阅者就无法在 gevent 循环中接收来自发布者的数据。如果我简单地删除数据处理器模块,订阅者 gevent 循环可以接收来自发布者的消息。
代码有什么问题吗?我怀疑 GIL 会干扰消息处理器中 pthread 的并发性,或者 gevent 循环处于饥饿状态。任何有关该问题或如何调试它的提示将不胜感激!
【问题讨论】:
标签: python c++ multithreading gil