【问题标题】:Reply to Hono command on AMQP device在 AMQP 设备上回复 Hono 命令
【发布时间】:2021-10-31 17:23:55
【问题描述】:

我正在尝试创建一个能够接收来自 hono 的命令并回复它的原型设备。

我已经安装了 hono 1.10.0 并运行以下 python 代码

import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri


correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'

print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)


print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)


print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)


print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)


print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responsets---------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")

我在Difficulty in Sending AMQP 1.0 Message的帮助下,根据我对https://www.eclipse.org/hono/docs/api/command-and-control/https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command的理解做了这个实现。

目前我似乎并没有那么错,因为设备收到了命令,并且发送消息也没有显示任何错误。然而,在接收端什么都没有到达。澄清一下,AmqpReceiver 实现适用于我侦听遥测数据的场景。因此,如果实现应该是相同的(除了不同的地址),那么这应该不是问题。

我非常相信我对消息中的地址/reply_to 做错了,但我无法确认,因为 hono pod 中的日志没有告诉我任何信息:(

br 阿明

======更新================================

我当前运行的代码如下

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce


class Amqp(MessagingHandler):
    def __init__(self, server, address, user, password, options=None):
        super(Amqp, self).__init__()
        self.server = server
        self.address = address
        self.user = user
        self.password = password
        self.options = options
        self.connection = None

    def create_connection(self, event):
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=self.user,
            password=self.password
        )
        print("Connection established")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

    def on_link_opened(self, event):
        if event.link.is_sender:
            print("Opened sender link")
        if event.link.is_receiver:
            print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))


class AmqpReceiver(Amqp):
    def __init__(self, server, address, user, password, options=None):
        super(AmqpReceiver, self).__init__(server, address, user, password, options)
        self.server = server
        self.user = user
        self.password = password

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        print(f'Receiver [{self.address}] got message:')
        print(f'  {event.message.reply_to}')
        print(f'  {event.message.correlation_id}')
        print(f'  {event.message.properties}')
        print(f'  {event.message.subject}')
        print(f'  {event.message.body}')
        #just for test purposes - the device sends imediatelly the reply if a reply_to is given
        if event.message.reply_to is not None:
            reply_to = event.message.reply_to.split('/')
            tenant_id = reply_to[1]
            device_id = reply_to[2]
            resp = Message(
                address=event.message.reply_to,
                correlation_id=event.message.correlation_id,
                content_type="text/plain",
                properties={
                    'status': 200,
                    'tenant_id': tenant_id,
                    'device_id': device_id
                },
                body=f'Reply on {event.message.body}'
            )
            sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
            sender.send(resp)
            sender.close()
            print("Reply send")


class AmqpSender(Amqp):
    def __init__(self, server, messages, user, password, address=None, options=None):
        super(AmqpSender, self).__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        print("In Msg send")
        for msg in self.messages:
            event.sender.send(msg)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

在测试脚本中我使用如下

from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver


biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'

biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'

correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'


print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)

print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)

print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()

time.sleep(10)
print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")

如果我运行该代码示例,我会得到以下日志(见下文),并且代码被卡住,因为命令回复接收器保持打开状态。

登录 hono 调度路由器:

2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no

登录 amqp 适配器

2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null

【问题讨论】:

  • > 我已经安装了 hono 1.10.9 并运行以下 python 代码你能更具体一点吗?您是否使用 Helm 图表部署到 k8s 集群?我还假设您的意思是 1.10.0,对吗?
  • "但是在接收端什么都没有到达。"我假设您的意思是北向应用程序没有收到设备发送的命令响应?
  • 您对 hono 版本 1.10.0 的看法是正确的......我看到的 helm 版本号与应用程序不同
  • @ArminGruber 请将您的代码添加到问题本身,而不是在外部网站上。

标签: python eclipse-hono eclipse-iot


【解决方案1】:

您的设备发送的命令响应似乎包含错误的地址。正如AMQP Adapter User Guide 中所指出的,响应的address 属性必须设置为命令的reply-to 属性的值。该值通常与您的应用程序在命令消息中设置的 reply-to 值不同,因为协议适配器需要将一些附加信息编码到对地址的回复中,以便能够在转发时确定正确的设备 ID下游命令响应。

因此,您需要在您的代码中检查设备端的命令消息并将其reply-to 值用作命令响应的address 值。

除此之外,AMQP 适配器还期望命令响应中的状态属性为 AMQP 1.0 类型 int(一个 32 位有符号整数)。但是,使用您的代码,属性值默认编码为 AMQP 1.0 long(64 位有符号整数)。为了正确编码,您需要从proton._data 导入int32 类,然后将属性值设置为int32(200)。然后适配器接受命令响应并将其转发到下游。

【讨论】:

  • 好的,我明白了,您对 reply_to 修改是正确的。北向应用发送command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId,设备接收command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/b932fb15-fdbd-4c12-9ed7-40aaa876341但是,即使我用后面的地址值设置地址值,它也不会在接收端结束。我将更新问题中的新代码(+ amqp 部分)
  • 与此同时,我稍微更改了代码,以便将回调函数移交给在 on_message 方法中调用的 AMQP 接收器。此回调将消息获取变量reply_to 值并使用它设置地址值。不幸的是,这并没有改变结果:(
  • 很难跟踪您实际运行的代码。您可以将您的代码放入 gist 或 GitHub 项目中吗?
  • 当然,我对其进行了一些清理和简化,以便于阅读(希望如此)github.com/armin85/hono-test
  • 你好,凯,我在上面评论中提到的存储库中更新了一点代码。你能给我一个提示,从 hono log perspektiv 看,就我所见,事情看起来很好。我相应地更新了问题
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-27
  • 1970-01-01
  • 1970-01-01
  • 2018-06-22
相关资源
最近更新 更多