【Question title】:Spring MVC + Mosquitto + MQTT Integration can't get any message
【Posted】:2015-05-29 18:52:11
【Question】:

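I am using Spring's integration library to connect to mosquitto and read/send messages, but there are a few things I can't figure out.

1 - When the application starts, it connects to mosquitto, but within a few seconds mosquitto receives hundreds more connection requests from the same application. Here is a sample of the log: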

New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
New client connected from 127.0.0.1 as springClient (c1, k60).
Sending CONNACK to springClient (0, 0)
Received SUBSCRIBE from springClient
    0001/001/INF (QoS 1)
springClient 1 0001/001/INF
Sending SUBACK to springClient
New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
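
The line "already connected, closing old connection" in the log above reflects a rule of MQTT itself: a broker keeps at most one live connection per client ID, so a second CONNECT with the same ID displaces the first. The toy model below is a minimal sketch of that rule only. `ToyBroker` and its methods are hypothetical names invented for illustration, not mosquitto or Paho APIs, and whether the application really opens two connections as `springClient` is an assumption the log suggests but does not prove.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ClientIdTakeoverSketch {

    /** Toy broker (hypothetical): tracks at most one live connection per client ID. */
    static final class ToyBroker {
        private final Map<String, Integer> liveConnectionByClientId = new HashMap<>();
        private final List<String> log = new ArrayList<>();
        private int nextConnectionNumber = 1;

        /** Registers a connection; an existing connection with the same ID is displaced. */
        int connect(String clientId) {
            Integer old = liveConnectionByClientId.get(clientId);
            if (old != null) {
                log.add("Client " + clientId + " already connected, closing old connection c" + old);
            }
            int current = nextConnectionNumber++;
            liveConnectionByClientId.put(clientId, current);
            log.add("New client connected as " + clientId + " (c" + current + ")");
            return current;
        }

        /** True only if the given connection is still the live one for that client ID. */
        boolean isLive(String clientId, int connectionNumber) {
            Integer live = liveConnectionByClientId.get(clientId);
            return live != null && live == connectionNumber;
        }

        List<String> log() {
            return log;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        int first = broker.connect("springClient");
        int second = broker.connect("springClient"); // same ID: displaces the first connection
        System.out.println("first still live:  " + broker.isLive("springClient", first));
        System.out.println("second still live: " + broker.isLive("springClient", second));
        broker.log().forEach(System.out::println);
    }
}
```

If each displaced client then reconnects automatically, the two connections keep evicting each other, which would produce a repeating pattern like the one in the log. Giving every connection its own client ID avoids the takeover.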

2 - With this configuration, I cannot receive any messages from mosquitto:

Spring XML:

<!-- This is for reading messages -->
<bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="true"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
</bean>

 <int:channel  id="fromBrokerChannel" />

Custom adapter:

public class MqttCustomInboundAdapter extends MqttPahoMessageDrivenChannelAdapter {

    public MqttCustomInboundAdapter(String clientId,
            MqttPahoClientFactory clientFactory, String[] topic) {
        super(clientId, clientFactory, topic);
        // TODO Auto-generated constructor stub
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        super.messageArrived(topic, message);
        System.out.println("**************** Message from topic : " + topic);
        System.out.println("**************** Message : " + new String(message.getPayload()));
    }

    public void addTopicIfNotExists(String topic)
    {
        for(String topicName:getTopic())
        {
            if(topicName.equals(topic))return;
        }

        addTopic(topic);

        System.out.println("************* Added Topic : " + topic);

        for(String topicName:getTopic())
        {
            System.out.println(topicName);
        }
    }
}

I am not using a service-activator because I need to know which topic each arriving message was sent on, so I wrapped MqttPahoMessageDrivenChannelAdapter, as mentioned in the Spring Integration docs.

Any suggestions?

【Question discussion】:

    Tags: java spring-mvc mqtt mosquitto paho


    【Solution 1】:

    I managed to configure MQTT with Java config:

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    
        MqttPahoMessageDrivenChannelAdapter mqtt = new MqttPahoMessageDrivenChannelAdapter( applicationName + "-sub", clientFactory( ), "/#" );
        mqtt.setQos( 2 );
        mqtt.setOutputChannel( outbount( ) );
        mqtt.setAutoStartup( true );
        mqtt.setTaskScheduler( taskScheduler( ) );
    
        return mqtt;
    }
    
    @Bean
    public MqttPahoMessageHandler mqqtMessageHandler() {
    
        return new MqttPahoMessageHandler( applicationName + "-pub", clientFactory( ) );
    }
    
    @Bean
    public DefaultMqttPahoClientFactory clientFactory() {
    
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory( );
        clientFactory.setUserName( "test" );
        clientFactory.setPassword( "test" );
        clientFactory.setServerURIs( new String[] { "tcp://url:1883" } );
        return clientFactory;
    }
    
    @Bean
    public PublishSubscribeChannel outbount() {
    
        PublishSubscribeChannel psc = new PublishSubscribeChannel( );
        psc.subscribe( new MessageHandler( ) {
    
            @Override
            public void handleMessage( Message<?> message ) throws MessagingException {
    
                logger.warn( message );
    
            }
        } );
    
        return psc;
    }
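
    Note that the adapter above subscribes to the filter "/#", while the question's topic is 0001/001/INF, which has no leading slash. Under MQTT's wildcard rules, "/#" only covers topics whose first level is empty (those starting with "/"). The matcher below is a minimal sketch of those rules for checking a filter against a topic. `TopicFilterSketch.matches` is a hypothetical helper written for illustration, not part of Paho or Spring Integration, and it ignores the special handling of topics that begin with `$`.

```java
final class TopicFilterSketch {

    /**
     * Returns true if the MQTT topic filter matches the concrete topic name.
     * '+' matches exactly one level; '#' matches the parent and all remaining levels.
     */
    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty levels, so "/topic" splits into ["", "topic"].
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' is only valid as the last level of a filter.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // a literal level differs
            }
        }
        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/#", "/topic"));               // true
        System.out.println(matches("/#", "0001/001/INF"));         // false
        System.out.println(matches("#", "0001/001/INF"));          // true
        System.out.println(matches("0001/+/INF", "0001/001/INF")); // true
    }
}
```

    So a setup like the one in the question would need a filter such as "#" or "0001/#" rather than "/#" to receive messages on 0001/001/INF.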
    

    To send a message, add the following:

    @Autowired
    MqttPahoMessageHandler mqtt;
    
    @RequestMapping( "/" )
    public ModelAndView getHomePage() throws MqttPersistenceException, MqttException {
    
        Message<String> message = MessageBuilder.withPayload( "spring - test" ).setHeader( MqttHeaders.TOPIC, "/topic" ).build( );
    
        mqtt.handleMessage( message );
    
        return new ModelAndView( "home" );
    }   
    

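    【Comments】:

      【Solution 2】:

      I figured it out. After some research, I decided to use a service-activator to handle the incoming messages (which is obvious in hindsight). After that, I could receive messages.

      As for mosquitto's strange behavior, I found that it has nothing to do with mosquitto itself. When the autoStartup property of MqttCustomInboundAdapter is set to true, the application sends too many connection requests. mosquitto receives these requests and tries to connect them one by one, which causes the old connection to be dropped each time.

      The new XML looks like this: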

       <bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
          <beans:constructor-arg name="clientId" value="springClient" />
          <beans:constructor-arg name="clientFactory" ref="clientFactory" />
          <beans:constructor-arg name="topic" value="0001/001/INF" />
          <beans:property name="autoStartup" value="false"></beans:property>
          <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
          <beans:property name="converter" ref="mqttMessageConverter"></beans:property>
      </bean>
      
       <int:channel id="fromBrokerChannel" />
      <int:service-activator input-channel="fromBrokerChannel" ref="mqttServiceActivator" ></int:service-activator>
      

      Now I can receive messages.

      【Comments】:
