【问题标题】:MQTT over WebSockets using Netty?使用 Netty 通过 WebSockets 进行 MQTT?
【发布时间】:2023-03-05 11:31:01
【问题描述】:

我想通过 Websockets 使用 MQTT。在 Netty 中使用 Websockets 非常简单:

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new WebSocketServerHandler());

我找到了基于 Netty 的 MQTT 代理(moquette)。

 NettyMQTTHandler handler = new NettyMQTTHandler();
 ServerBootstrap b = new ServerBootstrap();
            b.group(m_bossGroup, m_workerGroup)
             .channel(NioServerSocketChannel.class) 
             .childHandler(new ChannelInitializer<SocketChannel>() { 
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));
                    pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
                    pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());
                    //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
                    pipeline.addLast("decoder", new MQTTDecoder());
                    pipeline.addLast("encoder", new MQTTEncoder());
                    pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
                    pipeline.addLast("handler", handler);
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .option(ChannelOption.SO_REUSEADDR, true)
             .childOption(ChannelOption.SO_KEEPALIVE, true); 

所以理论上我应该能够通过 Websocket 发送 MQTT,但我不知道是否可以使用 Netty?有没有人有任何线索或想法如何做到这一点?我应该使用 MessageToMessageCodec 和 BinaryWebSocketFrame 吗?

干杯!

【问题讨论】:

    标签: java websocket netty mqtt


    【解决方案1】:

    假设您的 MQTTDecoder 使用 ByteBufs 并产生一些 MQTT 消息对象,而 MQTTEncoder 则相反,通常是这种情况。

    那么,您的编解码器使用的ByteBufs 不是Web Socket 消息。它们需要成为 Web Socket 帧的有效负载。我会在管道中插入以下处理程序:

    • MessageToMessageDecoder 将 WebSocket 文本(或二进制)帧转换为ByteBuf,以便MQTTDecoder 可以使用它。转换应该非常简单 - 只需获取 Web Socket 框架的内容即可。
    • MessageToMessageEncoderByteBuf 转换为 Web Socket 文本(或二进制)帧,以便 Netty 的 WebSocketFrameEncoder 可以使用它。转换也应该非常简单——只需将MQTTEncoder 编码的ByteBuf 与Web Socket 框架对象包装起来即可。

    生成的管道将如下所示:

    1. HttpResponseEncoder
    2. HttpRequestDecoder
    3. HttpObjectAggregator(65536)
    4. WebSocketServerProtocolHandler("/your-websocket-endpoint-path")
    5. WebSocketFrameToByteBufDecoder 扩展 MessageToMessageDecoder
    6. ByteBufToWebSocketFrameEncoder 扩展 MessageToMessageEncoder
    7. MQTTEncoder
    8. MQTTDecoder
    9. MessageMetricsHandler
    10. handler

    WebSocketServerProtocolHandler 将与您的 Web 套接字客户端执行必要的握手,并在 WebSocketFrameToByteBufDecoder 之前插入 WebSocketFrameEncoderWebSocketFrameDecoder。成功握手后生成的管道将如下所示:

    1. WebSocketFrameEncoder
    2. WebSocketFrameDecoder
    3. WebSocketFrameToByteBufDecoder 扩展 MessageToMessageDecoder
    4. ByteBufToWebSocketFrameEncoder 扩展 MessageToMessageEncoder
    5. MQTTEncoder
    6. MQTTDecoder
    7. MessageMetricsHandler
    8. handler

    【讨论】:

    • 感谢提供线索。我只是将 MQTTDecoder 和 MQTTEncoder 更改为 MessageToMessageDecoder / MessageToMessageEncoder,它就像一个魅力 ;-) 代码:code.google.com/p/moquette-mqtt/issues/…
    • 是的,如果您可以完全控制编解码器的实现,您甚至可以这样做。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-04
    • 2013-05-15
    • 2012-11-02
    • 1970-01-01
    • 2014-11-30
    • 2013-06-26
    相关资源
    最近更新 更多