【Question Title】: Send client requests to broker only when a worker is available and broker is available
【Posted】: 2016-08-07 19:46:33
【Question】:

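I am using ZeroMQ for distributed messaging, working from the sample code provided for the Paranoid Pirate pattern in Python. I have one client (there may be more later), one broker, and multiple workers.

I have modified the example so that the client keeps sending requests to the broker queue even when no workers are available, instead of retrying and eventually exiting. The queued requests are distributed to the workers once they become available. In my scenario, the time each worker needs to process a given request varies.

The problem I see is that when the broker goes down (becomes unavailable), the client has no way to tell that the broker is unavailable, and it keeps issuing .send() requests. Those requests are lost. After the broker becomes available again, only new requests are processed.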

Client.py

from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY"      # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT"  # Signals worker heartbeat

def worker_socket(context, poller):
    """Helper function that returns a new configured socket
       connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER) # DEALER
    identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker

context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break # Interrupted

        if len(frames) == 3:
            print "I: Normal reply: ", frames
            liveness = HEARTBEAT_LIVENESS
            time.sleep(4)  # Do some heavy work
            worker.send_multipart(frames)
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            # print "I: Queue heartbeat"
            liveness = HEARTBEAT_LIVENESS
        else:
            print "E: Invalid message: %s" % frames
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print "W: Heartbeat failure, can't reach queue"
            print ("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        #print "I: Worker heartbeat"
        worker.send(PPP_HEARTBEAT)

Broker.py

from collections import OrderedDict
import time
import threading
import zmq

HEARTBEAT_LIVENESS = 3     # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0   # Seconds

#  Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY"      # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT"  # Signals worker heartbeat
PPP_BUSY = "PPP_BUSY"
PPP_FREE = "PPP_FREE"

class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        t = time.time()
        expired = []
        for address,worker in self.queue.iteritems():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print "W: Idle worker expired: %s" % address
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address

context = zmq.Context(1)
clcontext = zmq.Context()

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = WorkerQueue()

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers

    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        frames = backend.recv_multipart()
        if not frames:
            break

        address = frames[0]
        workers.ready(Worker(address))

        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print "E: Invalid message from worker: %s" % msg
        else:
            print ("sending: %s"%msg)
            frontend.send_multipart(msg)

        # Send heartbeats to idle workers if it's time
        if time.time() >= heartbeat_at:
            for worker in workers.queue:
                msg = [worker, PPP_HEARTBEAT]
                backend.send_multipart(msg)
            heartbeat_at = time.time() + HEARTBEAT_INTERVAL

    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        print ("client frames: %s" % frames)
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)

    workers.purge()

Worker.py

from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = "PPP_READY"      # Signals worker is ready
PPP_HEARTBEAT = "PPP_HEARTBEAT"  # Signals worker heartbeat

def worker_socket(context, poller):
    """Helper function that returns a new configured socket
       connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER) # DEALER
    identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker

context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break # Interrupted

        if len(frames) == 3:
            print "I: Normal reply: ", frames
            liveness = HEARTBEAT_LIVENESS
            time.sleep(4)  # Do some heavy work
            worker.send_multipart(frames)
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            # print "I: Queue heartbeat"
            liveness = HEARTBEAT_LIVENESS
        else:
            print "E: Invalid message: %s" % frames
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print "W: Heartbeat failure, can't reach queue"
            print ("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        #print "I: Worker heartbeat"
        worker.send(PPP_HEARTBEAT)

【Comments】:

    Tags: python zeromq pyzmq


    【Answer 1】:

    Add a heartbeat signal to the control layer

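    If the only problem is, to quote the question, "the problem I see is that when the broker goes down (becomes unavailable), the client has no way to tell that the broker is unavailable and it keeps sending requests," then just add a lightweight signal. It can be a PUB/SUB channel (or another archetype) that, with newer versions of the API, uses .setsockopt( zmq.CONFLATE, 1 ) to keep only the most recent timestamped presence message. Alternatively, use a more robust bilateral handshake. Either way, at the moment the sender decides to send its next message to the consumer process, it has a reasonable basis to assume that all interested parties are fit and in a state that allows the target functionality to be reached.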
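    A minimal Python 3 sketch of such a presence signal follows. It is an illustration, not code from the question: the extra PUB endpoint on port 5557, the single-frame ASCII timestamp payload, the 3-second tolerance, and every function and class name here are assumptions. It also assumes the broker and client clocks are roughly in sync (they share a host in the question), because the client compares the broker's timestamp with its own clock.

```python
import time


class BrokerLiveness:
    """Tracks the newest broker heartbeat timestamp seen by a client."""

    def __init__(self, max_age=3.0, clock=time.time):
        self.max_age = max_age   # seconds of silence tolerated (assumed value)
        self.clock = clock       # injectable so the logic can be tested
        self.last_seen = None    # no heartbeat received yet

    def update(self, payload):
        """Record a heartbeat whose payload is an ASCII float timestamp."""
        self.last_seen = float(payload.decode("ascii"))

    def is_alive(self):
        """True if the newest heartbeat is at most max_age seconds old."""
        return (self.last_seen is not None
                and self.clock() - self.last_seen <= self.max_age)


def publish_heartbeat(pub, clock=time.time):
    """Broker side: publish the current time as a single-frame message.

    ZMQ_CONFLATE does not support multipart messages, hence one frame.
    """
    pub.send(repr(clock()).encode("ascii"))


def heartbeat_subscriber(context, endpoint="tcp://localhost:5557"):
    """Client side: a SUB socket that keeps only the newest heartbeat."""
    import zmq  # imported lazily so the liveness logic works without pyzmq
    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.CONFLATE, 1)     # set before connect()
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # receive every heartbeat
    sub.connect(endpoint)
    return sub


def send_if_broker_alive(sub, liveness, client, request):
    """Read the latest heartbeat, then send only if the broker looks alive."""
    import zmq
    try:
        liveness.update(sub.recv(zmq.NOBLOCK))
    except zmq.Again:
        pass  # nothing new arrived; rely on the last timestamp seen
    if liveness.is_alive():
        client.send(request)
        return True
    return False  # caller decides whether to buffer, retry or give up
```

    On the broker, publish_heartbeat() would be called once per HEARTBEAT_INTERVAL on a PUB socket bound to the assumed port. A fresh heartbeat only narrows the window in which requests can be lost; a request sent just before the broker dies still needs an acknowledgement scheme to be recovered.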

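    ZeroMQ is great in exactly this sense: it helps integrate both smart signalling and message handling into a fully distributed finite-state automaton (FSA), or a network of FSAs that cooperate state-wise. That is a truly remarkable step into heterogeneous distributed systems. Queues are a non-core, additional means of doing this in a smart, fast and scalable way.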
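    To make the FSA idea concrete, here is one possible client-side automaton, sketched in Python 3 under assumptions of my own: two states, a pessimistic start, and local buffering while the broker is considered down. The names State and GatedSender are hypothetical and do not come from ZeroMQ or from the question's code.

```python
from collections import deque
from enum import Enum


class State(Enum):
    BROKER_DOWN = 0
    BROKER_UP = 1


class GatedSender:
    """Client-side automaton: send while the broker is up, buffer otherwise."""

    def __init__(self, send):
        self.send = send                  # callable that transmits one request
        self.state = State.BROKER_DOWN    # pessimistic until a heartbeat arrives
        self.pending = deque()            # requests held back while down

    def on_heartbeat(self):
        """Signalling event: the broker announced its presence."""
        self.state = State.BROKER_UP
        while self.pending:               # flush the backlog in FIFO order
            self.send(self.pending.popleft())

    def on_timeout(self):
        """Signalling event: no heartbeat within the liveness window."""
        self.state = State.BROKER_DOWN

    def submit(self, request):
        """Messaging event: the application wants to send a request."""
        if self.state is State.BROKER_UP:
            self.send(request)
        else:
            self.pending.append(request)
```

    In a real client, send would be something like the DEALER or REQ socket's send method, and on_heartbeat() / on_timeout() would be driven from the poll loop. The signalling events change the state; the messaging events only consult it.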

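    One may soon realise that this brings additional risks of distributed mutual locking (deadlock), which have to be incorporated into professional system design and validation. This new class of problems can and does appear both in trivial REQ/REP use cases, where message delivery is error-prone, and, in more complex forms, in more complex distributed signalling/messaging FSA networks.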
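    The REQ/REP case can be illustrated with a short Python 3 sketch. A REQ socket whose reply never arrives cannot send again, so one common way out is to poll with a timeout and replace the socket on failure. The helper names, the timeout and the retry count below are assumptions chosen for illustration; the socket is created through a factory so the retry logic does not depend on pyzmq being importable.

```python
def request_with_retry(make_socket, request, timeout_ms=2500, retries=3):
    """Send a request on a fresh socket per attempt; return the reply or None.

    A REQ socket that is still waiting for a lost reply is stuck in its
    send/recv cycle, so each failed attempt discards the socket entirely.
    """
    for _ in range(retries):
        sock = make_socket()
        sock.send(request)
        if sock.poll(timeout_ms):      # non-zero when a reply is readable
            reply = sock.recv()
            sock.close()
            return reply
        sock.close()                   # give up on this socket, try a new one
    return None                        # peer considered unreachable


def zmq_req_factory(context, endpoint):
    """Return a factory that builds connected REQ sockets (needs pyzmq)."""
    import zmq

    def make_socket():
        sock = context.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)  # drop unsent data on close()
        sock.connect(endpoint)
        return sock

    return make_socket
```

    A caller would combine the two as request_with_retry(zmq_req_factory(context, "tcp://localhost:5555"), b"payload"). A retried request may be processed twice if only the reply was lost, so the receiving side has to tolerate duplicates.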
