【发布时间】:2016-08-07 19:46:33
【问题描述】:
我使用 ZeroMQ 进行分布式消息传递,代码基于 Paranoid Pirate(偏执海盗)模式的 Python 示例。系统中有一个客户端(以后可能会有更多)、一个代理(broker)和多个工作进程(worker)。
我修改了示例:即使没有可用的工作进程,客户端也会继续向代理队列发送请求,而不是重试几次后退出。等到有工作进程可用时,这些请求再被分发给它们。在我的场景中,每个工作进程处理一个请求所需的时间各不相同。
我遇到的问题是:当代理发生故障(变得不可用)时,客户端无法得知这一点,仍会继续调用 .send() 发送请求,这些请求随后会丢失。代理恢复之后,只有新发出的请求会被处理。
Client.py
from random import randint
import time
import zmq
# Number of consecutive poll rounds without any message from the queue that
# are tolerated before the script gives up on the socket and reconnects.
HEARTBEAT_LIVENESS = 3
# Seconds. Used both as the poll timeout (multiplied by 1000) and as the
# spacing between outgoing heartbeats.
HEARTBEAT_INTERVAL = 1
# Reconnect back-off in seconds: the wait starts at INTERVAL_INIT, is doubled
# after each reconnect while it is still below INTERVAL_MAX, and is reset to
# INTERVAL_INIT whenever a message is received.
INTERVAL_INIT = 1
INTERVAL_MAX = 32
# Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY" # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT" # Signals worker heartbeat
def worker_socket(context, poller):
    """Create a DEALER socket, register it and announce it to the queue.

    Args:
        context: ZeroMQ context used to create the socket.
        poller: Poller on which the new socket is registered for POLLIN.

    Returns:
        The new socket, after connect() has been issued towards
        tcp://localhost:5556 and PPP_READY has been sent on it.
    """
    worker = context.socket(zmq.DEALER) # DEALER
    # randint() includes both end points, so the upper bound has to be 0xFFFF
    # for each half to fit the four hex digits of the "%04X" format.
    identity = "%04X-%04X" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    # The identity is set before connect().
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker
context = zmq.Context(1)
poller = zmq.Poller()
liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
worker = worker_socket(context, poller)
cycles = 0
while True:
socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
# Handle worker activity on backend
if socks.get(worker) == zmq.POLLIN:
# Get message
# - 3-part envelope + content -> request
# - 1-part HEARTBEAT -> heartbeat
frames = worker.recv_multipart()
if not frames:
break # Interrupted
if len(frames) == 3:
print "I: Normal reply: ", frames
liveness = HEARTBEAT_LIVENESS
time.sleep(4) # Do some heavy work
worker.send_multipart(frames)
elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
# print "I: Queue heartbeat"
liveness = HEARTBEAT_LIVENESS
else:
print "E: Invalid message: %s" % frames
interval = INTERVAL_INIT
else:
liveness -= 1
if liveness == 0:
print "W: Heartbeat failure, can't reach queue"
print ("W: Reconnecting in")
time.sleep(interval)
if interval < INTERVAL_MAX:
interval *= 2
poller.unregister(worker)
worker.setsockopt(zmq.LINGER, 0)
worker.close()
worker = worker_socket(context, poller)
liveness = HEARTBEAT_LIVENESS
if time.time() > heartbeat_at:
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
#print "I: Worker heartbeat"
worker.send(PPP_HEARTBEAT)
Broker.py
from collections import OrderedDict
import time
import threading
import zmq
# Number of heartbeat intervals a worker entry stays valid after it was
# created (Worker.expiry = now + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS).
HEARTBEAT_LIVENESS = 3 # 3..5 is reasonable
# Used as the poll timeout (multiplied by 1000) and as the spacing between
# heartbeats sent to the workers.
HEARTBEAT_INTERVAL = 1.0 # Seconds
# Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY" # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT" # Signals worker heartbeat
# NOTE(review): PPP_BUSY and PPP_FREE are not referenced by the visible
# broker code - confirm whether they are still needed.
PPP_BUSY = "PPP_BUSY"
PPP_FREE = "PPP_FREE"
class Worker(object):
    """Record of one worker known to the queue.

    ``address`` is stored exactly as passed in. ``expiry`` is a time.time()
    value lying HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS seconds after the
    moment the record was created.
    """

    def __init__(self, address):
        lifetime = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
        self.expiry = time.time() + lifetime
        self.address = address
class WorkerQueue(object):
    """Ordered collection of ready workers, oldest entry first."""

    def __init__(self):
        # Maps worker address -> worker object; insertion order is the order
        # in which next() hands the addresses out.
        self.queue = OrderedDict()

    def ready(self, worker):
        """Put *worker* at the back of the queue.

        An existing entry with the same address is replaced.
        """
        # Pop first so that a worker announcing itself again moves to the
        # back instead of keeping its old position.
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        now = time.time()
        # Collect first and remove afterwards: the dict must not change size
        # while it is being iterated. items() is used instead of the
        # Python-2-only iteritems().
        expired = [
            address
            for address, worker in self.queue.items()
            if now > worker.expiry
        ]
        for address in expired:
            print("W: Idle worker expired: %s" % address)
            self.queue.pop(address, None)

    def next(self):
        """Remove the oldest entry and return its address.

        Raises:
            KeyError: if the queue is empty.
        """
        address, _ = self.queue.popitem(False)
        return address
context = zmq.Context(1)
# NOTE(review): clcontext is not used anywhere in the visible broker code -
# confirm whether this second context is needed.
clcontext = zmq.Context()

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER) # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556") # For workers

# Two pollers: while no worker is queued only the backend is polled, so
# client requests are not read from the frontend until a worker is ready.
poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = WorkerQueue()
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    poller = poll_both if workers.queue else poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        frames = backend.recv_multipart()
        if not frames:
            break
        # The first frame is the sending worker's address; any message from a
        # worker (re)queues it as ready.
        address = frames[0]
        workers.ready(Worker(address))
        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print("E: Invalid message from worker: %s" % msg)
        else:
            print("sending: %s" % msg)
            frontend.send_multipart(msg)

    # Send heartbeats to idle workers if it's time. The check runs on every
    # pass, so it does not depend on a worker message having arrived in this
    # particular poll round.
    if time.time() >= heartbeat_at:
        for worker in workers.queue:
            backend.send_multipart([worker, PPP_HEARTBEAT])
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL

    # Forward a client request to the worker at the front of the queue. The
    # frontend is only polled while the queue is non-empty.
    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        print("client frames: %s" % frames)
        if not frames:
            break
        frames.insert(0, workers.next())
        backend.send_multipart(frames)

    workers.purge()
Worker.py
from random import randint
import time
import zmq
# Number of consecutive poll rounds without any message from the queue that
# are tolerated before the worker gives up on the socket and reconnects.
HEARTBEAT_LIVENESS = 3
# Seconds. Used both as the poll timeout (multiplied by 1000) and as the
# spacing between outgoing heartbeats.
HEARTBEAT_INTERVAL = 1
# Reconnect back-off in seconds: the wait starts at INTERVAL_INIT, is doubled
# after each reconnect while it is still below INTERVAL_MAX, and is reset to
# INTERVAL_INIT whenever a message is received.
INTERVAL_INIT = 1
INTERVAL_MAX = 32
# Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY" # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT" # Signals worker heartbeat
def worker_socket(context, poller):
    """Create a DEALER socket, register it and announce it to the queue.

    Args:
        context: ZeroMQ context used to create the socket.
        poller: Poller on which the new socket is registered for POLLIN.

    Returns:
        The new socket, after connect() has been issued towards
        tcp://localhost:5556 and PPP_READY has been sent on it.
    """
    worker = context.socket(zmq.DEALER) # DEALER
    # randint() includes both end points, so the upper bound has to be 0xFFFF
    # for each half to fit the four hex digits of the "%04X" format.
    identity = "%04X-%04X" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    # The identity is set before connect().
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker
context = zmq.Context(1)
poller = zmq.Poller()
liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
worker = worker_socket(context, poller)
cycles = 0
while True:
socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
# Handle worker activity on backend
if socks.get(worker) == zmq.POLLIN:
# Get message
# - 3-part envelope + content -> request
# - 1-part HEARTBEAT -> heartbeat
frames = worker.recv_multipart()
if not frames:
break # Interrupted
if len(frames) == 3:
print "I: Normal reply: ", frames
liveness = HEARTBEAT_LIVENESS
time.sleep(4) # Do some heavy work
worker.send_multipart(frames)
elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
# print "I: Queue heartbeat"
liveness = HEARTBEAT_LIVENESS
else:
print "E: Invalid message: %s" % frames
interval = INTERVAL_INIT
else:
liveness -= 1
if liveness == 0:
print "W: Heartbeat failure, can't reach queue"
print ("W: Reconnecting in")
time.sleep(interval)
if interval < INTERVAL_MAX:
interval *= 2
poller.unregister(worker)
worker.setsockopt(zmq.LINGER, 0)
worker.close()
worker = worker_socket(context, poller)
liveness = HEARTBEAT_LIVENESS
if time.time() > heartbeat_at:
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
#print "I: Worker heartbeat"
worker.send(PPP_HEARTBEAT)
【问题讨论】: