【问题标题】:Receive a message with RabbitMQ then process it then send back the results使用 RabbitMQ 接收消息然后处理它然后发回结果
【发布时间】:2016-06-27 15:49:53
【问题描述】:

我想(直接)从脚本发送消息,然后对其进行处理,然后发回结果。 所以这就像一个双重发布-订阅。

我有 2 个脚本:

  • 处理器
  • 客户

客户端直接向处理器发送消息(简单字符串),然后处理器脚本计算字符串中的字符并将结果发送回客户端。

这就是我尝试做的:

处理器等待消息,计算一些东西,然后回复原始发送者。

#Processer.py:
import pika
import sys

#Sends back the score
#addr: Connection address
#exchName: Exchange name (where to send)
#rKey: Name of the queue for direct messages
#score: The detected score
def SendActualScore(addr, exchName, rKey, score):
    #Send the image thru the created channel with the given routing key (queue name)
    channel.basic_publish(exchange=exchName, routing_key=rKey, body=score)
    print "(*) Sent: " + score

#When we receive something this is called
def CallbackImg(ch, method, properties, body):
    print "(*) Received: " + str(body)
    score = str(len(body))
    #Send back the score
    SendActualScore('localhost', 'valami', rKey, score)


#Subscribe connection
#Receive messages thru this
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#RECEIVE MESSAGES - Subscribe
channel.exchange_declare(exchange='valami', type='direct')
#Define a queue, where we don't need the name
#After we disconnected delete the queue (exclusive flag)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue

rKeys = sys.argv[1:]
for rKey in rKeys:
    channel.queue_bind(exchange='valami', queue=queue_name, routing_key = rKey)

channel.basic_consume(CallbackImg, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

客户端只是发送消息,然后等待答复。

#Client.py:
import pika
import sys

connAddr = 'localhost'

#Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(connAddr))
channel = connection.channel()

#Define an exchange channel, we don't need a queue
channel.exchange_declare(exchange='valami', type='direct')

#Send the image thru the created channel
channel.basic_publish(exchange='valami', routing_key='msg', body='Message in the body')

print "[*] Sent"

def Callback(ch, method, properties, body):
    print "(*) Received: " + str(body)

result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue

channel.queue_bind(exchange='valami', queue=queue_name)

channel.basic_consume(Callback, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

可能有多个客户,我不知道如何将消息直接发回给他们。

【问题讨论】:

    标签: python rabbitmq


    【解决方案1】:

    您是否查看过使用 python 和 pika 在 RabbitMQ 中使用 RPC 的教程? http://www.rabbitmq.com/tutorials/tutorial-six-python.html


    您需要在客户端中执行的操作的要点可在 RPC 教程中找到,但有一些修改。

    在您的客户端中,您需要创建一个独占队列 - 与您在服务器中所做的方式相同。

    从客户端发送消息时,需要将reply_to设置为客户端独占队列的名称

    来自教程:

    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                                reply_to = callback_queue,
                                ),
                          body=request)
    

    在服务器上,当您收到一条消息时,您需要从消息中读取reply_to 标头,然后读取basic_publish 对该队列的回复。


    与其考虑“客户端”和“服务器”,不如根据“消息生产者”和“消息消费者”来构建它可能会有所帮助。

    在您的场景中,您的两个流程都需要既是发布者又是消费者。 “客户端”将发布原始消息并使用响应。 “服务器”将使用原始消息并发布响应。

    您的代码中唯一真正的区别是在原始消息上使用了reply_to 标头。这是您应该向其发布响应的队列的名称。

    希望有帮助!


    附:我在我的RabbitMQ Patterns 电子书中介绍了这一点的核心大纲 - RPC 和请求/回复,就像你需要的那样。这本书讲的是原理和模式,而不是具体的编程语言(虽然我主要写的是 node.js,并不真正了解 python)。

    【讨论】:

    • 是的,但我需要使用 pub-sub 来完成这项工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多