【发布时间】:2020-03-13 23:43:40
【问题描述】:
我目前正在开展一个项目,外部应用通过 MQTT 协议发送来自多个传感器的数据。
我想收集所有这些数据,并将它们发送到外部服务器。我想创建 2 个 MQTT 代理:
- 一个本地(在装有发送数据的应用程序的机器上)
- 远程服务器中的一个
我将在两者之间创建一个网桥。这是我的 MQTT 服务器应用程序 ActiveMQ 提供的一种可能性(我想这是一个常见功能)。
通过这种方式,数据生成应用程序将在本地代理上发布,并通过网桥将相同的数据发布到远程代理上。关键是让应用程序在连接丢失的情况下正常运行。
当我失去代理之间的网络连接时,我在没有连接期间无法获得应用程序生成的数据。您知道是否可以配置网桥以使其按我想要的方式工作?
我是否必须开发一个小程序来监听来自本地代理的所有主题、检测连接丢失并将所有丢失的消息重新发送到远程代理?
我从我的两个代理添加配置文件。我的第一个 ActiveMQ 服务器与我的应用程序在同一台机器上,第二个 ActiveMQ 服务器在同一网络上的另一台机器上。两台计算机完美地相互ping通。
本地经纪人:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.16.100:61616)"/>
</networkConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
远程代理:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
为了模拟两个代理之间的断开连接,我只是将第二台计算机与网络断开连接。
我在两台计算机上都使用 MQTTBox 来订阅我写的主题。这就是我看到在第二台计算机断开连接期间在本地代理中发送的主题上的数据在我重新连接时没有发布在远程代理的同一主题上。
编辑:新信息
我今天再次尝试了测试,我注意到我的 MQTT 客户端 MQTTBox 上有一个“保留”复选框。 所以:
- 在计算机 A 上,我发布了一条消息,并在主题 /test 上进行了保留检查,而计算机 B 正在监听 /#
- 两台电脑连接后,显然工作正常,我在电脑B上看到了消息。
- 当我断开计算机 B 的连接,发布 2 条带有保留检查的消息然后重新连接计算机 B 时,我只看到我发布的 2 条消息中的最新消息...
更好,但我也想看看另一条消息...如果有人可以帮助我,我迷路了...
我还可以为要发布的消息设置 QoS。我尝试使用 Qos = 0 和 QoS = 1 :同样的事情。
【问题讨论】: