【问题标题】:AWS ActiveMQ fetch messages from a Consumer and send to a queueAWS ActiveMQ 从消费者获取消息并发送到队列
【发布时间】:2021-11-25 11:49:01
【问题描述】:

我正在尝试从消费者那里获取消息并将其发送到队列。为此,我使用Stomp.py 在浏览了文章和帖子后,我编写了以下代码:

import ssl
import stomp

stompurl = "xxxxxxxx.mq.us-west-2.amazonaws.com"
stompuser = "stomuser"
stomppass = "password"


class MyListener(stomp.ConnectionListener):
    msg_list = []

    def __init__(self):
        self.msg_list = []

    def on_error(self, frame):
        self.msg_list.append('(ERROR) ' + frame.body)

    def on_message(self, frame):
        self.msg_list.append(frame.body)


conn = stomp.Connection(host_and_ports=[(stompurl, "61614")], auto_decode=True)
conn.set_ssl(for_hosts=[(stompurl, "61614")], ssl_version=ssl.PROTOCOL_TLS)
lst = MyListener()
listener = conn.set_listener('', lst)
conn.connect(stompuser, stomppass, wait=True)
# conn.send(body='Test message', destination='Test_QUEUE')
conn.subscribe('Test_QUEUE', '102')
print(listener.message_list)
import time; time.sleep(2)
messages = lst.msg_list
# conn.disconnect()
print(messages)

使用此代码,我可以向Test_QUEUE 发送消息,但我无法从消费者那里获取所有消息。如何从消费者那里提取所有消息并发布到队列进行处理。

【问题讨论】:

  • 对我的回答有任何反馈吗?

标签: python activemq stomp stomp.py


【解决方案1】:

我不是 Python + STOMP 专家,但是在您创建异步(即非阻塞)消息侦听器时,我使用过的所有其他语言都必须阻止您的应用程序退出。你有一个time.sleep(2),但实际上是否有足够的时间从队列中获取所有消息?

看起来您的应用程序将在print(messages) 之后退出,这意味着如果您在time.sleep(2) 期间没有收到所有消息,那么您的应用程序将直接终止。

【讨论】:

    猜你喜欢
    • 2012-08-11
    • 2013-08-01
    • 2015-02-02
    • 1970-01-01
    • 2023-04-07
    • 2014-06-27
    • 1970-01-01
    • 2013-09-08
    • 2017-11-06
    相关资源
    最近更新 更多