【问题标题】:Calling more than one functions as RABBITMQ message调用多个函数作为 RABBITMQ 消息
【发布时间】:2020-02-19 19:43:06
【问题描述】:

我刚开始使用 RabbitMQ 和 Python。我一直在阅读兔子官方页面上的 tuts,但我不知道如何使用 Rabbitmq 做其他事情。

我一直在尝试运行这个 [tutorial] (https://www.rabbitmq.com/tutorials/tutorial-three-python.html) 的示例,它运行良好,.. 但我需要知道如何创建多个函数并通过 Rabbitmq Messages 调用它们?.. .(我也在用这个[例子](Python and RabbitMQ - Best way to listen to consume events from multiple channels?)来指导我。)

我希望有人知道如何做到这一点...(我会再重复一遍,我对这个主题很陌生)...

这是我拥有的一些代码。

我使用此代码作为教程发送消息..

import pika
import sys

url = 'amqp://oamogcgg:xxxxxxxxxxxxxxxxxxxxxxxxx@salamander.rmq.cloudamqp.com/oamogcgg'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()


channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: THIS IS A TEST MESSAGE !!!!!!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

在这个文件中是我收到消息的地方。

import pika
import sys
import threading

threads=[]


#function 1 

def validator1(channel):    
channel.queue_declare(queue='queue_name')
print (' [*] Waiting messsaes for valiadtor1 press CTRL+C')


def callback(ch, method, properties, body):
   print (" Received %s" % (body))
   sleep(2) #I need stop it for two minutes

channel.basic_consume(callback, queue='queue_name', no_ack=True)
channel.start_consuming()

#function 2

def validator2(channel):    
channel.queue_declare(queue='queue_name')
print (' [*] Waiting messsaes for valiadtor2 press CTRL+C')


def callback(ch, method, properties, body):
  print (" Received %s" % (body))
  sleep(2) #I need stop it for two minutes

channel.basic_consume(callback, queue='queue_name', no_ack=True)
channel.start_consuming()



def manager():
 url = 'amqp://oamogcgg:xxxxxxxxxxxxxxxxxxxxxxxxx@salamander.rmq.cloudamqp.com/oamogcgg'
 params = pika.URLParameters(url)

#channel 1
 connection1= pika.BlockingConnection(params)
 channel1 = connection1.channel()

 channel1.exchange_declare(exchange='logs', exchange_type='fanout')
 result = channel1.queue_declare(queue='', exclusive=True)
 queue_name = result.method.queue
 channel1.queue_bind(exchange='logs', queue=queue_name)

#channel 2

 connection2= pika.BlockingConnection(params)
 channel2 = connection2.channel()


channel2.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel2.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel2.queue_bind(exchange='logs', queue=queue_name)



#creating threads

t1 = threading.Thread(target=validator1, args=(channel1,))
t1.daemon = True
threads.append(t1)
t1.start()  

t2 = threading.Thread(target=valiadtor2, args=(channel2,))
t2.daemon = True
threads.append(t2)


t2.start()
for t in threads:
    t.join()


manager()

【问题讨论】:

  • BUT I need to know how can I create more than one functions and call them through Rabbitmq Messages 是什么意思?您想在同一条消息上调用多个函数还是要为同一个队列调用多个消费者?你能把你的场景解释清楚一点吗?
  • 是的,我需要对一些消息调用多个函数,可以吗?..

标签: python function rabbitmq queue pika


【解决方案1】:

如果您的函数是依赖的(我的意思是,一个函数作用于另一个函数的输出),那么您可以在回调函数中一一调用所有这些函数。成功执行最后一个函数后,您可以确认该消息。

如果这些函数是独立的,那么您可以为要在消息上执行的每个函数维护多个队列。可以使用fanout 交换将同一消息路由到多个队列,如RabbitMQ Tutorial-3 中所述。

【讨论】:

  • 谢谢,我尝试在另一个文件中创建我的函数,然后我只在回调函数中导入它们,当它们都成功执行后,我可以确认消息....我工作得很好.....但是现在我想做的是在收到该消息时将相同的确认消息打印到我的 producer.py 文件中,以确保该消息已被receive.py 接收....您有什么想法如何做到这一点?
  • 你的意思是,当消费者肯定地确认消息时,你想打印来自生产者的消息?
  • 我想知道当消费者收到消息时,如何在生产者中打印一些消息??或者我的生产者如何知道消费者在收到消息时发送的 ack?
  • 生产者和消费者是两个不同的东西。生产者只是将消息发布到队列中。消费者消费队列中的消息。所有生产者都关心消息是否成功路由到队列。这同样适用于消费者在消费时。如果你想让你的生产者知道消费者发送的 ack,那么你需要实现 RPC 样式。教程中也提供了 rabbitmq.com/tutorials/tutorial-six-python.html
  • 谢谢,我已经开始阅读 RPC。我认为这将是我需要实施的解决方案
猜你喜欢
  • 2012-02-27
  • 2013-06-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-25
  • 2014-06-05
  • 1970-01-01
  • 2016-04-05
相关资源
最近更新 更多